Source code for pymc.gp.gp

#   Copyright 2024 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import warnings

from functools import partial

import numpy as np
import pytensor.tensor as pt

from pytensor.tensor.linalg import cholesky, eigh, solve_triangular

import pymc as pm

from pymc.gp.cov import BaseCovariance, Constant
from pymc.gp.mean import Zero
from pymc.gp.util import (
    JITTER_DEFAULT,
    conditioned_vars,
    replace_with_values,
    stabilize,
)
from pymc.math import cartesian, kron_diag, kron_dot, kron_solve_lower, kron_solve_upper

solve_lower = partial(solve_triangular, lower=True)
solve_upper = partial(solve_triangular, lower=False)

__all__ = ["Latent", "Marginal", "TP", "MarginalApprox", "LatentKron", "MarginalKron"]


_noise_deprecation_warning = (
    "The 'noise' parameter has been been changed to 'sigma' "
    "in order to standardize the GP API and will be "
    "deprecated in future releases."
)


def _handle_sigma_noise_parameters(sigma, noise):
    """Help transition of 'noise' parameter to be named 'sigma'."""
    if (sigma is None and noise is None) or (sigma is not None and noise is not None):
        raise ValueError("Must provide exactly one of 'sigma' or the deprecated 'noise'.")

    if sigma is None:
        warnings.warn(_noise_deprecation_warning, FutureWarning)
        return noise

    return sigma


class Base:
    """Base class."""

    def __init__(self, *, mean_func=Zero(), cov_func=Constant(0.0)):
        self.mean_func = mean_func
        self.cov_func = cov_func

    def __add__(self, other):
        same_attrs = set(self.__dict__.keys()) == set(other.__dict__.keys())
        if not isinstance(self, type(other)) or not same_attrs:
            raise TypeError("Cannot add different GP types")
        mean_total = self.mean_func + other.mean_func
        cov_total = self.cov_func + other.cov_func
        return self.__class__(mean_func=mean_total, cov_func=cov_total)

    def prior(self, name, X, *args, **kwargs):
        raise NotImplementedError

    def marginal_likelihood(self, name, X, *args, **kwargs):
        raise NotImplementedError

    def conditional(self, name, Xnew, *args, **kwargs):
        raise NotImplementedError

    def predict(self, Xnew, point=None, given=None, diag=False, model=None):
        raise NotImplementedError


[docs] @conditioned_vars(["X", "f"]) class Latent(Base): R""" Latent Gaussian process. The `gp.Latent` class is a direct implementation of a GP. No additive noise is assumed. It is called "Latent" because the underlying function values are treated as latent variables. It has a `prior` method and a `conditional` method. Given a mean and covariance function the function :math:`f(x)` is modeled as, .. math:: f(x) \sim \mathcal{GP}\left(\mu(x), k(x, x')\right) Use the `prior` and `conditional` methods to actually construct random variables representing the unknown, or latent, function whose distribution is the GP prior or GP conditional. This GP implementation can be used to implement regression on data that is not normally distributed. For more information on the `prior` and `conditional` methods, see their docstrings. Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. cov_func : 2D array-like, or Covariance, default ~pymc.gp.cov.Constant The covariance function. Examples -------- .. code:: python # A one dimensional column vector of inputs. X = np.linspace(0, 1, 10)[:, None] with pm.Model() as model: # Specify the covariance function. cov_func = pm.gp.cov.ExpQuad(1, ls=0.1) # Specify the GP. The default mean function is `Zero`. gp = pm.gp.Latent(cov_func=cov_func) # Place a GP prior over the function f. f = gp.prior("f", X=X) ... # After fitting or sampling, specify the distribution # at new points with .conditional Xnew = np.linspace(-1, 2, 50)[:, None] with model: fcond = gp.conditional("fcond", Xnew=Xnew) """
    def __init__(self, *, mean_func=Zero(), cov_func=Constant(0.0)):
        super().__init__(mean_func=mean_func, cov_func=cov_func)

    def _build_prior(
        self, name, X, n_outputs=1, reparameterize=True, jitter=JITTER_DEFAULT, **kwargs
    ):
        mu = self.mean_func(X)
        cov = stabilize(self.cov_func(X), jitter)
        if reparameterize:
            if "dims" in kwargs:
                v = pm.Normal(
                    name + "_rotated_",
                    mu=0.0,
                    sigma=1.0,
                    **kwargs,
                )
            else:
                size = (n_outputs, X.shape[0]) if n_outputs > 1 else X.shape[0]
                v = pm.Normal(name + "_rotated_", mu=0.0, sigma=1.0, size=size, **kwargs)
            f = pm.Deterministic(
                name,
                mu + cholesky(cov).dot(v.T).transpose(),
                dims=kwargs.get("dims", None),
            )
        else:
            mu_stack = pt.stack([mu] * n_outputs, axis=0) if n_outputs > 1 else mu
            f = pm.MvNormal(name, mu=mu_stack, cov=cov, **kwargs)
        return f

    def prior(self, name, X, n_outputs=1, reparameterize=True, jitter=JITTER_DEFAULT, **kwargs):
        R"""
        Return the GP prior distribution evaluated over the input locations `X`.

        This is the prior probability over the space of functions described by its
        mean and covariance function.

        .. math::

           f \mid X \sim \text{MvNormal}\left( \mu(X), k(X, X') \right)

        Parameters
        ----------
        name : str
            Name of the random variable
        X : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        n_outputs : int, default 1
            Number of output GPs.  If you're using `dims`, make sure their size
            is equal to `(n_outputs, X.shape[0])`, i.e. the number of output GPs
            by the number of input points.
            Example: `gp.prior("f", X=X, n_outputs=3, dims=("n_gps", "x_dim"))`,
            where `len(n_gps) = 3` and `len(x_dim) = X.shape[0]`.
        reparameterize : bool, default True
            Reparameterize the distribution by rotating the random
            variable by the Cholesky factor of the covariance matrix.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        f = self._build_prior(name, X, n_outputs, reparameterize, jitter, **kwargs)
        self.X = X
        self.f = f
        self.n_outputs = n_outputs
        return f

    def _get_given_vals(self, given):
        if given is None:
            given = {}

        if "gp" in given:
            cov_total = given["gp"].cov_func
            mean_total = given["gp"].mean_func
        else:
            cov_total = self.cov_func
            mean_total = self.mean_func
        if all(val in given for val in ["X", "f"]):
            X, f = given["X"], given["f"]
        else:
            X, f = self.X, self.f
        return X, f, cov_total, mean_total

    def _build_conditional(self, Xnew, X, f, cov_total, mean_total, jitter):
        Kxx = cov_total(X)
        Kxs = self.cov_func(X, Xnew)
        L = cholesky(stabilize(Kxx, jitter))
        A = solve_lower(L, Kxs)
        v = solve_lower(L, (f - mean_total(X)).T)
        mu = self.mean_func(Xnew) + pt.dot(pt.transpose(A), v).T
        Kss = self.cov_func(Xnew)
        cov = Kss - pt.dot(pt.transpose(A), A)
        return mu, cov

    def conditional(self, name, Xnew, given=None, jitter=JITTER_DEFAULT, **kwargs):
        R"""
        Return the conditional distribution evaluated over new input locations `Xnew`.

        Given a set of function values `f` that the GP prior was over, the
        conditional distribution over a set of new points, `f_*` is

        .. math::

           f_* \mid f, X, X_* \sim \mathcal{GP}\left(
               K(X_*, X) K(X, X)^{-1} f \,,
               K(X_*, X_*) - K(X_*, X) K(X, X)^{-1} K(X, X_*) \right)

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        given : dict, optional
            Can take as key value pairs: `X`, `f`, and `gp`.
            See the :ref:`section <additive_gp>` in the documentation on additive GP
            models in pymc for more information.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        givens = self._get_given_vals(given)
        mu, cov = self._build_conditional(Xnew, *givens, jitter)
        f = pm.MvNormal(name, mu=mu, cov=cov, **kwargs)
        return f

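# A minimal usage sketch (not part of the original module): drawing several
# independent latent functions at once with `n_outputs`, as described in
# `Latent.prior` above.  The coordinate names "n_gps" and "x_dim" and the
# helper name are assumptions for this example only.
def _example_latent_multioutput():
    X = np.linspace(0, 1, 10)[:, None]
    coords = {"n_gps": np.arange(3), "x_dim": np.arange(X.shape[0])}
    with pm.Model(coords=coords) as model:
        cov_func = pm.gp.cov.ExpQuad(1, ls=0.1)
        gp = pm.gp.Latent(cov_func=cov_func)
        # f has shape (3, 10): three latent functions over the same inputs.
        f = gp.prior("f", X=X, n_outputs=3, dims=("n_gps", "x_dim"))
    return model
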
[docs] @conditioned_vars(["X", "f", "nu"]) class TP(Latent): r""" Student's T process prior. The usage is nearly identical to that of `gp.Latent`. The differences are that it must be initialized with a degrees of freedom parameter, and TP is not additive. Given a mean and covariance function, and a degrees of freedom parameter, the function :math:`f(x)` is modeled as, .. math:: f(X) \sim \mathcal{TP}\left( \mu(X), k(X, X'), \nu \right) Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. scale_func : 2D array-like, or Covariance, default ~pymc.gp.cov.Constant The covariance function. cov_func : 2D array-like, or Covariance, default None Deprecated, previous version of "scale_func" nu : float The degrees of freedom References ---------- - Shah, A., Wilson, A. G., and Ghahramani, Z. (2014). Student-t Processes as Alternatives to Gaussian Processes. arXiv preprint arXiv:1402.4306. """
    def __init__(self, *, mean_func=Zero(), scale_func=Constant(0.0), cov_func=None, nu=None):
        if nu is None:
            raise ValueError("Student's T process requires a degrees of freedom parameter, 'nu'")
        if cov_func is not None:
            warnings.warn(
                "Use the scale_func argument to specify the scale function. "
                "cov_func will be removed in future versions.",
                FutureWarning,
            )
            scale_func = cov_func
        self.nu = nu
        super().__init__(mean_func=mean_func, cov_func=scale_func)

    def __add__(self, other):
        """Add two Student's T processes."""
        raise TypeError("Student's T processes aren't additive")

    def _build_prior(self, name, X, reparameterize=True, jitter=JITTER_DEFAULT, **kwargs):
        mu = self.mean_func(X)
        cov = stabilize(self.cov_func(X), jitter)
        if reparameterize:
            size = np.shape(X)[0]
            v = pm.StudentT(name + "_rotated_", mu=0.0, sigma=1.0, nu=self.nu, size=size, **kwargs)
            f = pm.Deterministic(name, mu + cholesky(cov).dot(v), dims=kwargs.get("dims", None))
        else:
            f = pm.MvStudentT(name, nu=self.nu, mu=mu, scale=cov, **kwargs)
        return f

    def prior(self, name, X, reparameterize=True, jitter=JITTER_DEFAULT, **kwargs):
        R"""
        Return the TP prior distribution evaluated over the input locations `X`.

        This is the prior probability over the space of functions described by its
        mean and covariance function.

        Parameters
        ----------
        name : str
            Name of the random variable
        X : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        reparameterize : bool, default True
            Reparameterize the distribution by rotating the random
            variable by the Cholesky factor of the covariance matrix.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvStudentT`
            distribution constructor.
        """
        f = self._build_prior(name, X, reparameterize, jitter, **kwargs)
        self.X = X
        self.f = f
        return f

    def _build_conditional(self, Xnew, X, f, jitter):
        Kxx = self.cov_func(X)
        Kxs = self.cov_func(X, Xnew)
        Kss = self.cov_func(Xnew)
        L = cholesky(stabilize(Kxx, jitter))
        A = solve_lower(L, Kxs)
        cov = Kss - pt.dot(pt.transpose(A), A)
        v = solve_lower(L, f - self.mean_func(X))
        mu = self.mean_func(Xnew) + pt.dot(pt.transpose(A), v)
        beta = pt.dot(v, v)
        nu2 = self.nu + X.shape[0]
        covT = (self.nu + beta - 2) / (nu2 - 2) * cov
        return nu2, mu, covT

    def conditional(self, name, Xnew, jitter=JITTER_DEFAULT, **kwargs):
        R"""
        Return the conditional distribution evaluated over new input locations `Xnew`.

        Given a set of function values `f` that the TP prior was over, the
        conditional distribution over a set of new points, `f_*`, is a multivariate
        Student's T with `nu + len(X)` degrees of freedom and a rescaled covariance.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvStudentT`
            distribution constructor.
        """
        X = self.X
        f = self.f
        nu2, mu, cov = self._build_conditional(Xnew, X, f, jitter)
        return pm.MvStudentT(name, nu=nu2, mu=mu, scale=cov, **kwargs)

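# An illustrative sketch (not part of the original module): `TP` is used like
# `gp.Latent`, but requires the degrees of freedom `nu` and takes its kernel
# through `scale_func`.  The variable names are assumptions for this example.
def _example_tp_usage():
    X = np.linspace(0, 1, 10)[:, None]
    Xnew = np.linspace(-1, 2, 50)[:, None]
    with pm.Model() as model:
        scale_func = pm.gp.cov.ExpQuad(1, ls=0.1)
        tp = pm.gp.TP(scale_func=scale_func, nu=4)
        f = tp.prior("f", X=X)
        # The conditional over new points is an MvStudentT with nu + len(X)
        # degrees of freedom.
        fcond = tp.conditional("fcond", Xnew=Xnew)
    return model
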
[docs] @conditioned_vars(["X", "y", "sigma"]) class Marginal(Base): R""" Marginal Gaussian process. The `gp.Marginal` class is an implementation of the sum of a GP prior and additive noise. It has `marginal_likelihood`, `conditional` and `predict` methods. This GP implementation can be used to implement regression on data that is normally distributed. For more information on the `marginal_likelihood`, `conditional` and `predict` methods, see their docstrings. Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. cov_func : 2D array-like, or Covariance, default ~pymc.gp.cov.Constant The covariance function. Examples -------- .. code:: python # A one dimensional column vector of inputs. X = np.linspace(0, 1, 10)[:, None] with pm.Model() as model: # Specify the covariance function. cov_func = pm.gp.cov.ExpQuad(1, ls=0.1) # Specify the GP. The default mean function is `Zero`. gp = pm.gp.Marginal(cov_func=cov_func) # Place a GP prior over the function f. sigma = pm.HalfCauchy("sigma", beta=3) y_ = gp.marginal_likelihood("y", X=X, y=y, sigma=sigma) ... # After fitting or sampling, specify the distribution # at new points with .conditional Xnew = np.linspace(-1, 2, 50)[:, None] with model: fcond = gp.conditional("fcond", Xnew=Xnew) """ def _build_marginal_likelihood(self, X, noise_func, jitter): mu = self.mean_func(X) Kxx = self.cov_func(X) Knx = noise_func(X) cov = Kxx + Knx return mu, stabilize(cov, jitter)
    def marginal_likelihood(
        self,
        name,
        X,
        y,
        sigma=None,
        noise=None,
        jitter=JITTER_DEFAULT,
        is_observed=True,
        **kwargs,
    ):
        R"""
        Return the marginal likelihood distribution, given the input locations `X`
        and the data `y`.

        This is the integral over the product of the GP prior and a normal likelihood.

        .. math::

           y \mid X, \theta \sim \int p(y \mid f,\, X,\, \theta) \, p(f \mid X,\, \theta) \, df

        Parameters
        ----------
        name : str
            Name of the random variable
        X : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        y : array-like
            Data that is the sum of the function with the GP prior and Gaussian
            noise.  Must have shape `(n, )`.
        sigma : float, Variable, or Covariance, default ~pymc.gp.cov.WhiteNoise
            Standard deviation of the Gaussian noise.
            Can also be a Covariance for non-white noise.
        noise : float, Variable, or Covariance, optional
            Deprecated. Previous parameterization of `sigma`.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        is_observed : bool, default True
            Deprecated. Whether to set `y` as an `observed` variable in the `model`.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        sigma = _handle_sigma_noise_parameters(sigma=sigma, noise=noise)

        noise_func = sigma if isinstance(sigma, BaseCovariance) else pm.gp.cov.WhiteNoise(sigma)
        mu, cov = self._build_marginal_likelihood(X=X, noise_func=noise_func, jitter=jitter)
        self.X = X
        self.y = y
        self.sigma = noise_func
        if is_observed:
            return pm.MvNormal(name, mu=mu, cov=cov, observed=y, **kwargs)
        else:
            warnings.warn(
                "The 'is_observed' argument has been deprecated.  If the GP is "
                "unobserved use gp.Latent instead.",
                FutureWarning,
            )
            return pm.MvNormal(name, mu=mu, cov=cov, **kwargs)

    def _get_given_vals(self, given):
        if given is None:
            given = {}

        if "gp" in given:
            cov_total = given["gp"].cov_func
            mean_total = given["gp"].mean_func
        else:
            cov_total = self.cov_func
            mean_total = self.mean_func

        if "noise" in given:
            warnings.warn(_noise_deprecation_warning, FutureWarning)
            given["sigma"] = given["noise"]

        if all(val in given for val in ["X", "y", "sigma"]):
            X, y, sigma = given["X"], given["y"], given["sigma"]
            noise_func = sigma if isinstance(sigma, BaseCovariance) else pm.gp.cov.WhiteNoise(sigma)
        else:
            X, y, noise_func = self.X, self.y, self.sigma
        return X, y, noise_func, cov_total, mean_total

    def _build_conditional(
        self, Xnew, pred_noise, diag, X, y, noise_func, cov_total, mean_total, jitter
    ):
        Kxx = cov_total(X)
        Kxs = self.cov_func(X, Xnew)
        Knx = noise_func(X)
        rxx = y - mean_total(X)
        L = cholesky(stabilize(Kxx, jitter) + Knx)
        A = solve_lower(L, Kxs)
        v = solve_lower(L, rxx.T)
        mu = self.mean_func(Xnew) + pt.dot(pt.transpose(A), v).T
        if diag:
            Kss = self.cov_func(Xnew, diag=True)
            var = Kss - pt.sum(pt.square(A), 0)
            if pred_noise:
                var += noise_func(Xnew, diag=True)
            return mu, var
        else:
            Kss = self.cov_func(Xnew)
            cov = Kss - pt.dot(pt.transpose(A), A)
            if pred_noise:
                cov += noise_func(Xnew)
            return mu, cov if pred_noise else stabilize(cov, jitter)

    def conditional(
        self, name, Xnew, pred_noise=False, given=None, jitter=JITTER_DEFAULT, **kwargs
    ):
        R"""
        Return the conditional distribution evaluated over new input locations `Xnew`.

        Given a set of function values `f` that the GP prior was over, the
        conditional distribution over a set of new points, `f_*` is:

        .. math::

           f_* \mid f, X, X_* \sim \mathcal{GP}\left(
               K(X_*, X) [K(X, X) + K_{n}(X, X)]^{-1} f \,,
               K(X_*, X_*) - K(X_*, X) [K(X, X) + K_{n}(X, X)]^{-1} K(X, X_*) \right)

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        given : dict, optional
            Can take key value pairs: `X`, `y`, `sigma`, and `gp`.
            See the :ref:`section <additive_gp>` in the documentation on additive GP
            models in pymc for more information.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        givens = self._get_given_vals(given)
        mu, cov = self._build_conditional(Xnew, pred_noise, False, *givens, jitter)
        return pm.MvNormal(name, mu=mu, cov=cov, **kwargs)

    def predict(
        self,
        Xnew,
        point=None,
        diag=False,
        pred_noise=False,
        given=None,
        jitter=JITTER_DEFAULT,
        model=None,
    ):
        R"""
        Return mean and covariance of the conditional distribution given a `point`.

        The `point` might be the MAP estimate or a sample from a trace.

        Parameters
        ----------
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        point : pymc.Point, optional
            A specific point to condition on.
        diag : bool, default False
            If `True`, return the diagonal instead of the full covariance
            matrix.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        given : dict, optional
            Can take key value pairs: `X`, `y`, `sigma`, and `gp`.
            See the :ref:`section <additive_gp>` in the documentation on additive GP
            models in pymc for more information.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        model : Model, optional
            Model with the Gaussian Process component for which predictions will
            be generated.  It is optional when inside a with context, otherwise
            it is required.
        """
        if given is None:
            given = {}
        mu, cov = self._predict_at(Xnew, diag, pred_noise, given, jitter)
        return replace_with_values([mu, cov], replacements=point, model=model)

    def _predict_at(self, Xnew, diag=False, pred_noise=False, given=None, jitter=JITTER_DEFAULT):
        R"""
        Return symbolic mean and covariance of the conditional distribution.

        Parameters
        ----------
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        diag : bool, default False
            If `True`, return the diagonal instead of the full covariance
            matrix.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        given : dict, optional
            Can take key value pairs: `X`, `y`, `sigma`, and `gp`.
            See the :ref:`section <additive_gp>` in the documentation on additive GP
            models in pymc for more information.
        """
        givens = self._get_given_vals(given)
        mu, cov = self._build_conditional(Xnew, pred_noise, diag, *givens, jitter)
        return mu, cov

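# A minimal sketch (not part of the original module) of the fit-then-predict
# workflow described in `Marginal.predict`: condition on data with
# `marginal_likelihood`, find a point estimate, then evaluate the predictive
# mean and pointwise variance at new inputs.  The synthetic data and all
# variable names are assumptions for this example only.
def _example_marginal_predict():
    rng = np.random.default_rng(0)
    X = np.linspace(0, 1, 20)[:, None]
    y = np.sin(3 * X).ravel() + 0.1 * rng.standard_normal(20)
    Xnew = np.linspace(0, 1.5, 50)[:, None]
    with pm.Model():
        ls = pm.Gamma("ls", alpha=2, beta=2)
        cov_func = pm.gp.cov.ExpQuad(1, ls=ls)
        gp = pm.gp.Marginal(cov_func=cov_func)
        sigma = pm.HalfNormal("sigma", sigma=1)
        gp.marginal_likelihood("y", X=X, y=y, sigma=sigma)
        mp = pm.find_MAP()
        # Predictive mean and pointwise variance at Xnew, including the
        # observation noise.
        mu, var = gp.predict(Xnew, point=mp, diag=True, pred_noise=True)
    return mu, var
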
[docs] @conditioned_vars(["X", "Xu", "y", "sigma"]) class MarginalApprox(Marginal): R""" Approximate marginal Gaussian process. The `gp.MarginalApprox` class is an implementation of the sum of a GP prior and additive noise. It has `marginal_likelihood`, `conditional` and `predict` methods. This GP implementation can be used to implement regression on data that is normally distributed. The available approximations are: - DTC: Deterministic Training Conditional - FITC: Fully independent Training Conditional - VFE: Variational Free Energy Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. cov_func : 2D array-like, or Covariance, default ~pymc.gp.cov.Constant The covariance function. approx : str, default 'VFE' The approximation to use. Must be one of `VFE`, `FITC` or `DTC`. Examples -------- .. code:: python # A one dimensional column vector of inputs. X = np.linspace(0, 1, 10)[:, None] # A smaller set of inducing inputs Xu = np.linspace(0, 1, 5)[:, None] with pm.Model() as model: # Specify the covariance function. cov_func = pm.gp.cov.ExpQuad(1, ls=0.1) # Specify the GP. The default mean function is `Zero`. gp = pm.gp.MarginalApprox(cov_func=cov_func, approx="FITC") # Place a GP prior over the function f. sigma = pm.HalfCauchy("sigma", beta=3) y_ = gp.marginal_likelihood("y", X=X, Xu=Xu, y=y, sigma=sigma) ... # After fitting or sampling, specify the distribution # at new points with .conditional Xnew = np.linspace(-1, 2, 50)[:, None] with model: fcond = gp.conditional("fcond", Xnew=Xnew) References ---------- - Quinonero-Candela, J., and Rasmussen, C. (2005). A Unifying View of Sparse Approximate Gaussian Process Regression. - Titsias, M. (2009). Variational Learning of Inducing Variables in Sparse Gaussian Processes. - Bauer, M., van der Wilk, M., and Rasmussen, C. E. (2016). Understanding Probabilistic Sparse Gaussian Process Approximations. """ _available_approx = ("FITC", "VFE", "DTC")
[docs] def __init__(self, approx="VFE", *, mean_func=Zero(), cov_func=Constant(0.0)): if approx not in self._available_approx: raise NotImplementedError(approx) self.approx = approx super().__init__(mean_func=mean_func, cov_func=cov_func)
    def __add__(self, other):
        """Add two Gaussian processes."""
        new_gp = super().__add__(other)
        if not self.approx == other.approx:
            raise TypeError("Cannot add GPs with different approximations")
        new_gp.approx = self.approx
        return new_gp

    def _build_marginal_likelihood_loglik(self, y, X, Xu, sigma, jitter):
        sigma2 = pt.square(sigma)
        Kuu = self.cov_func(Xu)
        Kuf = self.cov_func(Xu, X)
        Luu = cholesky(stabilize(Kuu, jitter))
        A = solve_lower(Luu, Kuf)
        Qffd = pt.sum(A * A, 0)
        if self.approx == "FITC":
            Kffd = self.cov_func(X, diag=True)
            Lamd = pt.clip(Kffd - Qffd, 0.0, np.inf) + sigma2
            trace = 0.0
        elif self.approx == "VFE":
            Lamd = pt.ones_like(Qffd) * sigma2
            trace = (1.0 / (2.0 * sigma2)) * (
                pt.sum(self.cov_func(X, diag=True)) - pt.sum(pt.sum(A * A, 0))
            )
        else:  # DTC
            Lamd = pt.ones_like(Qffd) * sigma2
            trace = 0.0
        A_l = A / Lamd
        L_B = cholesky(pt.eye(Xu.shape[0]) + pt.dot(A_l, pt.transpose(A)))
        r = y - self.mean_func(X)
        r_l = r / Lamd
        c = solve_lower(L_B, pt.dot(A, r_l))
        constant = 0.5 * X.shape[0] * pt.log(2.0 * np.pi)
        logdet = 0.5 * pt.sum(pt.log(Lamd)) + pt.sum(pt.log(pt.diag(L_B)))
        quadratic = 0.5 * (pt.dot(r, r_l) - pt.dot(c, c))
        return -1.0 * (constant + logdet + quadratic + trace)

    def marginal_likelihood(
        self, name, X, Xu, y, sigma=None, noise=None, jitter=JITTER_DEFAULT, **kwargs
    ):
        R"""
        Return the approximate marginal likelihood distribution.

        This is given the input locations `X`, inducing point locations `Xu`,
        data `y`, and white noise standard deviations `sigma`.

        Parameters
        ----------
        name : str
            Name of the random variable
        X : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        Xu : array-like
            The inducing points.  Must have the same number of columns as `X`.
        y : array-like
            Data that is the sum of the function with the GP prior and Gaussian
            noise.  Must have shape `(n, )`.
        sigma : float, Variable
            Standard deviation of the Gaussian noise.
        noise : float, Variable, optional
            Deprecated. Previous parameterization of `sigma`.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        self.X = X
        self.Xu = Xu
        self.y = y

        self.sigma = _handle_sigma_noise_parameters(sigma=sigma, noise=noise)

        approx_loglik = self._build_marginal_likelihood_loglik(
            y=self.y, X=self.X, Xu=self.Xu, sigma=self.sigma, jitter=jitter
        )
        pm.Potential(f"marginalapprox_loglik_{name}", approx_loglik, **kwargs)

    def _build_conditional(
        self, Xnew, pred_noise, diag, X, Xu, y, sigma, cov_total, mean_total, jitter
    ):
        sigma2 = pt.square(sigma)
        Kuu = cov_total(Xu)
        Kuf = cov_total(Xu, X)
        Luu = cholesky(stabilize(Kuu, jitter))
        A = solve_lower(Luu, Kuf)
        Qffd = pt.sum(A * A, 0)
        if self.approx == "FITC":
            Kffd = cov_total(X, diag=True)
            Lamd = pt.clip(Kffd - Qffd, 0.0, np.inf) + sigma2
        else:  # VFE or DTC
            Lamd = pt.ones_like(Qffd) * sigma2
        A_l = A / Lamd
        L_B = cholesky(pt.eye(Xu.shape[0]) + pt.dot(A_l, pt.transpose(A)))
        r = y - mean_total(X)
        r_l = r / Lamd
        c = solve_lower(L_B, pt.dot(A, r_l))
        Kus = self.cov_func(Xu, Xnew)
        As = solve_lower(Luu, Kus)
        mu = self.mean_func(Xnew) + pt.dot(pt.transpose(As), solve_upper(pt.transpose(L_B), c))
        C = solve_lower(L_B, As)
        if diag:
            Kss = self.cov_func(Xnew, diag=True)
            var = Kss - pt.sum(pt.square(As), 0) + pt.sum(pt.square(C), 0)
            if pred_noise:
                var += sigma2
            return mu, var
        else:
            cov = self.cov_func(Xnew) - pt.dot(pt.transpose(As), As) + pt.dot(pt.transpose(C), C)
            if pred_noise:
                cov += sigma2 * pt.identity_like(cov)
            return mu, cov if pred_noise else stabilize(cov, jitter)

    def _get_given_vals(self, given):
        if given is None:
            given = {}
        if "gp" in given:
            cov_total = given["gp"].cov_func
            mean_total = given["gp"].mean_func
        else:
            cov_total = self.cov_func
            mean_total = self.mean_func
        if all(val in given for val in ["X", "Xu", "y", "sigma"]):
            X, Xu, y, sigma = given["X"], given["Xu"], given["y"], given["sigma"]
        else:
            X, Xu, y, sigma = self.X, self.Xu, self.y, self.sigma
        return X, Xu, y, sigma, cov_total, mean_total

    def conditional(
        self, name, Xnew, pred_noise=False, given=None, jitter=JITTER_DEFAULT, **kwargs
    ):
        R"""
        Return the approximate conditional distribution of the GP evaluated over
        new input locations `Xnew`.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        given : dict, optional
            Can take key value pairs: `X`, `Xu`, `y`, `sigma`, and `gp`.
            See the :ref:`section <additive_gp>` in the documentation on additive GP
            models in pymc for more information.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        givens = self._get_given_vals(given)
        mu, cov = self._build_conditional(Xnew, pred_noise, False, *givens, jitter)
        return pm.MvNormal(name, mu=mu, cov=cov, **kwargs)

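# A sketch (not part of the original module): the sparse-approximation workflow
# from the `MarginalApprox` docstring, with synthetic data and a coarse subset
# of `X` used as the inducing points `Xu`.  All variable names here are
# assumptions for the example only.
def _example_marginal_approx():
    rng = np.random.default_rng(1)
    X = np.linspace(0, 2, 200)[:, None]
    y = np.sin(2 * X).ravel() + 0.2 * rng.standard_normal(200)
    Xu = X[::10].copy()  # 20 inducing points taken from the input grid
    with pm.Model() as model:
        cov_func = pm.gp.cov.ExpQuad(1, ls=0.2)
        gp = pm.gp.MarginalApprox(approx="FITC", cov_func=cov_func)
        sigma = pm.HalfNormal("sigma", sigma=1)
        # Adds a Potential holding the approximate log marginal likelihood.
        gp.marginal_likelihood("y", X=X, Xu=Xu, y=y, sigma=sigma)
    return model
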
@conditioned_vars(["X", "Xu", "y", "sigma"]) class MarginalSparse(MarginalApprox): def __init__(self, approx="VFE", *, mean_func=Zero(), cov_func=Constant(0.0)): warnings.warn( "gp.MarginalSparse has been renamed to gp.MarginalApprox.", FutureWarning, ) super().__init__(mean_func=mean_func, cov_func=cov_func, approx=approx)
[docs] @conditioned_vars(["Xs", "f"]) class LatentKron(Base): R""" Latent Gaussian process whose covariance is a tensor product kernel. The `gp.LatentKron` class is a direct implementation of a GP with a Kronecker structured covariance, without reference to any noise or specific likelihood. The GP is constructed with the `prior` method, and the conditional GP over new input locations is constructed with the `conditional` method. For more information on these methods, see their docstrings. This GP implementation can be used to model a Gaussian process whose inputs cover evenly spaced grids on more than one dimension. `LatentKron` relies on the `KroneckerNormal` distribution, see its docstring for more information. Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. cov_funcs : list of Covariance, default [~pymc.gp.cov.Constant] The covariance functions that compose the tensor (Kronecker) product. Examples -------- .. code:: python # One dimensional column vectors of inputs X1 = np.linspace(0, 1, 10)[:, None] X2 = np.linspace(0, 2, 5)[:, None] Xs = [X1, X2] with pm.Model() as model: # Specify the covariance functions for each Xi cov_func1 = pm.gp.cov.ExpQuad(1, ls=0.1) # Must accept X1 without error cov_func2 = pm.gp.cov.ExpQuad(1, ls=0.3) # Must accept X2 without error # Specify the GP. The default mean function is `Zero`. gp = pm.gp.LatentKron(cov_funcs=[cov_func1, cov_func2]) # ... # After fitting or sampling, specify the distribution # at new points with .conditional # Xnew need not be on a full grid Xnew1 = np.linspace(-1, 2, 10)[:, None] Xnew2 = np.linspace(0, 3, 10)[:, None] Xnew = np.concatenate((Xnew1, Xnew2), axis=1) # Not full grid, works Xnew = pm.math.cartesian(Xnew1, Xnew2) # Full grid, also works with model: fcond = gp.conditional("fcond", Xnew=Xnew) """
    def __init__(self, *, mean_func=Zero(), cov_funcs=(Constant(0.0))):
        try:
            self.cov_funcs = list(cov_funcs)
        except TypeError:
            self.cov_funcs = [cov_funcs]
        cov_func = pm.gp.cov.Kron(self.cov_funcs)
        super().__init__(mean_func=mean_func, cov_func=cov_func)

    def __add__(self, other):
        """Add two Gaussian processes."""
        raise TypeError("Additive, Kronecker-structured processes not implemented")

    def _build_prior(self, name, Xs, jitter, **kwargs):
        self.N = int(np.prod([len(X) for X in Xs]))
        mu = self.mean_func(cartesian(*Xs))
        chols = [cholesky(stabilize(cov(X), jitter)) for cov, X in zip(self.cov_funcs, Xs)]
        v = pm.Normal(name + "_rotated_", mu=0.0, sigma=1.0, size=self.N, **kwargs)
        f = pm.Deterministic(name, mu + pt.flatten(kron_dot(chols, v)))
        return f

    def prior(self, name, Xs, jitter=JITTER_DEFAULT, **kwargs):
        """
        Return the prior distribution evaluated over the input locations `Xs`.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xs : list of array-like
            Function input values for each covariance function.  Each entry
            must be passable to its respective covariance without error.  The
            total covariance function is measured on the full grid
            `cartesian(*Xs)`.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the
            :class:`~pymc.KroneckerNormal` distribution constructor.
        """
        if len(Xs) != len(self.cov_funcs):
            raise ValueError("Must provide a covariance function for each X")
        f = self._build_prior(name, Xs, jitter, **kwargs)
        self.Xs = Xs
        self.f = f
        return f

    def _build_conditional(self, Xnew, jitter):
        Xs, f = self.Xs, self.f
        X = cartesian(*Xs)
        delta = f - self.mean_func(X)
        covs = [stabilize(cov(Xi), jitter) for cov, Xi in zip(self.cov_funcs, Xs)]
        chols = [cholesky(cov) for cov in covs]
        cholTs = [pt.transpose(chol) for chol in chols]
        Kss = self.cov_func(Xnew)
        Kxs = self.cov_func(X, Xnew)
        Ksx = pt.transpose(Kxs)
        alpha = kron_solve_lower(chols, delta)
        alpha = kron_solve_upper(cholTs, alpha)
        mu = pt.dot(Ksx, alpha).ravel() + self.mean_func(Xnew)
        A = kron_solve_lower(chols, Kxs)
        cov = stabilize(Kss - pt.dot(pt.transpose(A), A), jitter)
        return mu, cov

    def conditional(self, name, Xnew, jitter=JITTER_DEFAULT, **kwargs):
        """
        Return the conditional distribution evaluated over new input locations `Xnew`.

        `Xnew` will be split by columns and fed to the relevant covariance
        functions based on their `input_dim`.  For example, if `cov_func1`,
        `cov_func2`, and `cov_func3` have `input_dim` of 2, 1, and 4,
        respectively, then `Xnew` must have 7 columns and the covariance
        between the prediction points is

        .. code:: python

            cov_func(Xnew) = cov_func1(Xnew[:, :2]) * cov_func2(Xnew[:, 2:3]) * cov_func3(Xnew[:, 3:])

        The distribution returned by `conditional` does not have a
        Kronecker structure regardless of whether the input points lie on a
        full grid.  Therefore, `Xnew` does not need to have grid structure.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        jitter : float, default 1e-6
            A small correction added to the diagonal of positive semi-definite
            covariance matrices to ensure numerical stability.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        mu, cov = self._build_conditional(Xnew, jitter)
        return pm.MvNormal(name, mu=mu, cov=cov, **kwargs)

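# Illustrative sketch (not part of the original module): the Kronecker GPs
# assume the training inputs form the full grid `cartesian(*Xs)`, so a prior
# drawn with `LatentKron` has one value per grid point.  Names are assumptions
# for this example only.
def _example_latent_kron():
    X1 = np.linspace(0, 1, 10)[:, None]
    X2 = np.linspace(0, 2, 5)[:, None]
    with pm.Model() as model:
        cov1 = pm.gp.cov.ExpQuad(1, ls=0.1)
        cov2 = pm.gp.cov.ExpQuad(1, ls=0.3)
        gp = pm.gp.LatentKron(cov_funcs=[cov1, cov2])
        # f is indexed like cartesian(X1, X2); it has 10 * 5 = 50 entries.
        f = gp.prior("f", Xs=[X1, X2])
    return model
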
[docs] @conditioned_vars(["Xs", "y", "sigma"]) class MarginalKron(Base): R""" Marginal Gaussian process whose covariance is a tensor product kernel. The `gp.MarginalKron` class is an implementation of the sum of a Kronecker GP prior and additive white noise. It has `marginal_likelihood`, `conditional` and `predict` methods. This GP implementation can be used to efficiently implement regression on data that are normally distributed with a tensor product kernel and are measured on a full grid of inputs: `cartesian(*Xs)`. `MarginalKron` is based on the `KroneckerNormal` distribution, see its docstring for more information. For more information on the `marginal_likelihood`, `conditional` and `predict` methods, see their docstrings. Parameters ---------- mean_func : Mean, default ~pymc.gp.mean.Zero The mean function. cov_funcs : list of Covariance, default [~pymc.gp.cov.Constant] The covariance functions that compose the tensor (Kronecker) product. Examples -------- .. code:: python # One dimensional column vectors of inputs X1 = np.linspace(0, 1, 10)[:, None] X2 = np.linspace(0, 2, 5)[:, None] Xs = [X1, X2] y = np.random.randn(len(X1)*len(X2)) # toy data with pm.Model() as model: # Specify the covariance functions for each Xi cov_func1 = pm.gp.cov.ExpQuad(1, ls=0.1) # Must accept X1 without error cov_func2 = pm.gp.cov.ExpQuad(1, ls=0.3) # Must accept X2 without error # Specify the GP. The default mean function is `Zero`. gp = pm.gp.MarginalKron(cov_funcs=[cov_func1, cov_func2]) # Place a GP prior over the function f. sigma = pm.HalfCauchy("sigma", beta=3) y_ = gp.marginal_likelihood("y", Xs=Xs, y=y, sigma=sigma) # ... # After fitting or sampling, specify the distribution # at new points with .conditional # Xnew need not be on a full grid Xnew1 = np.linspace(-1, 2, 10)[:, None] Xnew2 = np.linspace(0, 3, 10)[:, None] Xnew = np.concatenate((Xnew1, Xnew2), axis=1) # Not full grid, works Xnew = pm.math.cartesian(Xnew1, Xnew2) # Full grid, also works with model: fcond = gp.conditional("fcond", Xnew=Xnew) """
    def __init__(self, *, mean_func=Zero(), cov_funcs=(Constant(0.0))):
        try:
            self.cov_funcs = list(cov_funcs)
        except TypeError:
            self.cov_funcs = [cov_funcs]
        cov_func = pm.gp.cov.Kron(self.cov_funcs)
        super().__init__(mean_func=mean_func, cov_func=cov_func)

    def __add__(self, other):
        """Add two Gaussian processes."""
        raise TypeError("Additive, Kronecker-structured processes not implemented")

    def _build_marginal_likelihood(self, Xs):
        self.X = cartesian(*Xs)
        mu = self.mean_func(self.X)
        covs = [f(X) for f, X in zip(self.cov_funcs, Xs)]
        return mu, covs

    def _check_inputs(self, Xs, y):
        N = int(np.prod([len(X) for X in Xs]))
        if len(Xs) != len(self.cov_funcs):
            raise ValueError("Must provide a covariance function for each X")
        if N != len(y):
            raise ValueError(
                f"Length of y ({len(y)}) must match length of cartesian product of Xs ({N})"
            )

    def marginal_likelihood(self, name, Xs, y, sigma, is_observed=True, **kwargs):
        """
        Return the marginal likelihood distribution, given the input locations
        `cartesian(*Xs)` and the data `y`.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xs : list of array-like
            Function input values for each covariance function.  Each entry
            must be passable to its respective covariance without error.  The
            total covariance function is measured on the full grid
            `cartesian(*Xs)`.
        y : array-like
            Data that is the sum of the function with the GP prior and Gaussian
            noise.  Must have shape `(n, )`.
        sigma : float, Variable
            Standard deviation of the white Gaussian noise.
        is_observed : bool, default True
            Deprecated. Whether to set `y` as an `observed` variable in the `model`.
        **kwargs
            Extra keyword arguments that are passed to the
            :class:`~pymc.KroneckerNormal` distribution constructor.
        """
        self._check_inputs(Xs, y)
        mu, covs = self._build_marginal_likelihood(Xs)
        self.Xs = Xs
        self.y = y
        self.sigma = sigma
        if is_observed:
            return pm.KroneckerNormal(name, mu=mu, covs=covs, sigma=sigma, observed=y, **kwargs)
        else:
            warnings.warn(
                "The 'is_observed' argument has been deprecated.  If the GP is "
                "unobserved use gp.LatentKron instead.",
                FutureWarning,
            )
            size = int(np.prod([len(X) for X in Xs]))
            return pm.KroneckerNormal(name, mu=mu, covs=covs, sigma=sigma, size=size, **kwargs)

    def _build_conditional(self, Xnew, diag, pred_noise):
        Xs, y, sigma = self.Xs, self.y, self.sigma

        # Old points
        X = cartesian(*Xs)
        delta = y - self.mean_func(X)
        Kns = [f(x) for f, x in zip(self.cov_funcs, Xs)]
        eigs_sep, Qs = zip(*map(eigh, Kns))  # Unzip
        QTs = list(map(pt.transpose, Qs))
        eigs = kron_diag(*eigs_sep)  # Combine separate eigs
        if sigma is not None:
            eigs += sigma**2

        Km = self.cov_func(Xnew, diag=diag)
        Knm = self.cov_func(X, Xnew)
        Kmn = Knm.T

        # Build conditional mu
        alpha = kron_dot(QTs, delta)
        alpha = alpha / eigs[:, None]
        alpha = kron_dot(Qs, alpha)
        mu = pt.dot(Kmn, alpha).ravel() + self.mean_func(Xnew)

        # Build conditional cov
        A = kron_dot(QTs, Knm)
        A = A / pt.sqrt(eigs[:, None])
        if diag:
            Asq = pt.sum(pt.square(A), 0)
            cov = Km - Asq
            if pred_noise:
                cov += sigma
        else:
            Asq = pt.dot(A.T, A)
            cov = Km - Asq
            if pred_noise:
                cov += sigma * pt.identity_like(cov)
        return mu, cov

    def conditional(self, name, Xnew, pred_noise=False, diag=False, **kwargs):
        """
        Return the conditional distribution evaluated over new input locations
        `Xnew`, just as in `Marginal`.

        `Xnew` will be split by columns and fed to the relevant covariance
        functions based on their `input_dim`.  For example, if `cov_func1`,
        `cov_func2`, and `cov_func3` have `input_dim` of 2, 1, and 4,
        respectively, then `Xnew` must have 7 columns and the covariance
        between the prediction points is

        .. code:: python

            cov_func(Xnew) = cov_func1(Xnew[:, :2]) * cov_func2(Xnew[:, 2:3]) * cov_func3(Xnew[:, 3:])

        The distribution returned by `conditional` does not have a
        Kronecker structure regardless of whether the input points lie on a
        full grid.  Therefore, `Xnew` does not need to have grid structure.

        Parameters
        ----------
        name : str
            Name of the random variable
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        **kwargs
            Extra keyword arguments that are passed to the :class:`~pymc.MvNormal`
            distribution constructor.
        """
        mu, cov = self._build_conditional(Xnew, diag, pred_noise)
        return pm.MvNormal(name, mu=mu, cov=cov, **kwargs)

    def predict(self, Xnew, point=None, diag=False, pred_noise=False, model=None):
        R"""
        Return mean and covariance of the conditional distribution given a `point`.

        The `point` might be the MAP estimate or a sample from a trace.

        Parameters
        ----------
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        point : pymc.Point, optional
            A specific point to condition on.
        diag : bool, default False
            If `True`, return the diagonal instead of the full covariance
            matrix.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        model : Model, optional
            Model with the Gaussian Process component for which predictions will
            be generated.  It is optional when inside a with context, otherwise
            it is required.
        """
        mu, cov = self._predict_at(Xnew, diag, pred_noise)
        return replace_with_values([mu, cov], replacements=point, model=model)

    def _predict_at(self, Xnew, diag=False, pred_noise=False):
        R"""
        Return symbolic mean and covariance of the conditional distribution.

        Parameters
        ----------
        Xnew : array-like
            Function input values.  If one-dimensional, must be a column
            vector with shape `(n, 1)`.
        diag : bool, default False
            If `True`, return the diagonal instead of the full covariance
            matrix.
        pred_noise : bool, default False
            Whether or not observation noise is included in the conditional.
        """
        mu, cov = self._build_conditional(Xnew, diag, pred_noise)
        return mu, cov

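# A closing sketch (not part of the original module): fitting `MarginalKron`
# on a full grid and predicting at off-grid points with `predict`, which
# accepts a point such as a MAP estimate.  The synthetic data and all names
# are assumptions for this example only.
def _example_marginal_kron_predict():
    rng = np.random.default_rng(2)
    X1 = np.linspace(0, 1, 10)[:, None]
    X2 = np.linspace(0, 2, 5)[:, None]
    y = rng.standard_normal(10 * 5)
    Xnew = np.concatenate(
        (np.linspace(-0.5, 1.5, 8)[:, None], np.linspace(0, 2, 8)[:, None]), axis=1
    )
    with pm.Model():
        cov1 = pm.gp.cov.ExpQuad(1, ls=0.1)
        cov2 = pm.gp.cov.ExpQuad(1, ls=0.3)
        gp = pm.gp.MarginalKron(cov_funcs=[cov1, cov2])
        sigma = pm.HalfNormal("sigma", sigma=1)
        gp.marginal_likelihood("y", Xs=[X1, X2], y=y, sigma=sigma)
        mp = pm.find_MAP()
        # Predictive mean and pointwise variance at the off-grid points.
        mu, var = gp.predict(Xnew, point=mp, diag=True)
    return mu, var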