# Copyright 2023 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numbers
import warnings
from collections import Counter
from functools import reduce
from operator import add, mul
from typing import Optional, Sequence
import numpy as np
import pytensor.tensor as pt
from pytensor.graph.basic import Variable
from pytensor.tensor.sharedvar import TensorSharedVariable
from pytensor.tensor.var import TensorConstant, TensorVariable
__all__ = [
"Constant",
"WhiteNoise",
"ExpQuad",
"RatQuad",
"Exponential",
"Matern52",
"Matern32",
"Matern12",
"Linear",
"Polynomial",
"Cosine",
"Periodic",
"WarpedInput",
"Gibbs",
"Coregion",
"ScaledCov",
"Kron",
]
class BaseCovariance:
"""
Base class for kernels/covariance functions.
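
    Covariance objects support ``+``, ``*``, and ``**`` (exponentiation by a
    scalar); these operators are overloaded below. A minimal sketch, assuming
    the kernels defined later in this module::

        import pymc as pm

        # the trailing scalar is cast to a `Constant` covariance
        cov = pm.gp.cov.ExpQuad(1, ls=0.1) + pm.gp.cov.WhiteNoise(1e-6) + 1.0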
"""
def __call__(self, X, Xs=None, diag=False):
r"""
Evaluate the kernel/covariance function.
Parameters
----------
        X: The training inputs to the kernel.
        Xs: The optional prediction set of inputs to the kernel.
            If Xs is None, Xs = X.
diag: bool
Return only the diagonal of the covariance function.
Default is False.
"""
if diag:
return self.diag(X)
else:
return self.full(X, Xs)
def diag(self, X):
raise NotImplementedError
def full(self, X, Xs=None):
raise NotImplementedError
def __add__(self, other):
# If it's a scalar, cast as Constant covariance. This allows validation for power spectral
# density calc.
if isinstance(other, numbers.Real):
other = Constant(c=other)
return Add([self, other])
def __mul__(self, other):
return Prod([self, other])
def __radd__(self, other):
return self.__add__(other)
def __rmul__(self, other):
return self.__mul__(other)
def __pow__(self, other):
other = pt.as_tensor_variable(other).squeeze()
if not other.ndim == 0:
raise ValueError("A covariance function can only be exponentiated by a scalar value")
return Exponentiated(self, other)
def __array_wrap__(self, result):
"""
Required to allow radd/rmul by numpy arrays.
"""
result = np.squeeze(result)
if len(result.shape) <= 1:
result = result.reshape(1, 1)
elif len(result.shape) > 2:
raise ValueError(
f"cannot combine a covariance function with array of shape {result.shape}"
)
        nrows, ncols = result.shape
        A = np.zeros((nrows, ncols))
        for i in range(nrows):
            for j in range(ncols):
                entry = result[i, j]._factor_list[1]
                if isinstance(entry, Constant):
                    # Counteract the elemwise Add edgecase
                    entry = entry.c
                A[i, j] = entry
if isinstance(result[0][0], Add):
return result[0][0]._factor_list[0] + A
elif isinstance(result[0][0], Prod):
return result[0][0]._factor_list[0] * A
else:
raise TypeError(
f"Unknown Covariance combination type {result[0][0]}. "
"Known types are `Add` or `Prod`."
)
class Covariance(BaseCovariance):
"""
Base class for kernels/covariance functions with input_dim and active_dims, which excludes
kernels like `Constant` and `WhiteNoise`.
Parameters
----------
input_dim: integer
The number of input dimensions, or columns of X (or Xs)
the kernel will operate on.
active_dims: List of integers
Indicate which dimension or column of X the covariance
function operates on.
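
    Examples
    --------
    A minimal sketch of `active_dims` (the column index is illustrative)::

        import pymc as pm

        # operate only on column 2 of a three-column input X
        cov = pm.gp.cov.ExpQuad(input_dim=3, ls=0.5, active_dims=[2])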
"""
def __init__(self, input_dim: int, active_dims: Optional[Sequence[int]] = None):
self.input_dim = input_dim
if active_dims is None:
self.active_dims = np.arange(input_dim)
else:
self.active_dims = np.asarray(active_dims, int)
        if max(self.active_dims) >= self.input_dim:
            raise ValueError("Values in `active_dims` can't be equal to or larger than `input_dim`.")
@property
def n_dims(self):
"""The dimensionality of the input, as taken from the
`active_dims`.
"""
        # Evaluate lazily in case this changes.
return len(self.active_dims)
def _slice(self, X, Xs=None):
xdims = X.shape[-1]
if isinstance(xdims, Variable):
xdims = xdims.eval()
if self.input_dim != xdims:
warnings.warn(
f"Only {self.input_dim} column(s) out of {xdims} are"
" being used to compute the covariance function. If this"
" is not intended, increase 'input_dim' parameter to"
" the number of columns to use. Ignore otherwise.",
UserWarning,
)
X = pt.as_tensor_variable(X[:, self.active_dims])
if Xs is not None:
Xs = pt.as_tensor_variable(Xs[:, self.active_dims])
return X, Xs
class Combination(Covariance):
def __init__(self, factor_list):
"""Use constituent factors to get input_dim and active_dims for the Combination covariance."""
# Check if all input_dim are the same in factor_list
input_dims = {factor.input_dim for factor in factor_list if isinstance(factor, Covariance)}
if len(input_dims) != 1:
raise ValueError("All covariances must have the same `input_dim`.")
input_dim = input_dims.pop()
# Union all active_dims sets in factor_list for the combination covariance
active_dims = np.sort(
np.asarray(
list(
set.union(
*[
set(factor.active_dims)
for factor in factor_list
if isinstance(factor, Covariance)
]
)
),
dtype=int,
)
)
super().__init__(input_dim=input_dim, active_dims=active_dims)
        # Set up combination kernel; flatten out factor_list so that nested
        # combinations of the same type are merged into a single level.
self._factor_list = []
for factor in factor_list:
if isinstance(factor, self.__class__):
self._factor_list.extend(factor._factor_list)
else:
self._factor_list.append(factor)
def _merge_factors_cov(self, X, Xs=None, diag=False):
"""Called to evaluate either all the sums or all the
products of kernels that are possible to evaluate.
"""
factor_list = []
for factor in self._factor_list:
# make sure diag=True is handled properly
if isinstance(factor, BaseCovariance):
factor_list.append(factor(X, Xs, diag))
elif isinstance(factor, np.ndarray):
if np.ndim(factor) == 2 and diag:
factor_list.append(np.diag(factor))
else:
factor_list.append(factor)
elif isinstance(
factor,
(
TensorConstant,
TensorVariable,
TensorSharedVariable,
),
):
if factor.ndim == 2 and diag:
factor_list.append(pt.diag(factor))
else:
factor_list.append(factor)
else:
factor_list.append(factor)
return factor_list
    def _merge_factors_psd(self, omega):
        """Evaluate the spectral densities of combination kernels when possible.

        Implements a more restricted set of rules than `_merge_factors_cov` -- just
        additivity of stationary covariances with defined power spectral densities and
        multiplication by scalars. Also, the active_dims for all covariances in the sum
        must be the same.
        """
factor_list = []
for factor in self._factor_list:
if isinstance(factor, Covariance):
# Allow merging covariances for psd only if active_dims are the same
if set(self.active_dims) != set(factor.active_dims):
raise ValueError(
"For power spectral density calculations `active_dims` must be the same "
"for all covariances in the sum."
)
# If it's a covariance try to calculate the psd
try:
factor_list.append(factor.power_spectral_density(omega))
except (AttributeError, NotImplementedError) as e:
if isinstance(factor, Stationary):
raise NotImplementedError(
f"No power spectral density method has been implemented for {factor}."
) from e
else:
raise ValueError(
"Power spectral densities, `.power_spectral_density(omega)`, can only "
f"be calculated for `Stationary` covariance functions. {factor} is "
"non-stationary."
) from e
else:
# Otherwise defer the reduction to later
factor_list.append(factor)
return factor_list
class Add(Combination):
def __call__(self, X, Xs=None, diag=False):
return reduce(add, self._merge_factors_cov(X, Xs, diag))
def power_spectral_density(self, omega):
return reduce(add, self._merge_factors_psd(omega))
class Prod(Combination):
def __call__(self, X, Xs=None, diag=False):
return reduce(mul, self._merge_factors_cov(X, Xs, diag))
def power_spectral_density(self, omega):
check = Counter([isinstance(factor, Covariance) for factor in self._factor_list])
        if check.get(True, 0) >= 2:
raise NotImplementedError(
"The power spectral density of products of covariance "
"functions is not implemented."
)
return reduce(mul, self._merge_factors_psd(omega))
class Exponentiated(Covariance):
def __init__(self, kernel, power):
self.kernel = kernel
self.power = power
super().__init__(input_dim=self.kernel.input_dim, active_dims=self.kernel.active_dims)
def __call__(self, X, Xs=None, diag=False):
return self.kernel(X, Xs, diag=diag) ** self.power
class Kron(Covariance):
    r"""Form a covariance object that is the Kronecker product of other covariances.

    In contrast to standard multiplication, where each covariance is given the
    same inputs X and Xs, Kronecker product covariances first split the inputs
    into their respective spaces (inferred from the input_dim of each object)
    before forming their product. Kronecker covariances have a larger input
    dimension than any of their factors, since the inputs are the concatenated
    columns of their components.

    Factors must be covariances or combinations of covariances; arrays will
    not work. Generally used by the `gp.MarginalKron` and `gp.LatentKron`
    implementations.
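
    Examples
    --------
    A minimal sketch (the column split is inferred from each factor's
    input_dim)::

        import pymc as pm

        # columns 0-1 go to the first factor, column 2 to the second
        cov = pm.gp.cov.Kron([
            pm.gp.cov.ExpQuad(2, ls=[1.0, 2.0]),
            pm.gp.cov.Matern52(1, ls=0.5),
        ])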
"""
    def __init__(self, factor_list):
self.input_dims = [factor.input_dim for factor in factor_list]
input_dim = sum(self.input_dims)
super().__init__(input_dim=input_dim)
self._factor_list = factor_list
def _split(self, X, Xs):
indices = np.cumsum(self.input_dims)
X_split = np.hsplit(X, indices)
if Xs is not None:
Xs_split = np.hsplit(Xs, indices)
else:
Xs_split = [None] * len(X_split)
return X_split, Xs_split
def __call__(self, X, Xs=None, diag=False):
X_split, Xs_split = self._split(X, Xs)
covs = [cov(x, xs, diag) for cov, x, xs in zip(self._factor_list, X_split, Xs_split)]
return reduce(mul, covs)
class Constant(BaseCovariance):
    r"""
    Constant valued covariance function.

    .. math::

       k(x, x') = c
    """

    def __init__(self, c):
        self.c = c

    def diag(self, X):
        return pt.alloc(self.c, X.shape[0])

    def full(self, X, Xs=None):
        if Xs is None:
            return pt.alloc(self.c, X.shape[0], X.shape[0])
        else:
            return pt.alloc(self.c, X.shape[0], Xs.shape[0])
class WhiteNoise(BaseCovariance):
    r"""
    White noise covariance function.

    .. math::

       k(x, x') = \sigma^2 \mathrm{I}
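
    Examples
    --------
    Typically added to another kernel as jitter or observation noise; a
    minimal sketch::

        import pymc as pm

        cov = pm.gp.cov.ExpQuad(1, ls=0.5) + pm.gp.cov.WhiteNoise(sigma=1e-3)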
"""
    def __init__(self, sigma):
        self.sigma = sigma

    def diag(self, X):
        return pt.alloc(pt.square(self.sigma), X.shape[0])

    def full(self, X, Xs=None):
        if Xs is None:
            return pt.diag(self.diag(X))
        else:
            return pt.alloc(0.0, X.shape[0], Xs.shape[0])
class Circular(Covariance):
R"""
Circular Kernel.
.. math::
k_g(x, y) = W_\pi(\operatorname{dist}_{\mathit{geo}}(x, y)),
with
.. math::
W_c(t) = \left(1 + \tau \frac{t}{c}\right)\left(1-\frac{t}{c}\right)^\tau_+
where :math:`c` is maximum value for :math:`t` and :math:`\tau\ge 4`.
:math:`\tau` controls for correlation strength, larger :math:`\tau` leads to less smooth functions
See [1]_ for more explanations and use cases.
Parameters
----------
period : scalar
defines the circular interval :math:`[0, \mathit{bound})`
tau : scalar
:math:`\tau\ge 4` defines correlation strength, the larger,
the smaller correlation is. Minimum value is :math:`4`
References
----------
.. [1] Espéran Padonou, O Roustant, "Polar Gaussian Processes for Predicting on Circular Domains"
https://hal.archives-ouvertes.fr/hal-01119942v1/document
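
    Examples
    --------
    A minimal sketch for inputs on the circular interval :math:`[0, 1)`::

        import pymc as pm

        cov = pm.gp.cov.Circular(1, period=1.0, tau=4)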
"""
def __init__(self, input_dim, period, tau=4, active_dims=None):
super().__init__(input_dim, active_dims)
self.c = pt.as_tensor_variable(period / 2)
self.tau = tau
def dist(self, X, Xs):
if Xs is None:
Xs = pt.transpose(X)
else:
Xs = pt.transpose(Xs)
return pt.abs((X - Xs + self.c) % (self.c * 2) - self.c)
def weinland(self, t):
return (1 + self.tau * t / self.c) * pt.clip(1 - t / self.c, 0, np.inf) ** self.tau
def full(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
return self.weinland(self.dist(X, Xs))
def diag(self, X):
return pt.alloc(1.0, X.shape[0])
class Stationary(Covariance):
r"""
Base class for stationary kernels/covariance functions.
Parameters
----------
ls: Lengthscale. If input_dim > 1, a list or array of scalars or PyMC random
variables. If input_dim == 1, a scalar or PyMC random variable.
    ls_inv: Inverse lengthscale. 1 / ls. Exactly one of ls or ls_inv must be provided.
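
    Examples
    --------
    A minimal sketch of the two equivalent parametrizations (shown via the
    `ExpQuad` subclass, since `Stationary` itself does not implement `full`)::

        import pymc as pm

        cov1 = pm.gp.cov.ExpQuad(1, ls=2.0)
        cov2 = pm.gp.cov.ExpQuad(1, ls_inv=0.5)  # the same kernel as cov1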
"""
def __init__(self, input_dim, ls=None, ls_inv=None, active_dims=None):
super().__init__(input_dim, active_dims)
if (ls is None and ls_inv is None) or (ls is not None and ls_inv is not None):
raise ValueError("Only one of 'ls' or 'ls_inv' must be provided")
elif ls_inv is not None:
if isinstance(ls_inv, (list, tuple)):
ls = 1.0 / np.asarray(ls_inv)
else:
ls = 1.0 / ls_inv
self.ls = pt.as_tensor_variable(ls)
def square_dist(self, X, Xs):
X = pt.mul(X, 1.0 / self.ls)
X2 = pt.sum(pt.square(X), 1)
if Xs is None:
sqd = -2.0 * pt.dot(X, pt.transpose(X)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(X2, (1, -1))
)
else:
Xs = pt.mul(Xs, 1.0 / self.ls)
Xs2 = pt.sum(pt.square(Xs), 1)
sqd = -2.0 * pt.dot(X, pt.transpose(Xs)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(Xs2, (1, -1))
)
return pt.clip(sqd, 0.0, np.inf)
    def euclidean_dist(self, X, Xs):
        r2 = self.square_dist(X, Xs)
        # a small constant is added to avoid NaN gradients of sqrt at exactly zero
        return pt.sqrt(r2 + 1e-12)
def diag(self, X):
return pt.alloc(1.0, X.shape[0])
def full(self, X, Xs=None):
raise NotImplementedError
def power_spectral_density(self, omega):
raise NotImplementedError
class Periodic(Stationary):
r"""
The Periodic kernel.
.. math::
k(x, x') = \mathrm{exp}\left( -\frac{\mathrm{sin}^2(\pi |x-x'| \frac{1}{T})}{2\ell^2} \right)
Notes
-----
Note that the scaling factor for this kernel is different compared to the more common
definition (see [1]_). Here, 0.5 is in the exponent instead of the more common value, 2.
Divide the length-scale by 2 when initializing the kernel to recover the standard definition.
References
----------
.. [1] David Duvenaud, "The Kernel Cookbook"
https://www.cs.toronto.edu/~duvenaud/cookbook/
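
    Examples
    --------
    A minimal sketch (a period of 12 would suit monthly seasonality)::

        import pymc as pm

        cov = pm.gp.cov.Periodic(1, period=12, ls=1.0)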
"""
    def __init__(self, input_dim, period, ls=None, ls_inv=None, active_dims=None):
        super().__init__(input_dim, ls, ls_inv, active_dims)
        self.period = period

    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        if Xs is None:
            Xs = X
        f1 = X.dimshuffle(0, "x", 1)
        f2 = Xs.dimshuffle("x", 0, 1)
        r = np.pi * (f1 - f2) / self.period
        r = pt.sum(pt.square(pt.sin(r) / self.ls), 2)
        return pt.exp(-0.5 * r)
class ExpQuad(Stationary):
    r"""
    The Exponentiated Quadratic kernel. Also referred to as the Squared
    Exponential, or Radial Basis Function kernel.

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{(x - x')^2}{2 \ell^2} \right]
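
    Examples
    --------
    A minimal sketch with a lengthscale prior::

        import pymc as pm

        with pm.Model():
            ell = pm.Gamma("ell", alpha=2, beta=1)
            cov = pm.gp.cov.ExpQuad(1, ls=ell)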
"""
    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        return pt.exp(-0.5 * self.square_dist(X, Xs))

    def power_spectral_density(self, omega):
        r"""
        The power spectral density for the ExpQuad kernel is:

        .. math::

           S(\boldsymbol\omega) =
               \left(\sqrt{2\pi}\right)^D \prod_{i}^{D}\ell_i
               \exp\left( -\frac{1}{2} \sum_{i}^{D}\ell_i^2 \omega_i^{2} \right)
"""
ls = pt.ones(self.n_dims) * self.ls
c = pt.power(pt.sqrt(2.0 * np.pi), self.n_dims)
exp = pt.exp(-0.5 * pt.dot(pt.square(omega), pt.square(ls)))
return c * pt.prod(ls) * exp
class RatQuad(Stationary):
    r"""
    The Rational Quadratic kernel.

    .. math::

       k(x, x') = \left(1 + \frac{(x - x')^2}{2\alpha\ell^2} \right)^{-\alpha}
    """

    def __init__(self, input_dim, alpha, ls=None, ls_inv=None, active_dims=None):
        super().__init__(input_dim, ls, ls_inv, active_dims)
        self.alpha = alpha

    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        return pt.power(
            (1.0 + 0.5 * self.square_dist(X, Xs) * (1.0 / self.alpha)),
            -1.0 * self.alpha,
        )
class Matern52(Stationary):
    r"""
    The Matern kernel with nu = 5/2.

    .. math::

       k(x, x') = \left(1 + \frac{\sqrt{5(x - x')^2}}{\ell} +
                  \frac{5(x-x')^2}{3\ell^2}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{5(x - x')^2}}{\ell} \right]
    """

    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        r = self.euclidean_dist(X, Xs)
        return (1.0 + np.sqrt(5.0) * r + 5.0 / 3.0 * pt.square(r)) * pt.exp(-1.0 * np.sqrt(5.0) * r)

    def power_spectral_density(self, omega):
r"""
The power spectral density for the Matern52 kernel is:
.. math::
S(\boldsymbol\omega) =
\frac{2^D \pi^{\frac{D}{2}} \Gamma(\frac{D+5}{2}) 5^{5/2}}
{\frac{3}{4}\sqrt{\pi}}
\prod_{i=1}^{D}\ell_{i}
\left(5 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2}\right)^{-\frac{D+5}{2}}
"""
ls = pt.ones(self.n_dims) * self.ls
D52 = (self.n_dims + 5) / 2
num = (
pt.power(2, self.n_dims)
* pt.power(np.pi, self.n_dims / 2)
* pt.gamma(D52)
* pt.power(5, 5 / 2)
)
den = 0.75 * pt.sqrt(np.pi)
pow = pt.power(5.0 + pt.dot(pt.square(omega), pt.square(ls)), -1 * D52)
return (num / den) * pt.prod(ls) * pow
class Matern32(Stationary):
    r"""
    The Matern kernel with nu = 3/2.

    .. math::

       k(x, x') = \left(1 + \frac{\sqrt{3(x - x')^2}}{\ell}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{3(x - x')^2}}{\ell} \right]
    """

    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        r = self.euclidean_dist(X, Xs)
        return (1.0 + np.sqrt(3.0) * r) * pt.exp(-np.sqrt(3.0) * r)

    def power_spectral_density(self, omega):
r"""
The power spectral density for the Matern32 kernel is:
.. math::
S(\boldsymbol\omega) =
\frac{2^D \pi^{D/2} \Gamma\left(\frac{D+3}{2}\right) 3^{3/2}}
{\frac{1}{2}\sqrt{\pi}}
\prod_{i=1}^{D}\ell_{i}
\left(3 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2}\right)^{-\frac{D+3}{2}}
"""
ls = pt.ones(self.n_dims) * self.ls
D32 = (self.n_dims + 3) / 2
num = (
pt.power(2, self.n_dims)
* pt.power(np.pi, self.n_dims / 2)
* pt.gamma(D32)
* pt.power(3, 3 / 2)
)
den = 0.5 * pt.sqrt(np.pi)
pow = pt.power(3.0 + pt.dot(pt.square(omega), pt.square(ls)), -1 * D32)
return (num / den) * pt.prod(ls) * pow
class Matern12(Stationary):
    r"""
    The Matern kernel with nu = 1/2.

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{||x - x'||}{\ell} \right]
    """
def full(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
return pt.exp(-r)
class Exponential(Stationary):
    r"""
    The Exponential kernel.

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{||x - x'||}{2\ell} \right]
    """

    def full(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
return pt.exp(-0.5 * self.euclidean_dist(X, Xs))
class Cosine(Stationary):
    r"""
    The Cosine kernel.

    .. math::

       k(x, x') = \mathrm{cos}\left( 2 \pi \frac{||x - x'||}{\ell} \right)
    """

    def full(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
return pt.cos(2.0 * np.pi * self.euclidean_dist(X, Xs))
class Linear(Covariance):
    r"""
    The Linear kernel.

    .. math::

       k(x, x') = (x - c)(x' - c)
    """

    def __init__(self, input_dim, c, active_dims=None):
super().__init__(input_dim, active_dims)
self.c = c
def _common(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
Xc = pt.sub(X, self.c)
return X, Xc, Xs
    def full(self, X, Xs=None):
        X, Xc, Xs = self._common(X, Xs)
        if Xs is None:
            return pt.dot(Xc, pt.transpose(Xc))
        else:
            Xsc = pt.sub(Xs, self.c)
            return pt.dot(Xc, pt.transpose(Xsc))

    def diag(self, X):
X, Xc, _ = self._common(X, None)
return pt.sum(pt.square(Xc), 1)
class Polynomial(Linear):
    r"""
    The Polynomial kernel.

    .. math::

       k(x, x') = [(x - c)(x' - c) + \mathrm{offset}]^{d}
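
    Examples
    --------
    A minimal sketch of a quadratic kernel centered at zero (parameter values
    are illustrative)::

        import pymc as pm

        cov = pm.gp.cov.Polynomial(1, c=0.0, d=2, offset=1.0)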
"""
    def __init__(self, input_dim, c, d, offset, active_dims=None):
        super().__init__(input_dim, c, active_dims)
        self.d = d
        self.offset = offset

    def full(self, X, Xs=None):
        linear = super().full(X, Xs)
        return pt.power(linear + self.offset, self.d)

    def diag(self, X):
linear = super().diag(X)
return pt.power(linear + self.offset, self.d)
class Gibbs(Covariance):
r"""
The Gibbs kernel. Use an arbitrary lengthscale function defined
using PyTensor. Only tested in one dimension.
.. math::
k(x, x') = \sqrt{\frac{2\ell(x)\ell(x')}{\ell^2(x) + \ell^2(x')}}
\mathrm{exp}\left[ -\frac{(x - x')^2}
{\ell^2(x) + \ell^2(x')} \right]
Parameters
----------
lengthscale_func: callable
PyTensor function of X and additional optional arguments.
args: optional, tuple or list of scalars or PyMC variables
Additional inputs (besides X or Xs) to lengthscale_func.
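
    Examples
    --------
    A minimal sketch with an illustrative lengthscale function (`tanh_func`
    and its argument values are hypothetical)::

        import pymc as pm
        import pytensor.tensor as pt

        def tanh_func(x, ls1, ls2, w, x0):
            # lengthscale moves smoothly from ls1 to ls2 around x0
            return (ls1 + ls2) / 2.0 - (ls1 - ls2) / 2.0 * pt.tanh((x - x0) / w)

        cov = pm.gp.cov.Gibbs(1, tanh_func, args=(0.05, 0.6, 0.4, 1.0))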
"""
    def __init__(self, input_dim, lengthscale_func, args=None, active_dims=None):
        super().__init__(input_dim, active_dims)
        if active_dims is not None:
            if len(active_dims) > 1:
                raise NotImplementedError("Higher dimensional inputs are untested")
        else:
            if input_dim != 1:
                raise NotImplementedError("Higher dimensional inputs are untested")
        if not callable(lengthscale_func):
            raise TypeError("lengthscale_func must be callable")
        self.lfunc = handle_args(lengthscale_func, args)
        self.args = args
    def square_dist(self, X, Xs=None):
X2 = pt.sum(pt.square(X), 1)
if Xs is None:
sqd = -2.0 * pt.dot(X, pt.transpose(X)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(X2, (1, -1))
)
else:
Xs2 = pt.sum(pt.square(Xs), 1)
sqd = -2.0 * pt.dot(X, pt.transpose(Xs)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(Xs2, (1, -1))
)
return pt.clip(sqd, 0.0, np.inf)
    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        rx = self.lfunc(pt.as_tensor_variable(X), self.args)
        if Xs is None:
            rz = self.lfunc(pt.as_tensor_variable(X), self.args)
            r2 = self.square_dist(X, X)
        else:
            rz = self.lfunc(pt.as_tensor_variable(Xs), self.args)
            r2 = self.square_dist(X, Xs)
        rx2 = pt.reshape(pt.square(rx), (-1, 1))
        rz2 = pt.reshape(pt.square(rz), (1, -1))
        return pt.sqrt((2.0 * pt.outer(rx, rz)) / (rx2 + rz2)) * pt.exp(-1.0 * r2 / (rx2 + rz2))

    def diag(self, X):
        return pt.alloc(1.0, X.shape[0])
class ScaledCov(Covariance):
r"""
Construct a kernel by multiplying a base kernel with a scaling
function defined using PyTensor. The scaling function is
non-negative, and can be parameterized.
.. math::
k(x, x') = \phi(x) k_{\text{base}}(x, x') \phi(x')
Parameters
----------
cov_func: Covariance
Base kernel or covariance function
scaling_func: callable
PyTensor function of X and additional optional arguments.
    args: optional, tuple or list of scalars or PyMC variables
        Additional inputs (besides X or Xs) to scaling_func.
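
    Examples
    --------
    A minimal sketch with an illustrative scaling function (`logistic` and its
    argument values are hypothetical)::

        import pymc as pm
        import pytensor.tensor as pt

        def logistic(x, a, x0):
            # non-negative scaling that switches on around x0
            return pt.sigmoid(a * (x - x0))

        base = pm.gp.cov.ExpQuad(1, ls=0.2)
        cov = pm.gp.cov.ScaledCov(1, cov_func=base, scaling_func=logistic, args=(10.0, 0.5))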
"""
    def __init__(self, input_dim, cov_func, scaling_func, args=None, active_dims=None):
        super().__init__(input_dim, active_dims)
        if not callable(scaling_func):
            raise TypeError("scaling_func must be callable")
        if not isinstance(cov_func, Covariance):
            raise TypeError("Must be or inherit from the Covariance class")
        self.cov_func = cov_func
        self.scaling_func = handle_args(scaling_func, args)
        self.args = args

    def diag(self, X):
        X, _ = self._slice(X, None)
        cov_diag = self.cov_func(X, diag=True)
        scf_diag = pt.square(pt.flatten(self.scaling_func(X, self.args)))
        return cov_diag * scf_diag

    def full(self, X, Xs=None):
X, Xs = self._slice(X, Xs)
scf_x = self.scaling_func(X, self.args)
if Xs is None:
return pt.outer(scf_x, scf_x) * self.cov_func(X)
else:
scf_xs = self.scaling_func(Xs, self.args)
return pt.outer(scf_x, scf_xs) * self.cov_func(X, Xs)
class Coregion(Covariance):
r"""Covariance function for intrinsic/linear coregionalization models.
Adapted from GPy http://gpy.readthedocs.io/en/deploy/GPy.kern.src.html#GPy.kern.src.coregionalize.Coregionalize.
This covariance has the form:
.. math::
\mathbf{B} = \mathbf{W}\mathbf{W}^\top + \text{diag}(\kappa)
    and calls must use integers associated with the index of the matrix.
    This allows the API to remain consistent with other covariance objects:
.. math::
k(x, x') = \mathbf{B}[x, x'^\top]
Parameters
----------
W: 2D array of shape (num_outputs, rank)
a low rank matrix that determines the correlations between
the different outputs (rows)
kappa: 1D array of shape (num_outputs, )
a vector which allows the outputs to behave independently
    B: 2D array of shape (num_outputs, num_outputs)
        the total covariance matrix. Exactly one of (W, kappa) or B must be provided.
Notes
-----
Exactly one dimension must be active for this kernel. Thus, if
`input_dim != 1`, then `active_dims` must have a length of one.
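
    Examples
    --------
    A minimal sketch for two outputs, where X holds integer output indices
    (the `W` and `kappa` values are illustrative)::

        import numpy as np
        import pymc as pm

        W = np.random.randn(2, 1)
        kappa = np.array([0.1, 0.1])
        cov = pm.gp.cov.Coregion(1, W=W, kappa=kappa)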
"""
    def __init__(self, input_dim, W=None, kappa=None, B=None, active_dims=None):
        super().__init__(input_dim, active_dims)
        if len(self.active_dims) != 1:
            raise ValueError("Coregion requires exactly one dimension to be active")
        make_B = W is not None or kappa is not None
        if make_B and B is not None:
            raise ValueError("Exactly one of (W, kappa) or B must be provided to Coregion")
        if make_B:
            self.W = pt.as_tensor_variable(W)
            self.kappa = pt.as_tensor_variable(kappa)
            self.B = pt.dot(self.W, self.W.T) + pt.diag(self.kappa)
        elif B is not None:
            self.B = pt.as_tensor_variable(B)
        else:
            raise ValueError("Exactly one of (W, kappa) or B must be provided to Coregion")
    def full(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        index = pt.cast(X, "int32")
        if Xs is None:
            index2 = index.T
        else:
            index2 = pt.cast(Xs, "int32").T
        return self.B[index, index2]

    def diag(self, X):
        X, _ = self._slice(X, None)
        index = pt.cast(X, "int32")
        return pt.diag(self.B)[index.ravel()]
def handle_args(func, args):
    """Wrap `func` so extra arguments are passed through uniformly: `args`
    may be None, a single value, or a tuple."""

    def f(x, args):
        if args is None:
            return func(x)
        else:
            if not isinstance(args, tuple):
                args = (args,)
            return func(x, *args)

    return f