# Source code for pymc.gp.cov

#   Copyright 2023 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import numbers
import warnings

from collections import Counter
from functools import reduce
from operator import add, mul
from typing import Any, Callable, List, Optional, Sequence, Union

import numpy as np
import pytensor.tensor as pt

from pytensor.graph.basic import Variable
from pytensor.tensor.sharedvar import TensorSharedVariable
from pytensor.tensor.variable import TensorConstant, TensorVariable

__all__ = [
    "Constant",
    "WhiteNoise",
    "ExpQuad",
    "RatQuad",
    "Exponential",
    "Matern52",
    "Matern32",
    "Matern12",
    "Linear",
    "Polynomial",
    "Cosine",
    "Periodic",
    "WarpedInput",
    "WrappedPeriodic",
    "Gibbs",
    "Coregion",
    "ScaledCov",
    "Kron",
    "Circular",
]

TensorLike = Union[np.ndarray, TensorVariable]
IntSequence = Union[np.ndarray, Sequence[int]]

class BaseCovariance:
    """
    Base class for kernels/covariance functions.
    """

    def __call__(
        self,
        X: TensorLike,
        Xs: Optional[TensorLike] = None,
        diag: bool = False,
    ) -> TensorVariable:
        r"""
        Evaluate the kernel/covariance function.

        Parameters
        ----------
        X: The training inputs to the kernel.
        Xs: The optional prediction set of inputs to the kernel.
            If Xs is None, Xs = X.
        diag: bool
            Return only the diagonal of the covariance function.
            Default is False.
        """
        if diag:
            return self.diag(X)
        else:
            return self.full(X, Xs)

    def diag(self, X: TensorLike) -> TensorVariable:
        raise NotImplementedError

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        raise NotImplementedError

    def __add__(self, other) -> "Add":
        # If it's a scalar, cast as Constant covariance.  This allows validation for power spectral
        # density calc.
        if isinstance(other, numbers.Real):
            other = Constant(c=other)
        return Add([self, other])

    def __radd__(self, other) -> "Add":
        return self.__add__(other)

    def __mul__(self, other) -> "Prod":
        return Prod([self, other])

    def __rmul__(self, other) -> "Prod":
        return self.__mul__(other)

    def __pow__(self, other) -> "Exponentiated":
        other = pt.as_tensor_variable(other).squeeze()
        if not other.ndim == 0:
            raise ValueError("A covariance function can only be exponentiated by a scalar value")
        if not isinstance(self, Covariance):
            raise TypeError(
                "Can only exponentiate covariance functions which inherit from Covariance"
            )
        return Exponentiated(self, other)

    def __array_wrap__(self, result):
        """
        Required to allow radd/rmul by numpy arrays.
        """
        result = np.squeeze(result)
        if len(result.shape) <= 1:
            result = result.reshape(1, 1)
        elif len(result.shape) > 2:
            raise ValueError(
                f"cannot combine a covariance function with array of shape {result.shape}"
            )
        r, c = result.shape
        A = np.zeros((r, c))
        for i in range(r):
            for j in range(c):
                r = result[i, j]._factor_list[1]
                if isinstance(r, Constant):
                    # Counteract the elemwise Add edgecase
                    r = r.c
                A[i, j] = r
        if isinstance(result[0][0], Add):
            return result[0][0]._factor_list[0] + A
        elif isinstance(result[0][0], Prod):
            return result[0][0]._factor_list[0] * A
        else:
            raise TypeError(
                f"Unknown Covariance combination type {result[0][0]}.  "
                "Known types are Add or Prod."
            )

    @staticmethod
    def _alloc(X, *shape: int) -> TensorVariable:
        return pt.alloc(X, *shape)  # type: ignore
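
# Example (illustrative sketch): the operator overloads above let kernels be
# combined with scalars and with each other, e.g.
#
#     k = 2.0 * ExpQuad(1, ls=0.5) + WhiteNoise(sigma=1e-3)
#     K = k(X).eval()       # full covariance matrix for inputs X
#     d = k(X, diag=True)   # only the diagonal
#
# where X is an (n, 1) array of inputs.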

class Covariance(BaseCovariance):
    """
    Base class for kernels/covariance functions with input_dim and active_dims, which excludes
    kernels like Constant and WhiteNoise.

    Parameters
    ----------
    input_dim: integer
        The number of input dimensions, or columns of X (or Xs),
        that the kernel will operate on.
    active_dims: List of integers
        Indicate which dimension or column of X the covariance
        function operates on.
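
    Examples
    --------
    A hedged sketch of restricting a kernel to one of several input columns
    (using the ExpQuad kernel defined later in this module):

    .. code:: python

        # act only on the first column of a three-column input
        cov = ExpQuad(3, ls=0.1, active_dims=[0])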
"""

def __init__(self, input_dim: int, active_dims: Optional[IntSequence] = None):
self.input_dim = input_dim
if active_dims is None:
self.active_dims = np.arange(input_dim)
else:
self.active_dims = np.asarray(active_dims, int)

if max(self.active_dims) > self.input_dim:
raise ValueError("Values in active_dims can't be larger than input_dim.")

    @property
    def n_dims(self) -> int:
        """The dimensionality of the input, as taken from the
        active_dims.
        """
        # Evaluate lazily in case this changes.
        return len(self.active_dims)

    def _slice(self, X, Xs=None):
        xdims = X.shape[-1]
        if isinstance(xdims, Variable):
            # Circular dependency
            from pymc.pytensorf import constant_fold

            [xdims] = constant_fold([xdims])
        if self.input_dim != xdims:
            warnings.warn(
                f"Only {self.input_dim} column(s) out of {xdims} are"
                " being used to compute the covariance function. If this"
                " is not intended, increase 'input_dim' parameter to"
                " the number of columns to use. Ignore otherwise.",
                UserWarning,
            )
        X = pt.as_tensor_variable(X[:, self.active_dims])
        if Xs is not None:
            Xs = pt.as_tensor_variable(Xs[:, self.active_dims])
        return X, Xs

class Combination(Covariance):
    def __init__(self, factor_list: Sequence):
        """Use constituent factors to get input_dim and active_dims for the Combination covariance."""
        # Check if all input_dim are the same in factor_list
        input_dims = {factor.input_dim for factor in factor_list if isinstance(factor, Covariance)}

        if len(input_dims) != 1:
            raise ValueError("All covariances must have the same input_dim.")
        input_dim = input_dims.pop()

        # Union all active_dims sets in factor_list for the combination covariance
        active_dims = np.sort(
            np.asarray(
                list(
                    set.union(
                        *[
                            set(factor.active_dims)
                            for factor in factor_list
                            if isinstance(factor, Covariance)
                        ]
                    )
                ),
                dtype=int,
            )
        )
        super().__init__(input_dim=input_dim, active_dims=active_dims)

        # Set up combination kernel, flatten out factor_list so that nested
        # Combinations of the same type are merged into a single list.
        self._factor_list: List[Any] = []
        for factor in factor_list:
            if isinstance(factor, self.__class__):
                self._factor_list.extend(factor._factor_list)
            else:
                self._factor_list.append(factor)

    def _merge_factors_cov(self, X, Xs=None, diag=False):
        """Called to evaluate either all the sums or all the
        products of kernels that are possible to evaluate.
        """
        factor_list = []
        for factor in self._factor_list:
            # make sure diag=True is handled properly
            if isinstance(factor, BaseCovariance):
                factor_list.append(factor(X, Xs, diag))

            elif isinstance(factor, np.ndarray):
                if np.ndim(factor) == 2 and diag:
                    factor_list.append(np.diag(factor))
                else:
                    factor_list.append(factor)

            elif isinstance(
                factor,
                (
                    TensorConstant,
                    TensorVariable,
                    TensorSharedVariable,
                ),
            ):
                if factor.ndim == 2 and diag:
                    factor_list.append(pt.diag(factor))
                else:
                    factor_list.append(factor)

            else:
                factor_list.append(factor)

        return factor_list

    def _merge_factors_psd(self, omega):
        """Called to evaluate spectral densities of combination kernels when possible.

        Implements a more restricted set of rules than _merge_factors_cov -- just additivity
        of stationary covariances with defined power spectral densities and multiplication
        by scalars.  Also, the active_dims for all covariances in the sum must be the same.
        """
        factor_list = []
        for factor in self._factor_list:
            if isinstance(factor, Covariance):
                # Allow merging covariances for psd only if active_dims are the same
                if set(self.active_dims) != set(factor.active_dims):
                    raise ValueError(
                        "For power spectral density calculations active_dims must be the same "
                        "for all covariances in the sum."
                    )

                # If it's a covariance try to calculate the psd
                try:
                    factor_list.append(factor.power_spectral_density(omega))

                except (AttributeError, NotImplementedError) as e:
                    if isinstance(factor, Stationary):
                        raise NotImplementedError(
                            f"No power spectral density method has been implemented for {factor}."
                        ) from e

                    else:
                        raise ValueError(
                            "Power spectral densities, .power_spectral_density(omega), can only "
                            f"be calculated for Stationary covariance functions.  {factor} is "
                            "non-stationary."
                        ) from e

            else:
                # Otherwise defer the reduction to later
                factor_list.append(factor)

        return factor_list

class Add(Combination):
    def __call__(
        self,
        X: TensorLike,
        Xs: Optional[TensorLike] = None,
        diag: bool = False,
    ) -> TensorVariable:
        return reduce(add, self._merge_factors_cov(X, Xs, diag))

    def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
        return reduce(add, self._merge_factors_psd(omega))

class Prod(Combination):
    def __call__(
        self,
        X: TensorLike,
        Xs: Optional[TensorLike] = None,
        diag: bool = False,
    ) -> TensorVariable:
        return reduce(mul, self._merge_factors_cov(X, Xs, diag))

    def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
        check = Counter([isinstance(factor, Covariance) for factor in self._factor_list])
        if check.get(True, 0) >= 2:
            raise NotImplementedError(
                "The power spectral density of products of covariance "
                "functions is not implemented."
            )
        return reduce(mul, self._merge_factors_psd(omega))

class Exponentiated(Covariance):
    def __init__(self, kernel: Covariance, power):
        self.kernel = kernel
        self.power = power
        super().__init__(input_dim=self.kernel.input_dim, active_dims=self.kernel.active_dims)

    def __call__(
        self, X: TensorLike, Xs: Optional[TensorLike] = None, diag: bool = False
    ) -> TensorVariable:
        return self.kernel(X, Xs, diag=diag) ** self.power

class Kron(Covariance):
    r"""Form a covariance object that is the kronecker product of other covariances.

    In contrast to standard multiplication, where each covariance is given the
    same inputs X and Xs, kronecker product covariances first split the inputs
    into their respective spaces (inferred from the input_dim of each object)
    before forming their product. Kronecker covariances have a larger
    input dimension than any of their factors since the inputs are the
    concatenated columns of their components.

    Factors must be covariances or their combinations; arrays will not work.

    Generally utilized by the gp.MarginalKron and gp.LatentKron
    implementations.
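
    Examples
    --------
    A hedged sketch: a kernel over two-column inputs where each factor acts
    on its own column (the factor kernels are illustrative):

    .. code:: python

        k1 = ExpQuad(1, ls=0.1)
        k2 = Matern32(1, ls=2.0)
        k = Kron([k1, k2])  # expects inputs with 1 + 1 = 2 columns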
"""

[docs]    def __init__(self, factor_list: Sequence[Covariance]):
self.input_dims = [factor.input_dim for factor in factor_list]
input_dim = sum(self.input_dims)
super().__init__(input_dim=input_dim)
self._factor_list = factor_list

def _split(self, X, Xs):
indices = np.cumsum(self.input_dims)
X_split = np.hsplit(X, indices)
if Xs is not None:
Xs_split = np.hsplit(Xs, indices)
else:
Xs_split = [None] * len(X_split)
return X_split, Xs_split

def __call__(
self, X: TensorLike, Xs: Optional[TensorLike] = None, diag: bool = False
) -> TensorVariable:
X_split, Xs_split = self._split(X, Xs)
covs = [cov(x, xs, diag) for cov, x, xs in zip(self._factor_list, X_split, Xs_split)]
return reduce(mul, covs)

class Constant(BaseCovariance):
    r"""
    Constant valued covariance function.

    .. math::

       k(x, x') = c
    """

    def __init__(self, c):
        self.c = c

    def diag(self, X: TensorLike) -> TensorVariable:
        return self._alloc(self.c, X.shape[0])

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        if Xs is None:
            return self._alloc(self.c, X.shape[0], X.shape[0])
        else:
            return self._alloc(self.c, X.shape[0], Xs.shape[0])

class WhiteNoise(BaseCovariance):
    r"""
    White noise covariance function.

    .. math::

       k(x, x') = \sigma^2 \mathrm{I}
    """

    def __init__(self, sigma):
        self.sigma = sigma

    def diag(self, X: TensorLike) -> TensorVariable:
        return self._alloc(pt.square(self.sigma), X.shape[0])

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        if Xs is None:
            return pt.diag(self.diag(X))
        else:
            return self._alloc(0.0, X.shape[0], Xs.shape[0])

class Circular(Covariance):
    R"""
    Circular Kernel.

    .. math::

       k_g(x, y) = W_\pi(\operatorname{dist}_{\mathit{geo}}(x, y)),

    with

    .. math::

       W_c(t) = \left(1 + \tau \frac{t}{c}\right)\left(1-\frac{t}{c}\right)^\tau_+

    where :math:`c` is the maximum value for :math:`t` and :math:`\tau \ge 4`.
    :math:`\tau` controls the correlation strength; larger :math:`\tau` leads to less smooth functions.
    See [1]_ for more explanations and use cases.

    Parameters
    ----------
    period : scalar
        defines the circular interval :math:`[0, \mathit{bound})`
    tau : scalar
        :math:`\tau \ge 4` defines the correlation strength; the larger,
        the smaller the correlation is. The minimum value is :math:`4`.

    References
    ----------
    .. [1] Espéran Padonou, O Roustant, "Polar Gaussian Processes for Predicting on Circular Domains"
       https://hal.archives-ouvertes.fr/hal-01119942v1/document
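
    Examples
    --------
    A hedged sketch of constructing the kernel on a circular domain
    parameterized over :math:`[0, 1)`:

    .. code:: python

        cov = Circular(1, period=1, tau=4)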
"""

def __init__(
self,
input_dim: int,
period,
tau=4,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
self.c = pt.as_tensor_variable(period / 2)
self.tau = tau

def dist(self, X, Xs):
if Xs is None:
Xs = pt.transpose(X)
else:
Xs = pt.transpose(Xs)
return pt.abs((X - Xs + self.c) % (self.c * 2) - self.c)

def weinland(self, t):
return (1 + self.tau * t / self.c) * pt.clip(1 - t / self.c, 0, np.inf) ** self.tau

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
return self.weinland(self.dist(X, Xs))

def diag(self, X: TensorLike) -> TensorVariable:
return self._alloc(1.0, X.shape)

class Stationary(Covariance):
    r"""
    Base class for stationary kernels/covariance functions.

    Parameters
    ----------
    ls: Lengthscale.  If input_dim > 1, a list or array of scalars or PyMC random
        variables.  If input_dim == 1, a scalar or PyMC random variable.
    ls_inv: Inverse lengthscale.  1 / ls.  One of ls or ls_inv must be provided.
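
    Examples
    --------
    A hedged sketch of the two equivalent parameterizations (using the
    ExpQuad subclass defined below):

    .. code:: python

        k1 = ExpQuad(2, ls=[0.5, 2.0])
        k2 = ExpQuad(2, ls_inv=[2.0, 0.5])  # the same kernel as k1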
"""

def __init__(
self,
input_dim: int,
ls=None,
ls_inv=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
if (ls is None and ls_inv is None) or (ls is not None and ls_inv is not None):
raise ValueError("Only one of 'ls' or 'ls_inv' must be provided")
elif ls_inv is not None:
if isinstance(ls_inv, (list, tuple)):
ls = 1.0 / np.asarray(ls_inv)
else:
ls = 1.0 / ls_inv
self.ls = pt.as_tensor_variable(ls)

def square_dist(self, X, Xs):
X = pt.mul(X, 1.0 / self.ls)
X2 = pt.sum(pt.square(X), 1)
if Xs is None:
sqd = -2.0 * pt.dot(X, pt.transpose(X)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(X2, (1, -1))
)
else:
Xs = pt.mul(Xs, 1.0 / self.ls)
Xs2 = pt.sum(pt.square(Xs), 1)
sqd = -2.0 * pt.dot(X, pt.transpose(Xs)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(Xs2, (1, -1))
)
return pt.clip(sqd, 0.0, np.inf)

def euclidean_dist(self, X, Xs):
r2 = self.square_dist(X, Xs)
return self._sqrt(r2)

def _sqrt(self, r2):
return pt.sqrt(r2 + 1e-12)

def diag(self, X: TensorLike) -> TensorVariable:
return self._alloc(1.0, X.shape)

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r2 = self.square_dist(X, Xs)
return self.full_from_distance(r2, squared=True)

def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
raise NotImplementedError

def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
raise NotImplementedError

r"""
The Exponentiated Quadratic kernel.  Also referred to as the Squared
Exponential, or Radial Basis Function kernel.

.. math::

k(x, x') = \mathrm{exp}\left[ -\frac{(x - x')^2}{2 \ell^2} \right]

"""

[docs]    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r2 = dist if squared else dist**2
return pt.exp(-0.5 * r2)

[docs]    def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
r"""
The power spectral density for the ExpQuad kernel is:

.. math::

S(\boldsymbol\omega) =
(\sqrt(2 \pi)^D \prod_{i}^{D}\ell_i
\exp\left( -\frac{1}{2} \sum_{i}^{D}\ell_i^2 \omega_i^{2} \right)
"""
ls = pt.ones(self.n_dims) * self.ls
c = pt.power(pt.sqrt(2.0 * np.pi), self.n_dims)
exp = pt.exp(-0.5 * pt.dot(pt.square(omega), pt.square(ls)))
return c * pt.prod(ls) * exp

r"""

.. math::

k(x, x') = \left(1 + \frac{(x - x')^2}{2\alpha\ell^2} \right)^{-\alpha}
"""

[docs]    def __init__(
self,
input_dim: int,
alpha,
ls=None,
ls_inv=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, ls, ls_inv, active_dims)
self.alpha = alpha

[docs]    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r2 = dist if squared else dist**2
return pt.power(
(1.0 + 0.5 * r2 * (1.0 / self.alpha)),
-1.0 * self.alpha,
)

class Matern52(Stationary):
    r"""
    The Matern kernel with nu = 5/2.

    .. math::

       k(x, x') = \left(1 + \frac{\sqrt{5(x - x')^2}}{\ell} +
                  \frac{5(x-x')^2}{3\ell^2}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{5(x - x')^2}}{\ell} \right]
    """

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        r = self._sqrt(dist) if squared else dist
        return (1.0 + np.sqrt(5.0) * r + 5.0 / 3.0 * pt.square(r)) * pt.exp(-1.0 * np.sqrt(5.0) * r)

    def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
        r"""
        The power spectral density for the Matern52 kernel is:

        .. math::

           S(\boldsymbol\omega) =
               \frac{2^D \pi^{\frac{D}{2}} \Gamma\left(\frac{D+5}{2}\right) 5^{5/2}}
                    {\frac{3}{4}\sqrt{\pi}}
               \prod_{i=1}^{D}\ell_{i}
               \left(5 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2}\right)^{-\frac{D+5}{2}}
        """
        ls = pt.ones(self.n_dims) * self.ls
        D52 = (self.n_dims + 5) / 2
        num = (
            pt.power(2, self.n_dims)
            * pt.power(np.pi, self.n_dims / 2)
            * pt.gamma(D52)
            * pt.power(5, 5 / 2)
        )
        den = 0.75 * pt.sqrt(np.pi)
        pow = pt.power(5.0 + pt.dot(pt.square(omega), pt.square(ls)), -1 * D52)
        return (num / den) * pt.prod(ls) * pow

class Matern32(Stationary):
    r"""
    The Matern kernel with nu = 3/2.

    .. math::

       k(x, x') = \left(1 + \frac{\sqrt{3(x - x')^2}}{\ell}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{3(x - x')^2}}{\ell} \right]
    """

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        r = self._sqrt(dist) if squared else dist
        return (1.0 + np.sqrt(3.0) * r) * pt.exp(-np.sqrt(3.0) * r)

    def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
        r"""
        The power spectral density for the Matern32 kernel is:

        .. math::

           S(\boldsymbol\omega) =
               \frac{2^D \pi^{D/2} \Gamma\left(\frac{D+3}{2}\right) 3^{3/2}}
                    {\frac{1}{2}\sqrt{\pi}}
               \prod_{i=1}^{D}\ell_{i}
               \left(3 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2}\right)^{-\frac{D+3}{2}}
        """
        ls = pt.ones(self.n_dims) * self.ls
        D32 = (self.n_dims + 3) / 2
        num = (
            pt.power(2, self.n_dims)
            * pt.power(np.pi, self.n_dims / 2)
            * pt.gamma(D32)
            * pt.power(3, 3 / 2)
        )
        den = 0.5 * pt.sqrt(np.pi)
        pow = pt.power(3.0 + pt.dot(pt.square(omega), pt.square(ls)), -1 * D32)
        return (num / den) * pt.prod(ls) * pow

class Matern12(Stationary):
    r"""
    The Matern kernel with nu = 1/2.

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{||x - x'||}{\ell} \right]
    """

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        r = self._sqrt(dist) if squared else dist
        return pt.exp(-r)

class Exponential(Stationary):
    r"""
    The Exponential kernel.

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{||x - x'||}{2\ell} \right]
    """

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        r = self._sqrt(dist) if squared else dist
        return pt.exp(-0.5 * r)

class Cosine(Stationary):
    r"""
    The Cosine kernel.

    .. math::

       k(x, x') = \mathrm{cos}\left( 2 \pi \frac{||x - x'||}{ \ell^2} \right)
    """

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        r = self._sqrt(dist) if squared else dist
        return pt.cos(2.0 * np.pi * r)

class Periodic(Stationary):
    r"""
    The Periodic kernel.

    .. math::

       k(x, x') = \mathrm{exp}\left( -\frac{\mathrm{sin}^2(\pi |x-x'| \frac{1}{T})}{2\ell^2} \right)

    Notes
    -----
    Note that the scaling factor for this kernel is different compared to the more common
    definition (see [1]_). Here, 0.5 is in the exponent instead of the more common value, 2.
    Divide the length-scale by 2 when initializing the kernel to recover the standard definition.
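
    A hedged sketch of recovering the standard definition,
    :math:`\mathrm{exp}(-2 \mathrm{sin}^2(\pi |x-x'| / T) / \ell^2)`,
    for a target length-scale ``ell`` and period ``T``:

    .. code:: python

        cov = Periodic(1, period=T, ls=ell / 2)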

    References
    ----------
    .. [1] David Duvenaud, "The Kernel Cookbook"
       https://www.cs.toronto.edu/~duvenaud/cookbook/
    """

    def __init__(
        self,
        input_dim: int,
        period,
        ls=None,
        ls_inv=None,
        active_dims: Optional[IntSequence] = None,
    ):
        super().__init__(input_dim, ls, ls_inv, active_dims)
        self.period = period

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        X, Xs = self._slice(X, Xs)
        if Xs is None:
            Xs = X
        f1 = pt.expand_dims(X, axis=(0,))
        f2 = pt.expand_dims(Xs, axis=(1,))
        r = np.pi * (f1 - f2) / self.period
        r2 = pt.sum(pt.square(pt.sin(r) / self.ls), 2)
        return self.full_from_distance(r2, squared=True)

    def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
        # NOTE: This is the same as the ExpQuad as we assume the periodicity
        # has already been accounted for in the distance
        r2 = dist if squared else dist**2
        return pt.exp(-0.5 * r2)

class Linear(Covariance):
    r"""
    The Linear kernel.

    .. math::

       k(x, x') = (x - c)(x' - c)
    """

    def __init__(self, input_dim: int, c, active_dims: Optional[IntSequence] = None):
        super().__init__(input_dim, active_dims)
        self.c = c

    def _common(self, X, Xs=None):
        X, Xs = self._slice(X, Xs)
        Xc = pt.sub(X, self.c)
        return X, Xc, Xs

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        X, Xc, Xs = self._common(X, Xs)
        if Xs is None:
            return pt.dot(Xc, pt.transpose(Xc))
        else:
            Xsc = pt.sub(Xs, self.c)
            return pt.dot(Xc, pt.transpose(Xsc))

    def diag(self, X: TensorLike) -> TensorVariable:
        X, Xc, _ = self._common(X, None)
        return pt.sum(pt.square(Xc), 1)

class Polynomial(Linear):
    r"""
    The Polynomial kernel.

    .. math::

       k(x, x') = [(x - c)(x' - c) + \mathrm{offset}]^{d}
    """

    def __init__(self, input_dim: int, c, d, offset, active_dims: Optional[IntSequence] = None):
        super().__init__(input_dim, c, active_dims)
        self.d = d
        self.offset = offset

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        linear = super().full(X, Xs)
        return pt.power(linear + self.offset, self.d)

    def diag(self, X: TensorLike) -> TensorVariable:
        linear = super().diag(X)
        return pt.power(linear + self.offset, self.d)

class WarpedInput(Covariance):
    r"""
    Warp the inputs of any kernel using an arbitrary function
    defined using PyTensor.

    .. math::

       k(x, x') = k(w(x), w(x'))

    Parameters
    ----------
    cov_func: Covariance
    warp_func: callable
        PyTensor function of X and additional optional arguments.
    args: optional, tuple or list of scalars or PyMC variables
        Additional inputs (besides X or Xs) to warp_func.
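
    Examples
    --------
    A hedged sketch with an illustrative warping function:

    .. code:: python

        def warp(X, a):
            # stretch the input axis nonlinearly (illustrative)
            return X + a * pt.sin(X)

        cov = WarpedInput(1, cov_func=ExpQuad(1, ls=0.2), warp_func=warp, args=(0.5,))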
"""

[docs]    def __init__(
self,
input_dim: int,
cov_func: Covariance,
warp_func: Callable,
args=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
if not callable(warp_func):
raise TypeError("warp_func must be callable")
if not isinstance(cov_func, Covariance):
raise TypeError("Must be or inherit from the Covariance class")
self.w = handle_args(warp_func)
self.args = args
self.cov_func = cov_func

[docs]    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
if Xs is None:
return self.cov_func(self.w(X, self.args), Xs)
else:
return self.cov_func(self.w(X, self.args), self.w(Xs, self.args))

[docs]    def diag(self, X: TensorLike) -> TensorVariable:
X, _ = self._slice(X, None)
return self.cov_func(self.w(X, self.args), diag=True)

class WrappedPeriodic(Covariance):
    r"""
    The WrappedPeriodic kernel constructs periodic kernels from any Stationary kernel.

    This is done by warping the input with the function

    .. math::

       \mathbf{u}(x) = \left(
           \mathrm{sin} \left( \frac{2\pi x}{T} \right),
           \mathrm{cos} \left( \frac{2\pi x}{T} \right)
       \right)

    The original derivation as applied to the squared exponential kernel can
    be found in [1]_ or referenced in Chapter 4, page 92 of [2]_.

    Notes
    -----
    Note that we drop the resulting scaling by 4 found in the original derivation
    so that the interpretation of the length scales is consistent between the
    underlying kernel and the periodic version.

    Examples
    --------
    In order to construct a kernel equivalent to the Periodic kernel you
    can do the following (though using Periodic will likely be a bit faster):

    .. code:: python

        exp_quad = pm.gp.cov.ExpQuad(1, ls=0.5)
        cov = pm.gp.cov.WrappedPeriodic(exp_quad, period=5)

    References
    ----------
    .. [1] MacKay, D. J. C. (1998). Introduction to Gaussian Processes.
       In Bishop, C. M., editor, Neural Networks and Machine Learning. Springer-Verlag
    .. [2] Rasmussen, C. E., & Williams, C. K. I. (2006). Gaussian Processes for Machine Learning. MIT Press.
       http://gaussianprocess.org/gpml/chapters/

    Parameters
    ----------
    cov_func: Stationary
        Base kernel or covariance function
    period: Period
    """

    def __init__(self, cov_func: Stationary, period):
        super().__init__(cov_func.input_dim, cov_func.active_dims)
        if not isinstance(cov_func, Stationary):
            raise TypeError("Must inherit from the Stationary class")
        self.cov_func = cov_func
        self.period = period

    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
        X, Xs = self._slice(X, Xs)
        if Xs is None:
            Xs = X
        f1 = pt.expand_dims(X, axis=(0,))
        f2 = pt.expand_dims(Xs, axis=(1,))
        r = np.pi * (f1 - f2) / self.period
        r2 = pt.sum(pt.square(pt.sin(r) / self.cov_func.ls), 2)
        return self.cov_func.full_from_distance(r2, squared=True)

    def diag(self, X: TensorLike) -> TensorVariable:
        return self._alloc(1.0, X.shape[0])

class Gibbs(Covariance):
    r"""
    The Gibbs kernel.  Use an arbitrary lengthscale function defined
    using PyTensor.  Only tested in one dimension.

    .. math::

       k(x, x') = \sqrt{\frac{2\ell(x)\ell(x')}{\ell^2(x) + \ell^2(x')}}
                  \mathrm{exp}\left[ -\frac{(x - x')^2}
                                           {\ell^2(x) + \ell^2(x')} \right]

    Parameters
    ----------
    lengthscale_func: callable
        PyTensor function of X and additional optional arguments.
    args: optional, tuple or list of scalars or PyMC variables
        Additional inputs (besides X or Xs) to lengthscale_func.
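
    Examples
    --------
    A hedged sketch with an illustrative lengthscale function that grows
    linearly with the input:

    .. code:: python

        def ls_func(X, a, b):
            return a + b * X

        cov = Gibbs(1, ls_func, args=(1.0, 0.5))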
"""

[docs]    def __init__(
self,
input_dim: int,
lengthscale_func: Callable,
args=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
if active_dims is not None:
if len(active_dims) > 1:
raise NotImplementedError(("Higher dimensional inputs ", "are untested"))
else:
if input_dim != 1:
raise NotImplementedError(("Higher dimensional inputs ", "are untested"))
if not callable(lengthscale_func):
raise TypeError("lengthscale_func must be callable")
self.lfunc = handle_args(lengthscale_func)
self.args = args

[docs]    def square_dist(self, X, Xs=None):
X2 = pt.sum(pt.square(X), 1)
if Xs is None:
sqd = -2.0 * pt.dot(X, pt.transpose(X)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(X2, (1, -1))
)
else:
Xs2 = pt.sum(pt.square(Xs), 1)
sqd = -2.0 * pt.dot(X, pt.transpose(Xs)) + (
pt.reshape(X2, (-1, 1)) + pt.reshape(Xs2, (1, -1))
)
return pt.clip(sqd, 0.0, np.inf)

[docs]    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
rx = self.lfunc(pt.as_tensor_variable(X), self.args)
if Xs is None:
rz = self.lfunc(pt.as_tensor_variable(X), self.args)
r2 = self.square_dist(X, X)
else:
rz = self.lfunc(pt.as_tensor_variable(Xs), self.args)
r2 = self.square_dist(X, Xs)
rx2 = pt.reshape(pt.square(rx), (-1, 1))
rz2 = pt.reshape(pt.square(rz), (1, -1))
return pt.sqrt((2.0 * pt.outer(rx, rz)) / (rx2 + rz2)) * pt.exp(-1.0 * r2 / (rx2 + rz2))

[docs]    def diag(self, X: TensorLike) -> TensorVariable:
return self._alloc(1.0, X.shape)

class ScaledCov(Covariance):
    r"""
    Construct a kernel by multiplying a base kernel with a scaling
    function defined using PyTensor.  The scaling function is
    non-negative, and can be parameterized.

    .. math::

       k(x, x') = \phi(x) k_{\text{base}}(x, x') \phi(x')

    Parameters
    ----------
    cov_func: Covariance
        Base kernel or covariance function
    scaling_func: callable
        PyTensor function of X and additional optional arguments.
    args: optional, tuple or list of scalars or PyMC variables
        Additional inputs (besides X or Xs) to scaling_func.
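
    Examples
    --------
    A hedged sketch with an illustrative non-negative scaling function:

    .. code:: python

        def scale_func(X, a):
            return pt.exp(a * X)

        cov = ScaledCov(1, cov_func=ExpQuad(1, ls=0.2), scaling_func=scale_func, args=(0.1,))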
"""

[docs]    def __init__(
self,
input_dim: int,
cov_func: Covariance,
scaling_func: Callable,
args=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
if not callable(scaling_func):
raise TypeError("scaling_func must be callable")
if not isinstance(cov_func, Covariance):
raise TypeError("Must be or inherit from the Covariance class")
self.cov_func = cov_func
self.scaling_func = handle_args(scaling_func)
self.args = args

[docs]    def diag(self, X: TensorLike) -> TensorVariable:
X, _ = self._slice(X, None)
cov_diag = self.cov_func(X, diag=True)
scf_diag = pt.square(pt.flatten(self.scaling_func(X, self.args)))
return cov_diag * scf_diag

[docs]    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
scf_x = self.scaling_func(X, self.args)
if Xs is None:
return pt.outer(scf_x, scf_x) * self.cov_func(X)
else:
scf_xs = self.scaling_func(Xs, self.args)
return pt.outer(scf_x, scf_xs) * self.cov_func(X, Xs)

class Coregion(Covariance):
    r"""Covariance function for intrinsic/linear coregionalization models.

    This covariance has the form:

    .. math::

       \mathbf{B} = \mathbf{W}\mathbf{W}^\top + \text{diag}(\kappa)

    and calls must use integers associated with the index of the matrix.
    This allows the api to remain consistent with other covariance objects:

    .. math::

       k(x, x') = \mathbf{B}[x, x'^\top]

    Parameters
    ----------
    W: 2D array of shape (num_outputs, rank)
        a low rank matrix that determines the correlations between
        the different outputs (rows)
    kappa: 1D array of shape (num_outputs, )
        a vector which allows the outputs to behave independently
    B: 2D array of shape (num_outputs, num_outputs)
        the total matrix; exactly one of (W, kappa) and B must be provided

    Notes
    -----
    Exactly one dimension must be active for this kernel. Thus, if
    input_dim != 1, then active_dims must have a length of one.
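
    Examples
    --------
    A hedged sketch for three outputs with a rank-2 coregionalization matrix
    (the arrays are illustrative):

    .. code:: python

        W = np.random.rand(3, 2)
        kappa = np.ones(3)
        cov = Coregion(1, W=W, kappa=kappa)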
"""

[docs]    def __init__(
self,
input_dim: int,
W=None,
kappa=None,
B=None,
active_dims: Optional[IntSequence] = None,
):
super().__init__(input_dim, active_dims)
if len(self.active_dims) != 1:
raise ValueError("Coregion requires exactly one dimension to be active")
make_B = W is not None or kappa is not None
if make_B and B is not None:
raise ValueError("Exactly one of (W, kappa) and B must be provided to Coregion")
if make_B:
self.W = pt.as_tensor_variable(W)
self.kappa = pt.as_tensor_variable(kappa)
self.B = pt.dot(self.W, self.W.T) + pt.diag(self.kappa)
elif B is not None:
self.B = pt.as_tensor_variable(B)
else:
raise ValueError("Exactly one of (W, kappa) and B must be provided to Coregion")

[docs]    def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
index = pt.cast(X, "int32")
if Xs is None:
index2 = index.T
else:
index2 = pt.cast(Xs, "int32").T
return self.B[index, index2]

[docs]    def diag(self, X: TensorLike) -> TensorVariable:
X, _ = self._slice(X, None)
index = pt.cast(X, "int32")
return pt.diag(self.B)[index.ravel()]

def handle_args(func: Callable) -> Callable:
    def f(x, args):
        if args is None:
            return func(x)
        else:
            if not isinstance(args, tuple):
                args = (args,)
            return func(x, *args)

    return f
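
# Example (illustrative): handle_args normalizes the optional extra arguments
# passed to user-supplied functions, e.g. for f = handle_args(func):
#
#     f(x, None)      # -> func(x)
#     f(x, a)         # -> func(x, a)   (a non-tuple arg is wrapped in a tuple)
#     f(x, (a, b))    # -> func(x, a, b)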