# Copyright 2024 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import warnings
from functools import partial, reduce
import numpy as np
import pytensor
import pytensor.tensor as pt
import scipy
from pytensor.graph import node_rewriter
from pytensor.graph.basic import Apply, Variable
from pytensor.graph.op import Op
from pytensor.raise_op import Assert
from pytensor.sparse.basic import DenseFromSparse, sp_sum
from pytensor.tensor import (
TensorConstant,
TensorVariable,
gammaln,
get_underlying_scalar_constant_value,
sigmoid,
)
from pytensor.tensor.elemwise import DimShuffle
from pytensor.tensor.exceptions import NotScalarConstantError
from pytensor.tensor.linalg import cholesky, det, eigh, solve_triangular, trace
from pytensor.tensor.linalg import inv as matrix_inverse
from pytensor.tensor.random.basic import MvNormalRV, dirichlet, multinomial, multivariate_normal
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.random.utils import (
broadcast_params,
normalize_size_param,
)
from pytensor.tensor.type import TensorType
from scipy import stats
import pymc as pm
from pymc.distributions import transforms
from pymc.distributions.continuous import BoundedContinuous, ChiSquared, Normal
from pymc.distributions.dist_math import (
betaln,
check_parameters,
factln,
logpow,
multigammaln,
)
from pymc.distributions.distribution import (
Continuous,
Discrete,
Distribution,
SymbolicRandomVariable,
_support_point,
support_point,
)
from pymc.distributions.shape_utils import (
_change_dist_size,
change_dist_size,
get_support_shape,
implicit_size_from_params,
rv_size_is_none,
to_tuple,
)
from pymc.distributions.transforms import Interval, ZeroSumTransform, _default_transform
from pymc.logprob.abstract import _logprob
from pymc.logprob.rewriting import (
specialization_ir_rewrites_db,
)
from pymc.math import kron_diag, kron_dot
from pymc.pytensorf import normalize_rng_param
from pymc.util import check_dist_not_registered
__all__ = [
"MvNormal",
"ZeroSumNormal",
"MvStudentT",
"Dirichlet",
"Multinomial",
"DirichletMultinomial",
"OrderedMultinomial",
"Wishart",
"WishartBartlett",
"LKJCorr",
"LKJCholeskyCov",
"MatrixNormal",
"KroneckerNormal",
"CAR",
"ICAR",
"StickBreakingWeights",
]
solve_lower = partial(solve_triangular, lower=True)
solve_upper = partial(solve_triangular, lower=False)
def _squeeze_to_ndim(var: TensorVariable | np.ndarray, ndim: int):
squeeze = pt.squeeze if isinstance(var, TensorVariable) else np.squeeze
extra_dims = var.ndim - ndim
if extra_dims:
return squeeze(var, axis=tuple(range(extra_dims)))
else:
return var
class SimplexContinuous(Continuous):
"""Base class for simplex continuous distributions"""
@_default_transform.register(SimplexContinuous)
def simplex_cont_transform(op, rv):
return transforms.simplex
# Step methods and advi do not catch LinAlgErrors at the
# moment. We work around that by using a cholesky op
# that returns a nan as first entry instead of raising
# an error.
nan_lower_cholesky = partial(cholesky, lower=True, on_error="nan")
def quaddist_matrix(cov=None, chol=None, tau=None, lower=True, *args, **kwargs):
if len([i for i in [tau, cov, chol] if i is not None]) != 1:
raise ValueError("Incompatible parameterization. Specify exactly one of tau, cov, or chol.")
if cov is not None:
cov = pt.as_tensor_variable(cov)
if cov.ndim < 2:
raise ValueError("cov must be at least two dimensional.")
elif tau is not None:
tau = pt.as_tensor_variable(tau)
if tau.ndim < 2:
raise ValueError("tau must be at least two dimensional.")
cov = matrix_inverse(tau)
else:
chol = pt.as_tensor_variable(chol)
if chol.ndim < 2:
raise ValueError("chol must be at least two dimensional.")
if not lower:
chol = pt.swapaxes(chol, -1, -2)
# tag as lower triangular to enable pytensor rewrites of chol(l.l') -> l
chol.tag.lower_triangular = True
cov = pt.matmul(chol, pt.swapaxes(chol, -1, -2))
return cov
def _logdet_from_cholesky(chol: TensorVariable) -> tuple[TensorVariable, TensorVariable]:
diag = pt.diagonal(chol, axis1=-2, axis2=-1)
logdet = pt.log(diag).sum(axis=-1)
posdef = pt.all(diag > 0, axis=-1)
return logdet, posdef
def quaddist_chol(value, mu, cov):
"""Compute (x - mu).T @ Sigma^-1 @ (x - mu) and the logdet of Sigma."""
if value.ndim == 0:
raise ValueError("Value can't be a scalar")
if value.ndim == 1:
onedim = True
value = value[None, :]
else:
onedim = False
chol_cov = nan_lower_cholesky(cov)
logdet, posdef = _logdet_from_cholesky(chol_cov)
# solve_triangular will raise if there are nans
# (which happens if the cholesky fails)
chol_cov = pt.switch(posdef[..., None, None], chol_cov, 1)
delta = value - mu
delta_trans = solve_lower(chol_cov, delta, b_ndim=1)
quaddist = (delta_trans**2).sum(axis=-1)
if onedim:
return quaddist[0], logdet, posdef
else:
return quaddist, logdet, posdef
[docs]
class MvNormal(Continuous):
r"""
Multivariate normal log-likelihood.
.. math::
f(x \mid \pi, T) =
\frac{|T|^{1/2}}{(2\pi)^{k/2}}
\exp\left\{ -\frac{1}{2} (x-\mu)^{\prime} T (x-\mu) \right\}
======== ==========================
Support :math:`x \in \mathbb{R}^k`
Mean :math:`\mu`
Variance :math:`T^{-1}`
======== ==========================
Parameters
----------
mu : tensor_like of float
Vector of means.
cov : tensor_like of float, optional
Covariance matrix. Exactly one of cov, tau, or chol is needed.
tau : tensor_like of float, optional
Precision matrix. Exactly one of cov, tau, or chol is needed.
chol : tensor_like of float, optional
Cholesky decomposition of covariance matrix. Exactly one of cov,
tau, or chol is needed.
lower: bool, default=True
Whether chol is the lower tridiagonal cholesky factor.
Examples
--------
Define a multivariate normal variable for a given covariance
matrix::
cov = np.array([[1., 0.5], [0.5, 2]])
mu = np.zeros(2)
vals = pm.MvNormal('vals', mu=mu, cov=cov, shape=(5, 2))
Most of the time it is preferable to specify the cholesky
factor of the covariance instead. For example, we could
fit a multivariate outcome like this (see the docstring
of `LKJCholeskyCov` for more information about this)::
mu = np.zeros(3)
true_cov = np.array([[1.0, 0.5, 0.1],
[0.5, 2.0, 0.2],
[0.1, 0.2, 1.0]])
data = np.random.multivariate_normal(mu, true_cov, 10)
sd_dist = pm.Exponential.dist(1.0, shape=3)
chol, corr, stds = pm.LKJCholeskyCov('chol_cov', n=3, eta=2,
sd_dist=sd_dist, compute_corr=True)
vals = pm.MvNormal('vals', mu=mu, chol=chol, observed=data)
For unobserved values it can be better to use a non-centered
parametrization::
sd_dist = pm.Exponential.dist(1.0, shape=3)
chol, _, _ = pm.LKJCholeskyCov('chol_cov', n=3, eta=2,
sd_dist=sd_dist, compute_corr=True)
vals_raw = pm.Normal('vals_raw', mu=0, sigma=1, shape=(5, 3))
vals = pm.Deterministic('vals', pt.dot(chol, vals_raw.T).T)
"""
rv_op = multivariate_normal
[docs]
@classmethod
def dist(cls, mu=0, cov=None, *, tau=None, chol=None, lower=True, **kwargs):
mu = pt.as_tensor_variable(mu)
cov = quaddist_matrix(cov, chol, tau, lower)
# PyTensor is stricter about the shape of mu, than PyMC used to be
mu, _ = pt.broadcast_arrays(mu, cov[..., -1])
return super().dist([mu, cov], **kwargs)
def support_point(rv, size, mu, cov):
# mu is broadcasted to the potential length of cov in `dist`
support_point = mu
if not rv_size_is_none(size):
support_point_size = pt.concatenate([size, [mu.shape[-1]]])
support_point = pt.full(support_point_size, mu)
return support_point
def logp(value, mu, cov):
"""
Calculate log-probability of Multivariate Normal distribution
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
quaddist, logdet, posdef = quaddist_chol(value, mu, cov)
k = value.shape[-1].astype("floatX")
norm = -0.5 * k * np.log(2 * np.pi)
return check_parameters(
norm - 0.5 * quaddist - logdet,
posdef,
msg="posdef covariance",
)
class PrecisionMvNormalRV(SymbolicRandomVariable):
r"""A specialized multivariate normal random variable defined in terms of precision.
This class is introduced during specialization logprob rewrites, and not meant to be used directly.
"""
name = "precision_multivariate_normal"
extended_signature = "[rng],[size],(n),(n,n)->(n)"
_print_name = ("PrecisionMultivariateNormal", "\\operatorname{PrecisionMultivariateNormal}")
@classmethod
def rv_op(cls, mean, tau, *, rng=None, size=None):
rng = normalize_rng_param(rng)
size = normalize_size_param(size)
cov = pt.linalg.inv(tau)
next_rng, draws = multivariate_normal(mean, cov, size=size, rng=rng).owner.outputs
return cls(
inputs=[rng, size, mean, tau],
outputs=[next_rng, draws],
)(rng, size, mean, tau)
@_logprob.register
def precision_mv_normal_logp(op: PrecisionMvNormalRV, value, rng, size, mean, tau, **kwargs):
[value] = value
k = value.shape[-1].astype("floatX")
delta = value - mean
quadratic_form = delta.T @ tau @ delta
logdet, posdef = _logdet_from_cholesky(nan_lower_cholesky(tau))
logp = -0.5 * (k * pt.log(2 * np.pi) + quadratic_form) + logdet
return check_parameters(
logp,
posdef,
msg="posdef precision",
)
@node_rewriter(tracks=[MvNormalRV])
def mv_normal_to_precision_mv_normal(fgraph, node):
"""Replaces MvNormal(mu, inv(tau)) -> PrecisionMvNormal(mu, tau)
This is introduced in logprob rewrites to provide a more efficient logp for a MvNormal
that is defined by a precision matrix.
Note: This won't be introduced when calling `pm.logp` as that will dispatch directly
without triggering the logprob rewrites.
"""
rng, size, mu, cov = node.inputs
if cov.owner and cov.owner.op == matrix_inverse:
tau = cov.owner.inputs[0]
return PrecisionMvNormalRV.rv_op(mu, tau, size=size, rng=rng).owner.outputs
return None
specialization_ir_rewrites_db.register(
mv_normal_to_precision_mv_normal.__name__,
mv_normal_to_precision_mv_normal,
"basic",
)
class MvStudentTRV(RandomVariable):
name = "multivariate_studentt"
signature = "(),(n),(n,n)->(n)"
dtype = "floatX"
_print_name = ("MvStudentT", "\\operatorname{MvStudentT}")
@classmethod
def rng_fn(cls, rng, nu, mu, cov, size):
if size is None:
# When size is implicit, we need to broadcast parameters correctly,
# so that the MvNormal draws and the chisquare draws have the same number of batch dimensions.
# nu broadcasts mu and cov
if np.ndim(nu) > max(mu.ndim - 1, cov.ndim - 2):
_, mu, cov = broadcast_params((nu, mu, cov), ndims_params=cls.ndims_params)
# nu is broadcasted by either mu or cov
elif np.ndim(nu) < max(mu.ndim - 1, cov.ndim - 2):
nu, _, _ = broadcast_params((nu, mu, cov), ndims_params=cls.ndims_params)
mv_samples = multivariate_normal.rng_fn(rng=rng, mean=np.zeros_like(mu), cov=cov, size=size)
# Take chi2 draws and add an axis of length 1 to the right for correct broadcasting below
chi2_samples = np.sqrt(rng.chisquare(nu, size=size) / nu)[..., None]
return (mv_samples / chi2_samples) + mu
mv_studentt = MvStudentTRV()
[docs]
class MvStudentT(Continuous):
r"""
Multivariate Student-T log-likelihood.
.. math::
f(\mathbf{x}| \nu,\mu,\Sigma) =
\frac
{\Gamma\left[(\nu+p)/2\right]}
{\Gamma(\nu/2)\nu^{p/2}\pi^{p/2}
\left|{\Sigma}\right|^{1/2}
\left[
1+\frac{1}{\nu}
({\mathbf x}-{\mu})^T
{\Sigma}^{-1}({\mathbf x}-{\mu})
\right]^{-(\nu+p)/2}}
======== =============================================
Support :math:`x \in \mathbb{R}^p`
Mean :math:`\mu` if :math:`\nu > 1` else undefined
Variance :math:`\frac{\nu}{\mu-2}\Sigma`
if :math:`\nu>2` else undefined
======== =============================================
Parameters
----------
nu : tensor_like of float
Degrees of freedom, should be a positive scalar.
Sigma : tensor_like of float, optional
Scale matrix. Use `scale` in new code.
mu : tensor_like of float, optional
Vector of means.
scale : tensor_like of float, optional
The scale matrix.
tau : tensor_like of float, optional
The precision matrix.
chol : tensor_like of float, optional
The cholesky factor of the scale matrix.
lower : bool, default=True
Whether the cholesky fatcor is given as a lower triangular matrix.
"""
rv_op = mv_studentt
[docs]
@classmethod
def dist(cls, nu, *, Sigma=None, mu=0, scale=None, tau=None, chol=None, lower=True, **kwargs):
cov = kwargs.pop("cov", None)
if cov is not None:
warnings.warn(
"Use the scale argument to specify the scale matrix. "
"cov will be removed in future versions.",
FutureWarning,
)
scale = cov
if Sigma is not None:
if scale is not None:
raise ValueError("Specify only one of scale and Sigma")
scale = Sigma
nu = pt.as_tensor_variable(nu)
mu = pt.as_tensor_variable(mu)
scale = quaddist_matrix(scale, chol, tau, lower)
# PyTensor is stricter about the shape of mu, than PyMC used to be
mu, _ = pt.broadcast_arrays(mu, scale[..., -1])
return super().dist([nu, mu, scale], **kwargs)
def support_point(rv, size, nu, mu, scale):
# mu is broadcasted to the potential length of scale in `dist`
mu, _ = pt.random.utils.broadcast_params([mu, nu], ndims_params=[1, 0])
support_point = mu
if not rv_size_is_none(size):
support_point_size = pt.concatenate([size, [mu.shape[-1]]])
support_point = pt.full(support_point_size, support_point)
return support_point
def logp(value, nu, mu, scale):
"""
Calculate log-probability of Multivariate Student's T distribution
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
quaddist, logdet, ok = quaddist_chol(value, mu, scale)
k = value.shape[-1].astype("floatX")
norm = gammaln((nu + k) / 2.0) - gammaln(nu / 2.0) - 0.5 * k * pt.log(nu * np.pi)
inner = -(nu + k) / 2.0 * pt.log1p(quaddist / nu)
res = norm + inner - logdet
return check_parameters(res, ok, nu > 0, msg="posdef, nu > 0")
[docs]
class Dirichlet(SimplexContinuous):
r"""
Dirichlet log-likelihood.
.. math::
f(\mathbf{x}|\mathbf{a}) =
\frac{\Gamma(\sum_{i=1}^k a_i)}{\prod_{i=1}^k \Gamma(a_i)}
\prod_{i=1}^k x_i^{a_i - 1}
======== ===============================================
Support :math:`x_i \in (0, 1)` for :math:`i \in \{1, \ldots, K\}`
such that :math:`\sum x_i = 1`
Mean :math:`\dfrac{a_i}{\sum a_i}`
Variance :math:`\dfrac{a_i - \sum a_0}{a_0^2 (a_0 + 1)}`
where :math:`a_0 = \sum a_i`
======== ===============================================
Parameters
----------
a : tensor_like of float
Concentration parameters (a > 0). The number of categories is given by the
length of the last axis.
"""
rv_op = dirichlet
[docs]
@classmethod
def dist(cls, a, **kwargs):
a = pt.as_tensor_variable(a)
# mean = a / pt.sum(a)
# mode = pt.switch(pt.all(a > 1), (a - 1) / pt.sum(a - 1), np.nan)
return super().dist([a], **kwargs)
def support_point(rv, size, a):
norm_constant = pt.sum(a, axis=-1)[..., None]
support_point = a / norm_constant
if not rv_size_is_none(size):
support_point = pt.full(pt.concatenate([size, [a.shape[-1]]]), support_point)
return support_point
def logp(value, a):
"""
Calculate log-probability of Dirichlet distribution
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
# only defined for sum(value) == 1
res = pt.sum(logpow(value, a - 1) - gammaln(a), axis=-1) + gammaln(pt.sum(a, axis=-1))
res = pt.switch(
pt.or_(
pt.any(pt.lt(value, 0), axis=-1),
pt.any(pt.gt(value, 1), axis=-1),
),
-np.inf,
res,
)
return check_parameters(
res,
a > 0,
msg="a > 0",
)
[docs]
class Multinomial(Discrete):
r"""
Multinomial log-likelihood.
Generalizes binomial distribution, but instead of each trial resulting
in "success" or "failure", each one results in exactly one of some
fixed finite number k of possible outcomes over n independent trials.
'x[i]' indicates the number of times outcome number i was observed
over the n trials.
.. math::
f(x \mid n, p) = \frac{n!}{\prod_{i=1}^k x_i!} \prod_{i=1}^k p_i^{x_i}
========== ===========================================
Support :math:`x \in \{0, 1, \ldots, n\}` such that
:math:`\sum x_i = n`
Mean :math:`n p_i`
Variance :math:`n p_i (1 - p_i)`
Covariance :math:`-n p_i p_j` for :math:`i \ne j`
========== ===========================================
Parameters
----------
n : tensor_like of int
Total counts in each replicate (n > 0).
p : tensor_like of float
Probability of each one of the different outcomes (0 <= p <= 1). The number of
categories is given by the length of the last axis. Elements are expected to sum
to 1 along the last axis.
"""
rv_op = multinomial
[docs]
@classmethod
def dist(cls, n, p, *args, **kwargs):
p = pt.as_tensor_variable(p)
if isinstance(p, TensorConstant):
p_ = np.asarray(p.data)
if np.any(p_ < 0):
raise ValueError(f"Negative `p` parameters are not valid, got: {p_}")
p_sum_ = np.sum([p_], axis=-1)
if not np.all(np.isclose(p_sum_, 1.0)):
warnings.warn(
f"`p` parameters sum to {p_sum_}, instead of 1.0. "
"They will be automatically rescaled. "
"You can rescale them directly to get rid of this warning.",
UserWarning,
)
p_ = p_ / pt.sum(p_, axis=-1, keepdims=True)
p = pt.as_tensor_variable(p_)
n = pt.as_tensor_variable(n)
p = pt.as_tensor_variable(p)
return super().dist([n, p], *args, **kwargs)
def support_point(rv, size, n, p):
n = pt.shape_padright(n)
mean = n * p
mode = pt.round(mean)
# Add correction term between n and approximation.
# We modify highest expected entry to minimize chances of negative values.
diff = n - pt.sum(mode, axis=-1, keepdims=True)
max_elem_idx = pt.argmax(mean, axis=-1, keepdims=True)
mode = pt.inc_subtensor(
pt.take_along_axis(mode, max_elem_idx, axis=-1),
diff,
)
if not rv_size_is_none(size):
output_size = pt.concatenate([size, [p.shape[-1]]])
mode = pt.full(output_size, mode)
return Assert(
"Negative value in computed support_point of Multinomial."
"It is a known limitation that can arise when the expected largest count is small."
"Please provide an initial value manually."
)(mode, pt.all(mode >= 0))
def logp(value, n, p):
"""
Calculate log-probability of Multinomial distribution
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
res = factln(n) + pt.sum(-factln(value) + logpow(p, value), axis=-1)
res = pt.switch(
pt.or_(pt.any(pt.lt(value, 0), axis=-1), pt.neq(pt.sum(value, axis=-1), n)),
-np.inf,
res,
)
return check_parameters(
res,
0 <= p,
p <= 1,
pt.isclose(pt.sum(p, axis=-1), 1),
pt.ge(n, 0),
msg="0 <= p <= 1, sum(p) = 1, n >= 0",
)
class DirichletMultinomialRV(SymbolicRandomVariable):
name = "dirichlet_multinomial"
extended_signature = "[rng],[size],(),(p)->[rng],(p)"
_print_name = ("DirichletMultinomial", "\\operatorname{DirichletMultinomial}")
@classmethod
def rv_op(cls, n, a, *, size=None, rng=None):
n = pt.as_tensor(n, dtype=int)
a = pt.as_tensor(a)
rng = normalize_rng_param(rng)
size = normalize_size_param(size)
if rv_size_is_none(size):
size = implicit_size_from_params(n, a, ndims_params=cls.ndims_params)
next_rng, p = dirichlet(a, size=size, rng=rng).owner.outputs
final_rng, rv = multinomial(n, p, size=size, rng=next_rng).owner.outputs
return cls(
inputs=[rng, size, n, a],
outputs=[final_rng, rv],
)(rng, size, n, a)
[docs]
class DirichletMultinomial(Discrete):
r"""Dirichlet Multinomial log-likelihood.
Dirichlet mixture of Multinomials distribution, with a marginalized PMF.
.. math::
f(x \mid n, a) = \frac{\Gamma(n + 1)\Gamma(\sum a_k)}
{\Gamma(n + \sum a_k)}
\prod_{k=1}^K
\frac{\Gamma(x_k + a_k)}
{\Gamma(x_k + 1)\Gamma(a_k)}
========== ===========================================
Support :math:`x \in \{0, 1, \ldots, n\}` such that
:math:`\sum x_i = n`
Mean :math:`n \frac{a_i}{\sum{a_k}}`
========== ===========================================
Parameters
----------
n : tensor_like of int
Total counts in each replicate (n > 0).
a : tensor_like of float
Dirichlet concentration parameters (a > 0). The number of categories is given by
the length of the last axis.
"""
rv_type = DirichletMultinomialRV
rv_op = DirichletMultinomialRV.rv_op
[docs]
@classmethod
def dist(cls, n, a, *args, **kwargs):
return super().dist([n, a], **kwargs)
def support_point(rv, size, n, a):
p = a / pt.sum(a, axis=-1, keepdims=True)
return support_point(Multinomial.dist(n=n, p=p, size=size))
def logp(value, n, a):
"""
Calculate log-probability of DirichletMultinomial distribution
at specified value.
Parameters
----------
value: integer array
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
sum_a = a.sum(axis=-1)
const = (gammaln(n + 1) + gammaln(sum_a)) - gammaln(n + sum_a)
series = gammaln(value + a) - (gammaln(value + 1) + gammaln(a))
res = const + series.sum(axis=-1)
res = pt.switch(
pt.or_(
pt.any(pt.lt(value, 0), axis=-1),
pt.neq(pt.sum(value, axis=-1), n),
),
-np.inf,
res,
)
return check_parameters(
res,
a > 0,
n >= 0,
msg="a > 0, n >= 0",
)
class _OrderedMultinomial(Multinomial):
r"""
Underlying class for ordered multinomial distributions.
See docs for the OrderedMultinomial wrapper class for more details on how to use it in models.
"""
rv_op = multinomial
@classmethod
def dist(cls, eta, cutpoints, n, *args, **kwargs):
eta = pt.as_tensor_variable(eta)
cutpoints = pt.as_tensor_variable(cutpoints)
n = pt.as_tensor_variable(n, dtype=int)
pa = sigmoid(cutpoints - pt.shape_padright(eta))
p_cum = pt.concatenate(
[
pt.zeros_like(pt.shape_padright(pa[..., 0])),
pa,
pt.ones_like(pt.shape_padright(pa[..., 0])),
],
axis=-1,
)
p = p_cum[..., 1:] - p_cum[..., :-1]
return super().dist(n, p, *args, **kwargs)
[docs]
class OrderedMultinomial:
r"""
Wrapper class for Ordered Multinomial distributions.
Useful for regression on ordinal data whose values range
from 1 to K as a function of some predictor, :math:`\eta`, but
which are _aggregated_ by trial, like multinomial observations (in
contrast to `pm.OrderedLogistic`, which only accepts ordinal data
in a _disaggregated_ format, like categorical observations).
The cutpoints, :math:`c`, separate which ranges of :math:`\eta` are
mapped to which of the K observed dependent variables. The number
of cutpoints is K - 1. It is recommended that the cutpoints are
constrained to be ordered.
.. math::
f(k \mid \eta, c) = \left\{
\begin{array}{l}
1 - \text{logit}^{-1}(\eta - c_1)
\,, \text{if } k = 0 \\
\text{logit}^{-1}(\eta - c_{k - 1}) -
\text{logit}^{-1}(\eta - c_{k})
\,, \text{if } 0 < k < K \\
\text{logit}^{-1}(\eta - c_{K - 1})
\,, \text{if } k = K \\
\end{array}
\right.
Parameters
----------
eta : tensor_like of float
The predictor.
cutpoints : tensor_like of float
The length K - 1 array of cutpoints which break :math:`\eta` into
ranges. Do not explicitly set the first and last elements of
:math:`c` to negative and positive infinity.
n : tensor_like of int
The total number of multinomial trials.
compute_p : boolean, default=True
Whether to compute and store in the trace the inferred probabilities of each
categories,
based on the cutpoints' values. Defaults to True.
Might be useful to disable it if memory usage is of interest.
Examples
--------
.. code-block:: python
# Generate data for a simple 1 dimensional example problem
true_cum_p = np.array([0.1, 0.15, 0.25, 0.50, 0.65, 0.90, 1.0])
true_p = np.hstack([true_cum_p[0], true_cum_p[1:] - true_cum_p[:-1]])
fake_elections = np.random.multinomial(n=1_000, pvals=true_p, size=60)
# Ordered multinomial regression
with pm.Model() as model:
cutpoints = pm.Normal(
"cutpoints",
mu=np.arange(6) - 2.5,
sigma=1.5,
initval=np.arange(6) - 2.5,
transform=pm.distributions.transforms.ordered,
)
pm.OrderedMultinomial(
"results",
eta=0.0,
cutpoints=cutpoints,
n=fake_elections.sum(1),
observed=fake_elections,
)
trace = pm.sample()
# Plot the results
arviz.plot_posterior(trace_12_4, var_names=["complete_p"], ref_val=list(true_p));
"""
def __new__(cls, name, *args, compute_p=True, **kwargs):
out_rv = _OrderedMultinomial(name, *args, **kwargs)
if compute_p:
pm.Deterministic(f"{name}_probs", out_rv.owner.inputs[-1], dims=kwargs.get("dims"))
return out_rv
[docs]
@classmethod
def dist(cls, *args, **kwargs):
return _OrderedMultinomial.dist(*args, **kwargs)
def posdef(AA):
try:
scipy.linalg.cholesky(AA)
return True
except scipy.linalg.LinAlgError:
return False
class PosDefMatrix(Op):
"""
Check if input is positive definite. Input should be a square matrix.
"""
# Properties attribute
__props__ = ()
# Compulsory if itypes and otypes are not defined
def make_node(self, x):
x = pt.as_tensor_variable(x)
assert x.ndim == 2
o = TensorType(dtype="bool", shape=[])()
return Apply(self, [x], [o])
# Python implementation:
def perform(self, node, inputs, outputs):
(x,) = inputs
(z,) = outputs
try:
z[0] = np.array(posdef(x), dtype="bool")
except Exception:
pm._log.exception("Failed to check if %s positive definite", x)
raise
def infer_shape(self, fgraph, node, shapes):
return [[]]
def grad(self, inp, grads):
(x,) = inp
return [x.zeros_like(pytensor.config.floatX)]
def __str__(self):
return "MatrixIsPositiveDefinite"
matrix_pos_def = PosDefMatrix()
class WishartRV(RandomVariable):
name = "wishart"
signature = "(),(p,p)->(p,p)"
dtype = "floatX"
_print_name = ("Wishart", "\\operatorname{Wishart}")
@classmethod
def rng_fn(cls, rng, nu, V, size):
scipy_size = size if size else 1 # Default size for Scipy's wishart.rvs is 1
V = _squeeze_to_ndim(V, 2)
result = stats.wishart.rvs(int(nu), V, size=scipy_size, random_state=rng)
if size == (1,):
return result[np.newaxis, ...]
else:
return result
wishart = WishartRV()
[docs]
class Wishart(Continuous):
r"""
Wishart log-likelihood.
The Wishart distribution is the probability distribution of the
maximum-likelihood estimator (MLE) of the precision matrix of a
multivariate normal distribution. If V=1, the distribution is
identical to the chi-square distribution with nu degrees of
freedom.
.. math::
f(X \mid nu, T) =
\frac{{\mid T \mid}^{nu/2}{\mid X \mid}^{(nu-k-1)/2}}{2^{nu k/2}
\Gamma_p(nu/2)} \exp\left\{ -\frac{1}{2} Tr(TX) \right\}
where :math:`k` is the rank of :math:`X`.
======== =========================================
Support :math:`X(p x p)` positive definite matrix
Mean :math:`nu V`
Variance :math:`nu (v_{ij}^2 + v_{ii} v_{jj})`
======== =========================================
Parameters
----------
nu : tensor_like of int
Degrees of freedom, > 0.
V : tensor_like of float
p x p positive definite matrix.
Notes
-----
This distribution is unusable in a PyMC model. You should instead
use LKJCholeskyCov or LKJCorr.
"""
rv_op = wishart
[docs]
@classmethod
def dist(cls, nu, V, *args, **kwargs):
nu = pt.as_tensor_variable(nu, dtype=int)
V = pt.as_tensor_variable(V)
warnings.warn(
"The Wishart distribution can currently not be used "
"for MCMC sampling. The probability of sampling a "
"symmetric matrix is basically zero. Instead, please "
"use LKJCholeskyCov or LKJCorr. For more information "
"on the issues surrounding the Wishart see here: "
"https://github.com/pymc-devs/pymc/issues/538.",
UserWarning,
)
# mean = nu * V
# p = V.shape[0]
# mode = pt.switch(pt.ge(nu, p + 1), (nu - p - 1) * V, np.nan)
return super().dist([nu, V], *args, **kwargs)
def logp(X, nu, V):
"""
Calculate log-probability of Wishart distribution
at specified value.
Parameters
----------
X: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
p = V.shape[0]
IVI = det(V)
IXI = det(X)
return check_parameters(
(
(nu - p - 1) * pt.log(IXI)
- trace(matrix_inverse(V).dot(X))
- nu * p * pt.log(2)
- nu * pt.log(IVI)
- 2 * multigammaln(nu / 2.0, p)
)
/ 2,
matrix_pos_def(X),
pt.eq(X, X.T),
nu > (p - 1),
)
[docs]
def WishartBartlett(name, S, nu, is_cholesky=False, return_cholesky=False, initval=None):
r"""
Bartlett decomposition of the Wishart distribution. As the Wishart
distribution requires the matrix to be symmetric positive semi-definite
it is impossible for MCMC to ever propose acceptable matrices.
Instead, we can use the Barlett decomposition which samples a lower
diagonal matrix. Specifically:
.. math::
\text{If} L \sim \begin{pmatrix}
\sqrt{c_1} & 0 & 0 \\
z_{21} & \sqrt{c_2} & 0 \\
z_{31} & z_{32} & \sqrt{c_3}
\end{pmatrix}
\text{with} c_i \sim \chi^2(n-i+1) \text{ and } n_{ij} \sim \mathcal{N}(0, 1), \text{then} \\
L \times A \times A.T \times L.T \sim \text{Wishart}(L \times L.T, \nu)
See http://en.wikipedia.org/wiki/Wishart_distribution#Bartlett_decomposition
for more information.
Parameters
----------
S : ndarray
p x p positive definite matrix
Or:
p x p lower-triangular matrix that is the Cholesky factor
of the covariance matrix.
nu : tensor_like of int
Degrees of freedom, > dim(S).
is_cholesky : bool, default=False
Input matrix S is already Cholesky decomposed as S.T * S
return_cholesky : bool, default=False
Only return the Cholesky decomposed matrix.
initval : ndarray
p x p positive definite matrix used to initialize
Notes
-----
This is not a standard Distribution class but follows a similar
interface. Besides the Wishart distribution, it will add RVs
name_c and name_z to your model which make up the matrix.
This distribution is usually a bad idea to use as a prior for multivariate
normal. You should instead use LKJCholeskyCov or LKJCorr.
"""
L = S if is_cholesky else scipy.linalg.cholesky(S)
diag_idx = np.diag_indices_from(S)
tril_idx = np.tril_indices_from(S, k=-1)
n_diag = len(diag_idx[0])
n_tril = len(tril_idx[0])
if initval is not None:
# Inverse transform
initval = np.dot(np.dot(np.linalg.inv(L), initval), np.linalg.inv(L.T))
initval = scipy.linalg.cholesky(initval, lower=True)
diag_testval = initval[diag_idx] ** 2
tril_testval = initval[tril_idx]
else:
diag_testval = None
tril_testval = None
c = pt.sqrt(
ChiSquared(f"{name}_c", nu - np.arange(2, 2 + n_diag), shape=n_diag, initval=diag_testval)
)
pm._log.info(f"Added new variable {name}_c to model diagonal of Wishart.")
z = Normal(f"{name}_z", 0.0, 1.0, shape=n_tril, initval=tril_testval)
pm._log.info(f"Added new variable {name}_z to model off-diagonals of Wishart.")
# Construct A matrix
A = pt.zeros(S.shape, dtype=np.float32)
A = pt.set_subtensor(A[diag_idx], c)
A = pt.set_subtensor(A[tril_idx], z)
# L * A * A.T * L.T ~ Wishart(L*L.T, nu)
if return_cholesky:
return pm.Deterministic(name, pt.dot(L, A))
else:
return pm.Deterministic(name, pt.dot(pt.dot(pt.dot(L, A), A.T), L.T))
def _lkj_normalizing_constant(eta, n):
# TODO: This is mixing python branching with the potentially symbolic n and eta variables
if not isinstance(eta, int | float):
raise NotImplementedError("eta must be an int or float")
if not isinstance(n, int):
raise NotImplementedError("n must be an integer")
if eta == 1:
result = gammaln(2.0 * pt.arange(1, int((n - 1) / 2) + 1)).sum()
if n % 2 == 1:
result += (
0.25 * (n**2 - 1) * pt.log(np.pi)
- 0.25 * (n - 1) ** 2 * pt.log(2.0)
- (n - 1) * gammaln(int((n + 1) / 2))
)
else:
result += (
0.25 * n * (n - 2) * pt.log(np.pi)
+ 0.25 * (3 * n**2 - 4 * n) * pt.log(2.0)
+ n * gammaln(n / 2)
- (n - 1) * gammaln(n)
)
else:
result = -(n - 1) * gammaln(eta + 0.5 * (n - 1))
k = pt.arange(1, n)
result += (0.5 * k * pt.log(np.pi) + gammaln(eta + 0.5 * (n - 1 - k))).sum()
return result
class _LKJCholeskyCovBaseRV(RandomVariable):
name = "_lkjcholeskycovbase"
signature = "(),(),(d)->(n)"
dtype = "floatX"
_print_name = ("_lkjcholeskycovbase", "\\operatorname{_lkjcholeskycovbase}")
def make_node(self, rng, size, n, eta, D):
n = pt.as_tensor_variable(n)
if not all(n.type.broadcastable):
raise ValueError("n must be a scalar.")
eta = pt.as_tensor_variable(eta)
if not all(eta.type.broadcastable):
raise ValueError("eta must be a scalar.")
D = pt.as_tensor_variable(D)
return super().make_node(rng, size, n, eta, D)
def _supp_shape_from_params(self, dist_params, param_shapes):
n = dist_params[0].squeeze()
return ((n * (n + 1)) // 2,)
def rng_fn(self, rng, n, eta, D, size):
# We flatten the size to make operations easier, and then rebuild it
if size is None:
size = D.shape[:-1]
flat_size = np.prod(size).astype(int)
n = n.squeeze()
eta = eta.squeeze()
C = LKJCorrRV._random_corr_matrix(rng=rng, n=n, eta=eta, flat_size=flat_size)
D = D.reshape(flat_size, n)
C *= D[..., :, np.newaxis] * D[..., np.newaxis, :]
tril_idx = np.tril_indices(n, k=0)
samples = np.linalg.cholesky(C)[..., tril_idx[0], tril_idx[1]]
if size is None:
samples = samples[0]
else:
dist_shape = (n * (n + 1)) // 2
samples = np.reshape(samples, (*size, dist_shape))
return samples
_ljk_cholesky_cov_base = _LKJCholeskyCovBaseRV()
# _LKJCholeskyCovBaseRV requires a properly shaped `D`, which means the variable can't
# be safely resized. Because of this, we add the thin SymbolicRandomVariable wrapper
class _LKJCholeskyCovRV(SymbolicRandomVariable):
extended_signature = "[rng],(),(),(n)->[rng],(n)"
_print_name = ("_lkjcholeskycov", "\\operatorname{_lkjcholeskycov}")
@classmethod
def rv_op(cls, n, eta, sd_dist, *, size=None):
# We don't allow passing `rng` because we don't fully control the rng of the components!
n = pt.as_tensor(n, dtype="int64", ndim=0)
eta = pt.as_tensor_variable(eta, ndim=0)
rng = pytensor.shared(np.random.default_rng())
size = normalize_size_param(size)
# We resize the sd_dist automatically so that it has (size x n) independent
# draws which is what the `_LKJCholeskyCovBaseRV.rng_fn` expects. This makes the
# random and logp methods equivalent, as the latter also assumes a unique value
# for each diagonal element.
# Since `eta` and `n` are forced to be scalars we don't need to worry about
# implied batched dimensions from those for the time being.
if rv_size_is_none(size):
size = sd_dist.shape[:-1]
shape = (*size, n)
if sd_dist.owner.op.ndim_supp == 0:
sd_dist = change_dist_size(sd_dist, shape)
else:
# The support shape must be `n` but we have no way of controlling it
sd_dist = change_dist_size(sd_dist, shape[:-1])
next_rng, lkjcov = _ljk_cholesky_cov_base(n, eta, sd_dist, rng=rng).owner.outputs
return _LKJCholeskyCovRV(
inputs=[rng, n, eta, sd_dist],
outputs=[next_rng, lkjcov],
)(rng, n, eta, sd_dist)
def update(self, node):
return {node.inputs[0]: node.outputs[0]}
class _LKJCholeskyCov(Distribution):
r"""Underlying class for covariance matrix with LKJ distributed correlations.
See docs for LKJCholeskyCov function for more details on how to use it in models.
"""
rv_type = _LKJCholeskyCovRV
rv_op = _LKJCholeskyCovRV.rv_op
@classmethod
def dist(cls, n, eta, sd_dist, **kwargs):
if not (
isinstance(sd_dist, Variable)
and sd_dist.owner is not None
and isinstance(sd_dist.owner.op, RandomVariable | SymbolicRandomVariable)
and sd_dist.owner.op.ndim_supp < 2
):
raise TypeError("sd_dist must be a scalar or vector distribution variable")
check_dist_not_registered(sd_dist)
return super().dist([n, eta, sd_dist], **kwargs)
@_change_dist_size.register(_LKJCholeskyCovRV)
def change_LKJCholeksyCovRV_size(op, dist, new_size, expand=False):
n, eta, sd_dist = dist.owner.inputs[1:]
if expand:
old_size = sd_dist.shape[:-1]
new_size = tuple(new_size) + tuple(old_size)
return _LKJCholeskyCov.rv_op(n, eta, sd_dist, size=new_size)
@_support_point.register(_LKJCholeskyCovRV)
def _LKJCholeksyCovRV_support_point(op, rv, rng, n, eta, sd_dist):
diag_idxs = (pt.cumsum(pt.arange(1, n + 1)) - 1).astype("int32")
support_point = pt.zeros_like(rv)
support_point = pt.set_subtensor(support_point[..., diag_idxs], 1)
return support_point
@_default_transform.register(_LKJCholeskyCovRV)
def _LKJCholeksyCovRV_default_transform(op, rv):
_, n, _, _ = rv.owner.inputs
return transforms.CholeskyCovPacked(n)
@_logprob.register(_LKJCholeskyCovRV)
def _LKJCholeksyCovRV_logp(op, values, rng, n, eta, sd_dist, **kwargs):
(value,) = values
if value.ndim > 1:
raise ValueError("_LKJCholeskyCov logp is only implemented for vector values (ndim=1)")
diag_idxs = pt.cumsum(pt.arange(1, n + 1)) - 1
cumsum = pt.cumsum(value**2)
variance = pt.zeros(pt.atleast_1d(n))
variance = pt.inc_subtensor(variance[0], value[0] ** 2)
variance = pt.inc_subtensor(variance[1:], cumsum[diag_idxs[1:]] - cumsum[diag_idxs[:-1]])
sd_vals = pt.sqrt(variance)
logp_sd = pm.logp(sd_dist, sd_vals).sum()
corr_diag = value[diag_idxs] / sd_vals
logp_lkj = (2 * eta - 3 + n - pt.arange(n)) * pt.log(corr_diag)
logp_lkj = pt.sum(logp_lkj)
# Compute the log det jacobian of the second transformation
# described in the docstring.
idx = pt.arange(n)
det_invjac = pt.log(corr_diag) - idx * pt.log(sd_vals)
det_invjac = det_invjac.sum()
# TODO: _lkj_normalizing_constant currently requires `eta` and `n` to be constants
try:
n = int(get_underlying_scalar_constant_value(n))
except NotScalarConstantError:
raise NotImplementedError("logp only implemented for constant `n`")
try:
eta = float(get_underlying_scalar_constant_value(eta))
except NotScalarConstantError:
raise NotImplementedError("logp only implemented for constant `eta`")
norm = _lkj_normalizing_constant(eta, n)
return norm + logp_lkj + logp_sd + det_invjac
[docs]
class LKJCholeskyCov:
r"""Wrapper class for covariance matrix with LKJ distributed correlations.
This defines a distribution over Cholesky decomposed covariance
matrices, such that the underlying correlation matrices follow an
LKJ distribution [1] and the standard deviations follow an arbitrary
distribution specified by the user.
Parameters
----------
name : str
The name given to the variable in the model.
eta : tensor_like of float
The shape parameter (eta > 0) of the LKJ distribution. eta = 1
implies a uniform distribution of the correlation matrices;
larger values put more weight on matrices with few correlations.
n : tensor_like of int
Dimension of the covariance matrix (n > 1).
sd_dist : Distribution
A positive scalar or vector distribution for the standard deviations, created
with the `.dist()` API. Should have `shape[-1]=n`. Scalar distributions will be
automatically resized to ensure this.
.. warning:: sd_dist will be cloned, rendering it independent of the one passed as input.
compute_corr : bool, default=True
If `True`, returns three values: the Cholesky decomposition, the correlations
and the standard deviations of the covariance matrix. Otherwise, only returns
the packed Cholesky decomposition. Defaults to `True`.
compatibility.
store_in_trace : bool, default=True
Whether to store the correlations and standard deviations of the covariance
matrix in the posterior trace. If `True`, they will automatically be named as
`{name}_corr` and `{name}_stds` respectively. Effective only when
`compute_corr=True`.
Returns
-------
chol : TensorVariable
If `compute_corr=True`. The unpacked Cholesky covariance decomposition.
corr : TensorVariable
If `compute_corr=True`. The correlations of the covariance matrix.
stds : TensorVariable
If `compute_corr=True`. The standard deviations of the covariance matrix.
packed_chol : TensorVariable
If `compute_corr=False` The packed Cholesky covariance decomposition.
Notes
-----
Since the Cholesky factor is a lower triangular matrix, we use packed storage for
the matrix: We store the values of the lower triangular matrix in a one-dimensional
array, numbered by row::
[[0 - - -]
[1 2 - -]
[3 4 5 -]
[6 7 8 9]]
The unpacked Cholesky covariance matrix is automatically computed and returned when
you specify `compute_corr=True` in `pm.LKJCholeskyCov` (see example below).
Otherwise, you can use `pm.expand_packed_triangular(packed_cov, lower=True)`
to convert the packed Cholesky matrix to a regular two-dimensional array.
Examples
--------
.. code:: python
with pm.Model() as model:
# Note that we access the distribution for the standard
# deviations, and do not create a new random variable.
sd_dist = pm.Exponential.dist(1.0, size=10)
chol, corr, sigmas = pm.LKJCholeskyCov(
'chol_cov', eta=4, n=10, sd_dist=sd_dist
)
# if you only want the packed Cholesky:
# packed_chol = pm.LKJCholeskyCov(
'chol_cov', eta=4, n=10, sd_dist=sd_dist, compute_corr=False
)
# chol = pm.expand_packed_triangular(10, packed_chol, lower=True)
# Define a new MvNormal with the given covariance
vals = pm.MvNormal('vals', mu=np.zeros(10), chol=chol, shape=10)
# Or transform an uncorrelated normal:
vals_raw = pm.Normal('vals_raw', mu=0, sigma=1, shape=10)
vals = pt.dot(chol, vals_raw)
# Or compute the covariance matrix
cov = pt.dot(chol, chol.T)
**Implementation** In the unconstrained space all values of the cholesky factor
are stored untransformed, except for the diagonal entries, where
we use a log-transform to restrict them to positive values.
To correctly compute log-likelihoods for the standard deviations
and the correlation matrix separately, we need to consider a
second transformation: Given a cholesky factorization
:math:`LL^T = \Sigma` of a covariance matrix we can recover the
standard deviations :math:`\sigma` as the euclidean lengths of
the rows of :math:`L`, and the cholesky factor of the
correlation matrix as :math:`U = \text{diag}(\sigma)^{-1}L`.
Since each row of :math:`U` has length 1, we do not need to
store the diagonal. We define a transformation :math:`\phi`
such that :math:`\phi(L)` is the lower triangular matrix containing
the standard deviations :math:`\sigma` on the diagonal and the
correlation matrix :math:`U` below. In this form we can easily
compute the different likelihoods separately, as the likelihood
of the correlation matrix only depends on the values below the
diagonal, and the likelihood of the standard deviation depends
only on the diagonal values.
We still need the determinant of the jacobian of :math:`\phi^{-1}`.
If we think of :math:`\phi` as an automorphism on
:math:`\mathbb{R}^{\tfrac{n(n+1)}{2}}`, where we order
the dimensions as described in the notes above, the jacobian
is a block-diagonal matrix, where each block corresponds to
one row of :math:`U`. Each block has arrowhead shape, and we
can compute the determinant of that as described in [2]. Since
the determinant of a block-diagonal matrix is the product
of the determinants of the blocks, we get
.. math::
\text{det}(J_{\phi^{-1}}(U)) =
\left[
\prod_{i=2}^N u_{ii}^{i - 1} L_{ii}
\right]^{-1}
References
----------
.. [1] Lewandowski, D., Kurowicka, D. and Joe, H. (2009).
"Generating random correlation matrices based on vines and
extended onion method." Journal of multivariate analysis,
100(9), pp.1989-2001.
.. [2] J. M. isn't a mathematician (http://math.stackexchange.com/users/498/
j-m-isnt-a-mathematician), Different approaches to evaluate this
determinant, URL (version: 2012-04-14):
http://math.stackexchange.com/q/130026
"""
def __new__(cls, name, eta, n, sd_dist, *, compute_corr=True, store_in_trace=True, **kwargs):
packed_chol = _LKJCholeskyCov(name, eta=eta, n=n, sd_dist=sd_dist, **kwargs)
if not compute_corr:
return packed_chol
else:
chol, corr, stds = cls.helper_deterministics(n, packed_chol)
if store_in_trace:
corr = pm.Deterministic(f"{name}_corr", corr)
stds = pm.Deterministic(f"{name}_stds", stds)
return chol, corr, stds
[docs]
@classmethod
def dist(cls, eta, n, sd_dist, *, compute_corr=True, **kwargs):
# compute Cholesky decomposition
packed_chol = _LKJCholeskyCov.dist(eta=eta, n=n, sd_dist=sd_dist, **kwargs)
if not compute_corr:
return packed_chol
else:
return cls.helper_deterministics(n, packed_chol)
@classmethod
def helper_deterministics(cls, n, packed_chol):
chol = pm.expand_packed_triangular(n, packed_chol, lower=True)
# compute covariance matrix
cov = pt.dot(chol, chol.T)
# extract standard deviations and rho
stds = pt.sqrt(pt.diag(cov))
inv_stds = 1 / stds
corr = inv_stds[None, :] * cov * inv_stds[:, None]
return chol, corr, stds
class LKJCorrRV(RandomVariable):
name = "lkjcorr"
signature = "(),()->(n)"
dtype = "floatX"
_print_name = ("LKJCorrRV", "\\operatorname{LKJCorrRV}")
def make_node(self, rng, size, n, eta):
n = pt.as_tensor_variable(n)
if not all(n.type.broadcastable):
raise ValueError("n must be a scalar.")
eta = pt.as_tensor_variable(eta)
if not all(eta.type.broadcastable):
raise ValueError("eta must be a scalar.")
return super().make_node(rng, size, n, eta)
def _supp_shape_from_params(self, dist_params, **kwargs):
n = dist_params[0].squeeze()
dist_shape = ((n * (n - 1)) // 2,)
return dist_shape
@classmethod
def rng_fn(cls, rng, n, eta, size):
# We flatten the size to make operations easier, and then rebuild it
if size is None:
flat_size = 1
else:
flat_size = np.prod(size).astype(int)
n = n.squeeze()
eta = eta.squeeze()
C = cls._random_corr_matrix(rng=rng, n=n, eta=eta, flat_size=flat_size)
triu_idx = np.triu_indices(n, k=1)
samples = C[..., triu_idx[0], triu_idx[1]]
if size is None:
samples = samples[0]
else:
dist_shape = (n * (n - 1)) // 2
samples = np.reshape(samples, (*size, dist_shape))
return samples
@classmethod
def _random_corr_matrix(cls, rng, n, eta, flat_size):
# original implementation in R see:
# https://github.com/rmcelreath/rethinking/blob/master/R/distributions.r
beta = eta - 1.0 + n / 2.0
r12 = 2.0 * stats.beta.rvs(a=beta, b=beta, size=flat_size, random_state=rng) - 1.0
P = np.full((flat_size, n, n), np.eye(n))
P[..., 0, 1] = r12
P[..., 1, 1] = np.sqrt(1.0 - r12**2)
for mp1 in range(2, n):
beta -= 0.5
y = stats.beta.rvs(a=mp1 / 2.0, b=beta, size=flat_size, random_state=rng)
z = stats.norm.rvs(loc=0, scale=1, size=(flat_size, mp1), random_state=rng)
z = z / np.sqrt(np.einsum("ij,ij->i", z, z))[..., np.newaxis]
P[..., 0:mp1, mp1] = np.sqrt(y[..., np.newaxis]) * z
P[..., mp1, mp1] = np.sqrt(1.0 - y)
C = np.einsum("...ji,...jk->...ik", P, P)
return C
lkjcorr = LKJCorrRV()
class MultivariateIntervalTransform(Interval):
name = "interval"
def log_jac_det(self, *args):
return super().log_jac_det(*args).sum(-1)
# Returns list of upper triangular values
class _LKJCorr(BoundedContinuous):
rv_op = lkjcorr
@classmethod
def dist(cls, n, eta, **kwargs):
n = pt.as_tensor_variable(n).astype(int)
eta = pt.as_tensor_variable(eta)
return super().dist([n, eta], **kwargs)
def support_point(rv, *args):
return pt.zeros_like(rv)
def logp(value, n, eta):
"""
Calculate log-probability of LKJ distribution at specified
value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
if value.ndim > 1:
raise NotImplementedError("LKJCorr logp is only implemented for vector values (ndim=1)")
# TODO: PyTensor does not have a `triu_indices`, so we can only work with constant
# n (or else find a different expression)
try:
n = int(get_underlying_scalar_constant_value(n))
except NotScalarConstantError:
raise NotImplementedError("logp only implemented for constant `n`")
shape = n * (n - 1) // 2
tri_index = np.zeros((n, n), dtype="int32")
tri_index[np.triu_indices(n, k=1)] = np.arange(shape)
tri_index[np.triu_indices(n, k=1)[::-1]] = np.arange(shape)
value = pt.take(value, tri_index)
value = pt.fill_diagonal(value, 1)
# TODO: _lkj_normalizing_constant currently requires `eta` and `n` to be constants
try:
eta = float(get_underlying_scalar_constant_value(eta))
except NotScalarConstantError:
raise NotImplementedError("logp only implemented for constant `eta`")
result = _lkj_normalizing_constant(eta, n)
result += (eta - 1.0) * pt.log(det(value))
return check_parameters(
result,
value >= -1,
value <= 1,
matrix_pos_def(value),
eta > 0,
)
@_default_transform.register(_LKJCorr)
def lkjcorr_default_transform(op, rv):
return MultivariateIntervalTransform(-1.0, 1.0)
[docs]
class LKJCorr:
r"""
The LKJ (Lewandowski, Kurowicka and Joe) log-likelihood.
The LKJ distribution is a prior distribution for correlation matrices.
If eta = 1 this corresponds to the uniform distribution over correlation
matrices. For eta -> oo the LKJ prior approaches the identity matrix.
======== ==============================================
Support Upper triangular matrix with values in [-1, 1]
======== ==============================================
Parameters
----------
n : tensor_like of int
Dimension of the covariance matrix (n > 1).
eta : tensor_like of float
The shape parameter (eta > 0) of the LKJ distribution. eta = 1
implies a uniform distribution of the correlation matrices;
larger values put more weight on matrices with few correlations.
return_matrix : bool, default=False
If True, returns the full correlation matrix.
False only returns the values of the upper triangular matrix excluding
diagonal in a single vector of length n(n-1)/2 for memory efficiency
Notes
-----
This is mainly useful if you want the standard deviations to be fixed, as
LKJCholsekyCov is optimized for the case where they come from a distribution.
Examples
--------
.. code:: python
with pm.Model() as model:
# Define the vector of fixed standard deviations
sds = 3*np.ones(10)
corr = pm.LKJCorr(
'corr', eta=4, n=10, return_matrix=True
)
# Define a new MvNormal with the given correlation matrix
vals = sds*pm.MvNormal('vals', mu=np.zeros(10), cov=corr, shape=10)
# Or transform an uncorrelated normal distribution:
vals_raw = pm.Normal('vals_raw', shape=10)
chol = pt.linalg.cholesky(corr)
vals = sds*pt.dot(chol,vals_raw)
# The matrix is internally still sampled as a upper triangular vector
# If you want access to it in matrix form in the trace, add
pm.Deterministic('corr_mat', corr)
References
----------
.. [LKJ2009] Lewandowski, D., Kurowicka, D. and Joe, H. (2009).
"Generating random correlation matrices based on vines and
extended onion method." Journal of multivariate analysis,
100(9), pp.1989-2001.
"""
def __new__(cls, name, n, eta, *, return_matrix=False, **kwargs):
c_vec = _LKJCorr(name, eta=eta, n=n, **kwargs)
if not return_matrix:
return c_vec
else:
return cls.vec_to_corr_mat(c_vec, n)
[docs]
@classmethod
def dist(cls, n, eta, *, return_matrix=False, **kwargs):
c_vec = _LKJCorr.dist(eta=eta, n=n, **kwargs)
if not return_matrix:
return c_vec
else:
return cls.vec_to_corr_mat(c_vec, n)
@classmethod
def vec_to_corr_mat(cls, vec, n):
tri = pt.zeros(pt.concatenate([vec.shape[:-1], (n, n)]))
tri = pt.subtensor.set_subtensor(tri[(..., *np.triu_indices(n, 1))], vec)
return tri + pt.moveaxis(tri, -2, -1) + pt.diag(pt.ones(n))
class MatrixNormalRV(RandomVariable):
name = "matrixnormal"
signature = "(m,n),(m,m),(n,n)->(m,n)"
dtype = "floatX"
_print_name = ("MatrixNormal", "\\operatorname{MatrixNormal}")
@classmethod
def rng_fn(cls, rng, mu, rowchol, colchol, size=None):
if size is None:
size = np.broadcast_shapes(mu.shape[:-2], rowchol.shape[:-2], colchol.shape[:-2])
dist_shape = (rowchol.shape[-2], colchol.shape[-2])
output_shape = size + dist_shape
standard_normal = rng.standard_normal(output_shape)
return mu + np.matmul(rowchol, np.matmul(standard_normal, np.swapaxes(colchol, -1, -2)))
matrixnormal = MatrixNormalRV()
[docs]
class MatrixNormal(Continuous):
r"""
Matrix-valued normal log-likelihood.
.. math::
f(x \mid \mu, U, V) =
\frac{1}{(2\pi^{m n} |U|^n |V|^m)^{1/2}}
\exp\left\{
-\frac{1}{2} \mathrm{Tr}[ V^{-1} (x-\mu)^{\prime} U^{-1} (x-\mu)]
\right\}
=============== =====================================
Support :math:`x \in \mathbb{R}^{m \times n}`
Mean :math:`\mu`
Row Variance :math:`U`
Column Variance :math:`V`
=============== =====================================
Parameters
----------
mu : tensor_like of float
Array of means. Must be broadcastable with the random variable X such
that the shape of mu + X is (M, N).
rowcov : (M, M) tensor_like of float, optional
Among-row covariance matrix. Defines variance within
columns. Exactly one of rowcov or rowchol is needed.
rowchol : (M, M) tensor_like of float, optional
Cholesky decomposition of among-row covariance matrix. Exactly one of
rowcov or rowchol is needed.
colcov : (N, N) tensor_like of float, optional
Among-column covariance matrix. If rowcov is the identity matrix,
this functions as `cov` in MvNormal.
Exactly one of colcov or colchol is needed.
colchol : (N, N) tensor_like of float, optional
Cholesky decomposition of among-column covariance matrix. Exactly one
of colcov or colchol is needed.
Examples
--------
Define a matrixvariate normal variable for given row and column covariance
matrices::
colcov = np.array([[1., 0.5], [0.5, 2]])
rowcov = np.array([[1, 0, 0], [0, 4, 0], [0, 0, 16]])
m = rowcov.shape[0]
n = colcov.shape[0]
mu = np.zeros((m, n))
vals = pm.MatrixNormal('vals', mu=mu, colcov=colcov,
rowcov=rowcov)
Above, the ith row in vals has a variance that is scaled by 4^i.
Alternatively, row or column cholesky matrices could be substituted for
either covariance matrix. The MatrixNormal is quicker way compute
MvNormal(mu, np.kron(rowcov, colcov)) that takes advantage of kronecker product
properties for inversion. For example, if draws from MvNormal had the same
covariance structure, but were scaled by different powers of an unknown
constant, both the covariance and scaling could be learned as follows
(see the docstring of `LKJCholeskyCov` for more information about this)
.. code:: python
# Setup data
true_colcov = np.array([[1.0, 0.5, 0.1],
[0.5, 1.0, 0.2],
[0.1, 0.2, 1.0]])
m = 3
n = true_colcov.shape[0]
true_scale = 3
true_rowcov = np.diag([true_scale**(2*i) for i in range(m)])
mu = np.zeros((m, n))
true_kron = np.kron(true_rowcov, true_colcov)
data = np.random.multivariate_normal(mu.flatten(), true_kron)
data = data.reshape(m, n)
with pm.Model() as model:
# Setup right cholesky matrix
sd_dist = pm.HalfCauchy.dist(beta=2.5, shape=3)
colchol_packed = pm.LKJCholeskyCov('colcholpacked', n=3, eta=2,
sd_dist=sd_dist)
colchol = pm.expand_packed_triangular(3, colchol_packed)
# Setup left covariance matrix
scale = pm.LogNormal('scale', mu=np.log(true_scale), sigma=0.5)
rowcov = pt.diag([scale**(2*i) for i in range(m)])
vals = pm.MatrixNormal('vals', mu=mu, colchol=colchol, rowcov=rowcov,
observed=data)
"""
rv_op = matrixnormal
[docs]
@classmethod
def dist(
cls,
mu,
rowcov=None,
rowchol=None,
colcov=None,
colchol=None,
*args,
**kwargs,
):
lower_cholesky = partial(cholesky, lower=True, on_error="raise")
# Among-row matrices
if len([i for i in [rowcov, rowchol] if i is not None]) != 1:
raise ValueError(
"Incompatible parameterization. Specify exactly one of rowcov, or rowchol."
)
if rowcov is not None:
if rowcov.ndim != 2:
raise ValueError("rowcov must be two dimensional.")
rowchol_cov = lower_cholesky(rowcov)
else:
if rowchol.ndim != 2:
raise ValueError("rowchol must be two dimensional.")
rowchol_cov = pt.as_tensor_variable(rowchol)
# Among-column matrices
if len([i for i in [colcov, colchol] if i is not None]) != 1:
raise ValueError(
"Incompatible parameterization. Specify exactly one of colcov, or colchol."
)
if colcov is not None:
colcov = pt.as_tensor_variable(colcov)
if colcov.ndim != 2:
raise ValueError("colcov must be two dimensional.")
colchol_cov = lower_cholesky(colcov)
else:
if colchol.ndim != 2:
raise ValueError("colchol must be two dimensional.")
colchol_cov = pt.as_tensor_variable(colchol)
dist_shape = (rowchol_cov.shape[-1], colchol_cov.shape[-1])
# Broadcasting mu
mu = pt.extra_ops.broadcast_to(mu, shape=dist_shape)
mu = pt.as_tensor_variable(mu)
return super().dist([mu, rowchol_cov, colchol_cov], **kwargs)
def support_point(rv, size, mu, rowchol, colchol):
return pt.full_like(rv, mu)
def logp(value, mu, rowchol, colchol):
"""
Calculate log-probability of Matrix-valued Normal distribution
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
if value.ndim != 2:
raise ValueError("Value must be two dimensional.")
# Compute Tr[colcov^-1 @ (x - mu).T @ rowcov^-1 @ (x - mu)] and
# the logdet of colcov and rowcov.
delta = value - mu
# Find exponent piece by piece
right_quaddist = solve_lower(rowchol, delta)
quaddist = pt.linalg.matrix_dot(right_quaddist.T, right_quaddist)
quaddist = solve_lower(colchol, quaddist)
quaddist = solve_upper(colchol.T, quaddist)
trquaddist = pt.linalg.trace(quaddist)
coldiag = pt.diag(colchol)
rowdiag = pt.diag(rowchol)
half_collogdet = pt.sum(pt.log(coldiag)) # logdet(M) = 2*Tr(log(L))
half_rowlogdet = pt.sum(pt.log(rowdiag)) # Using Cholesky: M = L L^T
m = rowchol.shape[0]
n = colchol.shape[0]
norm = -0.5 * m * n * np.log(2 * np.pi)
return norm - 0.5 * trquaddist - m * half_collogdet - n * half_rowlogdet
class KroneckerNormalRV(SymbolicRandomVariable):
ndim_supp = 1
_print_name = ("KroneckerNormal", "\\operatorname{KroneckerNormal}")
@classmethod
def rv_op(cls, mu, sigma, *covs, size=None, rng=None):
mu = pt.as_tensor(mu)
sigma = pt.as_tensor(sigma)
covs = [pt.as_tensor(cov) for cov in covs]
rng = normalize_rng_param(rng)
size = normalize_size_param(size)
cov = reduce(pt.linalg.kron, covs)
cov = cov + sigma**2 * pt.eye(cov.shape[-2])
next_rng, draws = multivariate_normal(mean=mu, cov=cov, size=size, rng=rng).owner.outputs
covs_sig = ",".join(f"(a{i},b{i})" for i in range(len(covs)))
extended_signature = f"[rng],[size],(m),(),{covs_sig}->[rng],(m)"
return KroneckerNormalRV(
inputs=[rng, size, mu, sigma, *covs],
outputs=[next_rng, draws],
extended_signature=extended_signature,
)(rng, size, mu, sigma, *covs)
[docs]
class KroneckerNormal(Continuous):
r"""
Multivariate normal log-likelihood with Kronecker-structured covariance.
.. math::
f(x \mid \mu, K) =
\frac{1}{(2\pi |K|)^{1/2}}
\exp\left\{ -\frac{1}{2} (x-\mu)^{\prime} K^{-1} (x-\mu) \right\}
======== ==========================
Support :math:`x \in \mathbb{R}^N`
Mean :math:`\mu`
Variance :math:`K = \bigotimes K_i + \sigma^2 I_N`
======== ==========================
Parameters
----------
mu : tensor_like of float
Vector of means, just as in `MvNormal`.
covs : list of arrays
The set of covariance matrices :math:`[K_1, K_2, ...]` to be
Kroneckered in the order provided :math:`\bigotimes K_i`.
chols : list of arrays
The set of lower cholesky matrices :math:`[L_1, L_2, ...]` such that
:math:`K_i = L_i L_i'`.
evds : list of tuples
The set of eigenvalue-vector, eigenvector-matrix pairs
:math:`[(v_1, Q_1), (v_2, Q_2), ...]` such that
:math:`K_i = Q_i \text{diag}(v_i) Q_i'`. For example::
v_i, Q_i = pt.linalg.eigh(K_i)
sigma : scalar, optional
Standard deviation of the Gaussian white noise.
Examples
--------
Define a multivariate normal variable with a covariance
:math:`K = K_1 \otimes K_2`
.. code:: python
K1 = np.array([[1., 0.5], [0.5, 2]])
K2 = np.array([[1., 0.4, 0.2], [0.4, 2, 0.3], [0.2, 0.3, 1]])
covs = [K1, K2]
N = 6
mu = np.zeros(N)
with pm.Model() as model:
vals = pm.KroneckerNormal('vals', mu=mu, covs=covs, shape=N)
Efficiency gains are made by cholesky decomposing :math:`K_1` and
:math:`K_2` individually rather than the larger :math:`K` matrix. Although
only two matrices :math:`K_1` and :math:`K_2` are shown here, an arbitrary
number of submatrices can be combined in this way. Choleskys and
eigendecompositions can be provided instead
.. code:: python
chols = [np.linalg.cholesky(Ki) for Ki in covs]
evds = [np.linalg.eigh(Ki) for Ki in covs]
with pm.Model() as model:
vals2 = pm.KroneckerNormal('vals2', mu=mu, chols=chols, shape=N)
# or
vals3 = pm.KroneckerNormal('vals3', mu=mu, evds=evds, shape=N)
neither of which will be converted. Diagonal noise can also be added to
the covariance matrix, :math:`K = K_1 \otimes K_2 + \sigma^2 I_N`.
Despite the noise removing the overall Kronecker structure of the matrix,
`KroneckerNormal` can continue to make efficient calculations by
utilizing eigendecompositons of the submatrices behind the scenes [1].
Thus,
.. code:: python
sigma = 0.1
with pm.Model() as noise_model:
vals = pm.KroneckerNormal('vals', mu=mu, covs=covs, sigma=sigma, shape=N)
vals2 = pm.KroneckerNormal('vals2', mu=mu, chols=chols, sigma=sigma, shape=N)
vals3 = pm.KroneckerNormal('vals3', mu=mu, evds=evds, sigma=sigma, shape=N)
are identical, with `covs` and `chols` each converted to
eigendecompositions.
References
----------
.. [1] Saatchi, Y. (2011). "Scalable inference for structured Gaussian process models"
"""
rv_type = KroneckerNormalRV
rv_op = KroneckerNormalRV.rv_op
[docs]
@classmethod
def dist(cls, mu, covs=None, chols=None, evds=None, sigma=None, *args, **kwargs):
if len([i for i in [covs, chols, evds] if i is not None]) != 1:
raise ValueError(
"Incompatible parameterization. Specify exactly one of covs, chols, or evds."
)
sigma = sigma if sigma else 0
if chols is not None:
covs = [chol.dot(chol.T) for chol in chols]
elif evds is not None:
eigh_iterable = evds
covs = []
eigs_sep, Qs = zip(*eigh_iterable) # Unzip
for eig, Q in zip(eigs_sep, Qs):
cov_i = pt.dot(Q, pt.dot(pt.diag(eig), Q.T))
covs.append(cov_i)
mu = pt.as_tensor_variable(mu)
return super().dist([mu, sigma, *covs], **kwargs)
def support_point(rv, rng, size, mu, sigma, *covs):
return pt.full_like(rv, mu)
def logp(value, rng, size, mu, sigma, *covs):
"""
Calculate log-probability of Multivariate Normal distribution
with Kronecker-structured covariance at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
# Computes the quadratic (x-mu)^T @ K^-1 @ (x-mu) and log(det(K))
if value.ndim > 2 or value.ndim == 0:
raise ValueError(f"Invalid dimension for value: {value.ndim}")
if value.ndim == 1:
onedim = True
value = value[None, :]
else:
onedim = False
delta = value - mu
eigh_iterable = map(eigh, covs)
eigs_sep, Qs = zip(*eigh_iterable) # Unzip
Qs = list(map(pt.as_tensor_variable, Qs))
QTs = list(map(pt.transpose, Qs))
eigs_sep = list(map(pt.as_tensor_variable, eigs_sep))
eigs = kron_diag(*eigs_sep) # Combine separate eigs
eigs += sigma**2
N = eigs.shape[0]
sqrt_quad = kron_dot(QTs, delta.T)
sqrt_quad = sqrt_quad / pt.sqrt(eigs[:, None])
logdet = pt.sum(pt.log(eigs))
# Square each sample
quad = pt.batched_dot(sqrt_quad.T, sqrt_quad.T)
if onedim:
quad = quad[0]
a = -(quad + logdet + N * pt.log(2 * np.pi)) / 2.0
return a
class CARRV(RandomVariable):
name = "car"
signature = "(m),(m,m),(),(),()->(m)"
dtype = "floatX"
_print_name = ("CAR", "\\operatorname{CAR}")
def make_node(self, rng, size, mu, W, alpha, tau, W_is_valid):
mu = pt.as_tensor_variable(mu)
W = pytensor.sparse.as_sparse_or_tensor_variable(W)
tau = pt.as_tensor_variable(tau)
alpha = pt.as_tensor_variable(alpha)
W_is_valid = pt.as_tensor_variable(W_is_valid, dtype=bool)
if not (W.ndim >= 2 and all(W.type.broadcastable[:-2])):
raise TypeError("W must be a matrix")
if not all(tau.type.broadcastable):
raise TypeError("tau must be a scalar")
if not all(alpha.type.broadcastable):
raise TypeError("alpha must be a scalar")
return super().make_node(rng, size, mu, W, alpha, tau, W_is_valid)
@classmethod
def rng_fn(cls, rng: np.random.RandomState, mu, W, alpha, tau, W_is_valid, size):
"""
Implementation of algorithm from paper
Havard Rue, 2001. "Fast sampling of Gaussian Markov random fields,"
Journal of the Royal Statistical Society Series B, Royal Statistical Society,
vol. 63(2), pages 325-338. DOI: 10.1111/1467-9868.00288
"""
if not W_is_valid.all():
raise ValueError("W must be a valid adjacency matrix")
if np.any(alpha >= 1) or np.any(alpha <= -1):
raise ValueError("the domain of alpha is: -1 < alpha < 1")
# TODO: If there are batch dims, even if W was already sparse,
# we will have some expensive dense_from_sparse and sparse_from_dense
# operations that we should avoid. See https://github.com/pymc-devs/pytensor/issues/839
W = _squeeze_to_ndim(W, 2)
if not scipy.sparse.issparse(W):
W = scipy.sparse.csr_matrix(W)
tau = scipy.sparse.csr_matrix(_squeeze_to_ndim(tau, 0))
alpha = scipy.sparse.csr_matrix(_squeeze_to_ndim(alpha, 0))
s = np.asarray(W.sum(axis=0))[0]
D = scipy.sparse.diags(s)
Q = tau.multiply(D - alpha.multiply(W))
perm_array = scipy.sparse.csgraph.reverse_cuthill_mckee(Q, symmetric_mode=True)
inv_perm = np.argsort(perm_array)
Q = Q[perm_array, :][:, perm_array]
Qb = Q.diagonal()
u = 1
while np.count_nonzero(Q.diagonal(u)) > 0:
Qb = np.vstack((np.pad(Q.diagonal(u), (u, 0), constant_values=(0, 0)), Qb))
u += 1
L = scipy.linalg.cholesky_banded(Qb, lower=False)
size = tuple(size or ())
if size:
mu = np.broadcast_to(mu, (*size, mu.shape[-1]))
z = rng.normal(size=mu.shape)
samples = np.empty(z.shape)
for idx in np.ndindex(mu.shape[:-1]):
samples[idx] = scipy.linalg.cho_solve_banded((L, False), z[idx]) + mu[idx][perm_array]
samples = samples[..., inv_perm]
return samples
car = CARRV()
[docs]
class CAR(Continuous):
r"""
Likelihood for a conditional autoregression. This is a special case of the
multivariate normal with an adjacency-structured covariance matrix.
.. math::
f(x \mid W, \alpha, \tau) =
\frac{|T|^{1/2}}{(2\pi)^{k/2}}
\exp\left\{ -\frac{1}{2} (x-\mu)^{\prime} T^{-1} (x-\mu) \right\}
where :math:`T = (\tau D(I-\alpha W))^{-1}` and :math:`D = diag(\sum_i W_{ij})`.
======== ==========================
Support :math:`x \in \mathbb{R}^k`
Mean :math:`\mu \in \mathbb{R}^k`
Variance :math:`(\tau D(I-\alpha W))^{-1}`
======== ==========================
Parameters
----------
mu : tensor_like of float
Real-valued mean vector
W : (M, M) tensor_like of int
Symmetric adjacency matrix of 1s and 0s indicating
adjacency between elements. If possible, *W* is converted
to a sparse matrix, falling back to a dense variable.
:func:`~pytensor.sparse.basic.as_sparse_or_tensor_variable` is
used for this sparse or tensorvariable conversion.
alpha : tensor_like of float
Autoregression parameter taking values greater than -1 and less than 1.
Values closer to 0 indicate weaker correlation and values closer to
1 indicate higher autocorrelation. For most use cases, the
support of alpha should be restricted to (0, 1).
tau : tensor_like of float
Positive precision variable controlling the scale of the underlying normal variates.
References
----------
.. Jin, X., Carlin, B., Banerjee, S.
"Generalized Hierarchical Multivariate CAR Models for Areal Data"
Biometrics, Vol. 61, No. 4 (Dec., 2005), pp. 950-961
"""
rv_op = car
[docs]
@classmethod
def dist(cls, mu, W, alpha, tau, *args, **kwargs):
# This variable has an expensive validation check, that we want to constant-fold if possible
# So it's passed as an explicit input
W = pytensor.sparse.as_sparse_or_tensor_variable(W)
if isinstance(W.type, pytensor.sparse.SparseTensorType):
abs_diff = pytensor.sparse.basic.mul(pytensor.sparse.sign(W - W.T), W - W.T)
W_is_valid = pt.isclose(pytensor.sparse.sp_sum(abs_diff), 0)
else:
W_is_valid = pt.allclose(W, W.T)
return super().dist([mu, W, alpha, tau, W_is_valid], **kwargs)
def support_point(rv, size, mu, W, alpha, tau, W_is_valid):
return pt.full_like(rv, mu)
def logp(value, mu, W, alpha, tau, W_is_valid):
"""
Calculate log-probability of a CAR-distributed vector
at specified value. This log probability function differs from
the true CAR log density (AKA a multivariate normal with CAR-structured
covariance matrix) by an additive constant.
Parameters
----------
value: array
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
# If expand_dims were added to (a potentially sparse) W, retrieve the non-expanded W
extra_dims = W.type.ndim - 2
if extra_dims:
if (
W.owner
and isinstance(W.owner.op, DimShuffle)
and W.owner.op.new_order == (*("x",) * extra_dims, 0, 1)
):
W = W.owner.inputs[0]
else:
W = pt.squeeze(W, axis=tuple(range(extra_dims)))
if W.owner and isinstance(W.owner.op, DenseFromSparse):
W = W.owner.inputs[0]
sparse = isinstance(W, pytensor.sparse.SparseVariable)
if sparse:
D = sp_sum(W, axis=0)
Dinv_sqrt = pt.diag(1 / pt.sqrt(D))
DWD = pt.dot(pytensor.sparse.dot(Dinv_sqrt, W), Dinv_sqrt)
else:
D = W.sum(axis=0)
Dinv_sqrt = pt.diag(1 / pt.sqrt(D))
DWD = pt.dot(pt.dot(Dinv_sqrt, W), Dinv_sqrt)
lam = pt.linalg.eigvalsh(DWD, pt.eye(DWD.shape[0]))
d, _ = W.shape
if value.ndim == 1:
value = value[None, :]
logtau = d * pt.log(tau).sum(axis=-1)
logdet = pt.log(1 - alpha.T * lam[:, None]).sum()
delta = value - mu
if sparse:
Wdelta = pytensor.sparse.dot(delta, W)
else:
Wdelta = pt.dot(delta, W)
tau_dot_delta = D[None, :] * delta - alpha * Wdelta
logquad = (tau * delta * tau_dot_delta).sum(axis=-1)
return check_parameters(
0.5 * (logtau + logdet - logquad),
-1 < alpha,
alpha < 1,
tau > 0,
W_is_valid,
msg="-1 < alpha < 1, tau > 0, W is a symmetric adjacency matrix.",
)
class ICARRV(RandomVariable):
name = "icar"
signature = "(m,m),(),()->(m)"
dtype = "floatX"
_print_name = ("ICAR", "\\operatorname{ICAR}")
def __call__(self, W, sigma, zero_sum_stdev, size=None, **kwargs):
return super().__call__(W, sigma, zero_sum_stdev, size=size, **kwargs)
@classmethod
def rng_fn(cls, rng, size, W, sigma, zero_sum_stdev):
raise NotImplementedError("Cannot sample from ICAR prior")
icar = ICARRV()
[docs]
class ICAR(Continuous):
r"""
The intrinsic conditional autoregressive prior. It is primarily used to model
covariance between neighboring areas. It is a special case
of the :class:`~pymc.CAR` distribution where alpha is set to 1.
The log probability density function is
.. math::
f(\phi| W,\sigma) =
-\frac{1}{2\sigma^{2}} \sum_{i\sim j} (\phi_{i} - \phi_{j})^2 -
\frac{1}{2}*\frac{\sum_{i}{\phi_{i}}}{0.001N}^{2} - \ln{\sqrt{2\\pi}} -
\ln{0.001N}
The first term represents the spatial covariance component. Each $\\phi_{i}$ is penalized
based on the square distance from each of its neighbors. The notation $i\\sim j$
indicates a sum over all the neighbors of $\\phi_{i}$. The last three terms are the
Normal log density function where the mean is zero and the standard deviation is
$N * 0.001$ (where N is the length of the vector $\\phi$). This component imposes
a zero-sum constraint by finding the sum of the vector $\\phi$ and penalizing based
on its distance from zero.
Parameters
----------
W : ndarray of int
Symmetric adjacency matrix of 1s and 0s indicating adjacency between elements.
sigma : scalar, default 1
Standard deviation of the vector of phi's. Putting a prior on sigma
will result in a centered parameterization. In most cases, it is
preferable to use a non-centered parameterization by using the default
value and multiplying the resulting phi's by sigma. See the example below.
zero_sum_stdev : scalar, default 0.001
Controls how strongly to enforce the zero-sum constraint. The sum of
phi is normally distributed with a mean of zero and small standard deviation.
This parameter sets the standard deviation of a normal density function with
mean zero.
Examples
--------
This example illustrates how to switch between centered and non-centered
parameterizations.
.. code-block:: python
import numpy as np
import pymc as pm
# 4x4 adjacency matrix
# arranged in a square lattice
W = np.array([
[0,1,0,1],
[1,0,1,0],
[0,1,0,1],
[1,0,1,0]
])
# centered parameterization
with pm.Model():
sigma = pm.Exponential('sigma', 1)
phi = pm.ICAR('phi', W=W, sigma=sigma)
mu = phi
# non-centered parameterization
with pm.Model():
sigma = pm.Exponential('sigma', 1)
phi = pm.ICAR('phi', W=W)
mu = sigma * phi
References
----------
.. Mitzi, M., Wheeler-Martin, K., Simpson, D., Mooney, J. S.,
Gelman, A., Dimaggio, C.
"Bayesian hierarchical spatial models: Implementing the Besag York
Mollié model in stan"
Spatial and Spatio-temporal Epidemiology, Vol. 31, (Aug., 2019),
pp 1-18
.. Banerjee, S., Carlin, B., Gelfand, A. Hierarchical Modeling
and Analysis for Spatial Data. Second edition. CRC press. (2015)
"""
rv_op = icar
[docs]
@classmethod
def dist(cls, W, sigma=1, zero_sum_stdev=0.001, **kwargs):
# Note: These checks are forcing W to be non-symbolic
if not W.ndim == 2:
raise ValueError("W must be matrix with ndim=2")
if not W.shape[0] == W.shape[1]:
raise ValueError("W must be a square matrix")
if not np.allclose(W.T, W):
raise ValueError("W must be a symmetric matrix")
if np.any((W != 0) & (W != 1)):
raise ValueError("W must be composed of only 1s and 0s")
W = pt.as_tensor_variable(W, dtype=int)
sigma = pt.as_tensor_variable(sigma)
zero_sum_stdev = pt.as_tensor_variable(zero_sum_stdev)
return super().dist([W, sigma, zero_sum_stdev], **kwargs)
def support_point(rv, size, W, sigma, zero_sum_stdev):
N = pt.shape(W)[-2]
return pt.zeros(N)
def logp(value, W, sigma, zero_sum_stdev):
# convert adjacency matrix to edgelist representation
# An edgelist is a pair of lists.
# If node i and node j are connected then one list
# will contain i and the other will contain j at the same
# index value.
# We only use the lower triangle here because adjacency
# is a undirected connection.
N = pt.shape(W)[-2]
node1, node2 = pt.eq(pt.tril(W), 1).nonzero()
pairwise_difference = (-1 / (2 * sigma**2)) * pt.sum(pt.square(value[node1] - value[node2]))
zero_sum = (
-0.5 * pt.pow(pt.sum(value) / (zero_sum_stdev * N), 2)
- pt.log(pt.sqrt(2.0 * np.pi))
- pt.log(zero_sum_stdev * N)
)
return check_parameters(pairwise_difference + zero_sum, sigma > 0, msg="sigma > 0")
class StickBreakingWeightsRV(RandomVariable):
name = "stick_breaking_weights"
signature = "(),()->(k)"
dtype = "floatX"
_print_name = ("StickBreakingWeights", "\\operatorname{StickBreakingWeights}")
def make_node(self, rng, size, alpha, K):
alpha = pt.as_tensor_variable(alpha)
K = pt.as_tensor_variable(K, dtype=int)
if not all(K.type.broadcastable):
raise ValueError("K must be a scalar.")
return super().make_node(rng, size, alpha, K)
def _supp_shape_from_params(self, dist_params, param_shapes):
K = dist_params[1]
return (K.squeeze() + 1,)
@classmethod
def rng_fn(cls, rng, alpha, K, size):
K = K.squeeze()
if K < 0:
raise ValueError("K needs to be positive.")
size = to_tuple(size) if size is not None else alpha.shape
size = (*size, K)
alpha = alpha[..., np.newaxis]
betas = rng.beta(1, alpha, size=size)
sticks = np.concatenate(
(
np.ones(shape=(size[:-1] + (1,))),
np.cumprod(1 - betas[..., :-1], axis=-1),
),
axis=-1,
)
weights = sticks * betas
weights = np.concatenate(
(weights, 1 - weights.sum(axis=-1)[..., np.newaxis]),
axis=-1,
)
return weights
stickbreakingweights = StickBreakingWeightsRV()
[docs]
class StickBreakingWeights(SimplexContinuous):
r"""
Likelihood of truncated stick-breaking weights. The weights are generated from a
stick-breaking proceduce where :math:`x_k = v_k \prod_{\ell < k} (1 - v_\ell)` for
:math:`k \in \{1, \ldots, K\}` and :math:`x_K = \prod_{\ell = 1}^{K} (1 - v_\ell) = 1 - \sum_{\ell=1}^K x_\ell`
with :math:`v_k \stackrel{\text{i.i.d.}}{\sim} \text{Beta}(1, \alpha)`.
.. math:
f(\mathbf{x}|\alpha, K) =
B(1, \alpha)^{-K}x_{K+1}^\alpha \prod_{k=1}^{K+1}\left\{\sum_{j=k}^{K+1} x_j\right\}^{-1}
======== ===============================================
Support :math:`x_k \in (0, 1)` for :math:`k \in \{1, \ldots, K+1\}`
such that :math:`\sum x_k = 1`
Mean :math:`\mathbb{E}[x_k] = \dfrac{1}{1 + \alpha}\left(\dfrac{\alpha}{1 + \alpha}\right)^{k - 1}`
for :math:`k \in \{1, \ldots, K\}` and :math:`\mathbb{E}[x_{K+1}] = \left(\dfrac{\alpha}{1 + \alpha}\right)^{K}`
======== ===============================================
Parameters
----------
alpha : tensor_like of float
Concentration parameter (alpha > 0).
K : tensor_like of int
The number of "sticks" to break off from an initial one-unit stick. The length of the weight
vector is K + 1, where the last weight is one minus the sum of all the first sticks.
References
----------
.. [1] Ishwaran, H., & James, L. F. (2001). Gibbs sampling methods for stick-breaking priors.
Journal of the American Statistical Association, 96(453), 161-173.
.. [2] Müller, P., Quintana, F. A., Jara, A., & Hanson, T. (2015). Bayesian nonparametric data
analysis. New York: Springer.
"""
rv_op = stickbreakingweights
[docs]
@classmethod
def dist(cls, alpha, K, *args, **kwargs):
alpha = pt.as_tensor_variable(alpha)
K = pt.as_tensor_variable(K, dtype=int)
return super().dist([alpha, K], **kwargs)
def support_point(rv, size, alpha, K):
K = K.squeeze()
alpha = alpha[..., np.newaxis]
support_point = (alpha / (1 + alpha)) ** pt.arange(K)
support_point *= 1 / (1 + alpha)
support_point = pt.concatenate([support_point, (alpha / (1 + alpha)) ** K], axis=-1)
if not rv_size_is_none(size):
support_point_size = pt.concatenate(
[
size,
[
K + 1,
],
]
)
support_point = pt.full(support_point_size, support_point)
return support_point
def logp(value, alpha, K):
"""
Calculate log-probability of the distribution induced from the stick-breaking process
at specified value.
Parameters
----------
value: numeric
Value for which log-probability is calculated.
Returns
-------
TensorVariable
"""
logp = -pt.sum(
pt.log(
pt.cumsum(
value[..., ::-1],
axis=-1,
)
),
axis=-1,
)
logp += -K * betaln(1, alpha)
logp += alpha * pt.log(value[..., -1])
logp = pt.switch(
pt.or_(
pt.any(
pt.and_(pt.le(value, 0), pt.ge(value, 1)),
axis=-1,
),
pt.or_(
pt.bitwise_not(pt.allclose(value.sum(-1), 1)),
pt.neq(value.shape[-1], K + 1),
),
),
-np.inf,
logp,
)
return check_parameters(
logp,
alpha > 0,
K > 0,
msg="alpha > 0, K > 0",
)
class ZeroSumNormalRV(SymbolicRandomVariable):
"""ZeroSumNormal random variable"""
_print_name = ("ZeroSumNormal", "\\operatorname{ZeroSumNormal}")
@classmethod
def rv_op(cls, sigma, support_shape, *, size=None, rng=None):
n_zerosum_axes = pt.get_vector_length(support_shape)
sigma = pt.as_tensor(sigma)
support_shape = pt.as_tensor(support_shape, ndim=1)
rng = normalize_rng_param(rng)
size = normalize_size_param(size)
if rv_size_is_none(size):
# Size is implied by shape of sigma
size = sigma.shape[:-n_zerosum_axes]
shape = tuple(size) + tuple(support_shape)
next_rng, normal_dist = pm.Normal.dist(sigma=sigma, shape=shape, rng=rng).owner.outputs
# Zerosum-normaling is achieved by subtracting the mean along the given n_zerosum_axes
zerosum_rv = normal_dist
for axis in range(n_zerosum_axes):
zerosum_rv -= zerosum_rv.mean(axis=-axis - 1, keepdims=True)
support_str = ",".join([f"d{i}" for i in range(n_zerosum_axes)])
extended_signature = f"[rng],(),(s),[size]->[rng],({support_str})"
return ZeroSumNormalRV(
inputs=[rng, sigma, support_shape, size],
outputs=[next_rng, zerosum_rv],
extended_signature=extended_signature,
)(rng, sigma, support_shape, size)
[docs]
class ZeroSumNormal(Distribution):
r"""
ZeroSumNormal distribution, i.e Normal distribution where one or
several axes are constrained to sum to zero.
By default, the last axis is constrained to sum to zero.
See `n_zerosum_axes` kwarg for more details.
.. math::
\begin{align*}
ZSN(\sigma) = N \Big( 0, \sigma^2 (I - \tfrac{1}{n}J) \Big) \\
\text{where} \ ~ J_{ij} = 1 \ ~ \text{and} \\
n = \text{nbr of zero-sum axes}
\end{align*}
Parameters
----------
sigma : tensor_like of float
Scale parameter (sigma > 0).
It's actually the standard deviation of the underlying, unconstrained Normal distribution.
Defaults to 1 if not specified. ``sigma`` cannot have length > 1 across the zero-sum axes.
n_zerosum_axes: int, defaults to 1
Number of axes along which the zero-sum constraint is enforced, starting from the rightmost position.
Defaults to 1, i.e the rightmost axis.
zerosum_axes: int, deprecated please use n_zerosum_axes as its successor
dims: sequence of strings, optional
Dimension names of the distribution. Works the same as for other PyMC distributions.
Necessary if ``shape`` is not passed.
shape: tuple of integers, optional
Shape of the distribution. Works the same as for other PyMC distributions.
Necessary if ``dims`` or ``observed`` is not passed.
Warnings
--------
Currently, ``sigma``cannot have length > 1 across the zero-sum axes to ensure the zero-sum constraint.
``n_zerosum_axes`` has to be > 0. If you want the behavior of ``n_zerosum_axes = 0``,
just use ``pm.Normal``.
Examples
--------
Define a `ZeroSumNormal` variable, with `sigma=1` and
`n_zerosum_axes=1` by default::
COORDS = {
"regions": ["a", "b", "c"],
"answers": ["yes", "no", "whatever", "don't understand question"],
}
with pm.Model(coords=COORDS) as m:
# the zero sum axis will be 'answers'
v = pm.ZeroSumNormal("v", dims=("regions", "answers"))
with pm.Model(coords=COORDS) as m:
# the zero sum axes will be 'answers' and 'regions'
v = pm.ZeroSumNormal("v", dims=("regions", "answers"), n_zerosum_axes=2)
with pm.Model(coords=COORDS) as m:
# the zero sum axes will be the last two
v = pm.ZeroSumNormal("v", shape=(3, 4, 5), n_zerosum_axes=2)
"""
rv_type = ZeroSumNormalRV
rv_op = ZeroSumNormalRV.rv_op
def __new__(
cls, *args, zerosum_axes=None, n_zerosum_axes=None, support_shape=None, dims=None, **kwargs
):
if zerosum_axes is not None:
n_zerosum_axes = zerosum_axes
warnings.warn(
"The 'zerosum_axes' parameter is deprecated. Use 'n_zerosum_axes' instead.",
DeprecationWarning,
)
if dims is not None or kwargs.get("observed") is not None:
n_zerosum_axes = cls.check_zerosum_axes(n_zerosum_axes)
support_shape = get_support_shape(
support_shape=support_shape,
shape=None, # Shape will be checked in `cls.dist`
dims=dims,
observed=kwargs.get("observed", None),
ndim_supp=n_zerosum_axes,
)
return super().__new__(
cls,
*args,
n_zerosum_axes=n_zerosum_axes,
support_shape=support_shape,
dims=dims,
**kwargs,
)
[docs]
@classmethod
def dist(cls, sigma=1.0, n_zerosum_axes=None, support_shape=None, **kwargs):
n_zerosum_axes = cls.check_zerosum_axes(n_zerosum_axes)
sigma = pt.as_tensor(sigma)
if not all(sigma.type.broadcastable[-n_zerosum_axes:]):
raise ValueError("sigma must have length one across the zero-sum axes")
support_shape = get_support_shape(
support_shape=support_shape,
shape=kwargs.get("shape"),
ndim_supp=n_zerosum_axes,
)
if support_shape is None:
if n_zerosum_axes > 0:
raise ValueError("You must specify dims, shape or support_shape parameter")
support_shape = pt.as_tensor(support_shape, dtype="int64", ndim=1)
assert n_zerosum_axes == pt.get_vector_length(
support_shape
), "support_shape has to be as long as n_zerosum_axes"
return super().dist([sigma, support_shape], **kwargs)
@classmethod
def check_zerosum_axes(cls, n_zerosum_axes: int | None) -> int:
if n_zerosum_axes is None:
n_zerosum_axes = 1
if not isinstance(n_zerosum_axes, int):
raise TypeError("n_zerosum_axes has to be an integer")
if not n_zerosum_axes > 0:
raise ValueError("n_zerosum_axes has to be > 0")
return n_zerosum_axes
@_support_point.register(ZeroSumNormalRV)
def zerosumnormal_support_point(op, rv, *rv_inputs):
return pt.zeros_like(rv)
@_default_transform.register(ZeroSumNormalRV)
def zerosum_default_transform(op, rv):
n_zerosum_axes = tuple(np.arange(-op.ndim_supp, 0))
return ZeroSumTransform(n_zerosum_axes)
@_logprob.register(ZeroSumNormalRV)
def zerosumnormal_logp(op, values, rng, sigma, support_shape, size, **kwargs):
(value,) = values
shape = value.shape
n_zerosum_axes = op.ndim_supp
_deg_free_support_shape = pt.inc_subtensor(shape[-n_zerosum_axes:], -1)
_full_size = pt.prod(shape).astype("floatX")
_degrees_of_freedom = pt.prod(_deg_free_support_shape).astype("floatX")
zerosums = [
pt.all(pt.isclose(pt.mean(value, axis=-axis - 1), 0, atol=1e-9))
for axis in range(n_zerosum_axes)
]
out = pt.sum(
-0.5 * pt.pow(value / sigma, 2)
- (pt.log(pt.sqrt(2.0 * np.pi)) + pt.log(sigma)) * _degrees_of_freedom / _full_size,
axis=tuple(np.arange(-n_zerosum_axes, 0)),
)
return check_parameters(out, *zerosums, msg="mean(value, axis=n_zerosum_axes) = 0")