# Copyright 2024 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import warnings
import numpy as np
import pytensor
import pytensor.tensor as pt
from pytensor.graph.basic import Apply, equal_computations
from pytensor.tensor import TensorVariable
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.random.utils import normalize_size_param
from pymc.distributions import transforms
from pymc.distributions.continuous import Gamma, LogNormal, Normal, get_tau_sigma
from pymc.distributions.discrete import Binomial, NegativeBinomial, Poisson
from pymc.distributions.dist_math import check_parameters
from pymc.distributions.distribution import (
DiracDelta,
Distribution,
SymbolicRandomVariable,
_support_point,
support_point,
)
from pymc.distributions.shape_utils import _change_dist_size, change_dist_size, rv_size_is_none
from pymc.distributions.transforms import _default_transform
from pymc.distributions.truncated import Truncated
from pymc.logprob.abstract import _logcdf, _logcdf_helper, _logprob
from pymc.logprob.basic import logp
from pymc.logprob.transforms import IntervalTransform
from pymc.util import check_dist_not_registered
from pymc.vartypes import continuous_types, discrete_types
__all__ = [
"HurdleGamma",
"HurdleLogNormal",
"HurdleNegativeBinomial",
"HurdlePoisson",
"Mixture",
"NormalMixture",
"ZeroInflatedBinomial",
"ZeroInflatedNegativeBinomial",
"ZeroInflatedPoisson",
]
class MarginalMixtureRV(SymbolicRandomVariable):
"""A placeholder used to specify a log-likelihood for a mixture sub-graph."""
_print_name = ("MarginalMixture", "\\operatorname{MarginalMixture}")
@classmethod
def rv_op(cls, weights, *components, size=None):
# We don't allow passing `rng` because we don't fully control the rng of the components!
mix_indexes_rng = pytensor.shared(np.random.default_rng())
single_component = len(components) == 1
ndim_supp = components[0].owner.op.ndim_supp
size = normalize_size_param(size)
if not rv_size_is_none(size):
components = cls._resize_components(size, *components)
elif not single_component:
# We might need to broadcast components when size is not specified
shape = tuple(pt.broadcast_shape(*components))
size = shape[: len(shape) - ndim_supp]
components = cls._resize_components(size, *components)
# Extract replication ndims from components and weights
ndim_batch = components[0].ndim - ndim_supp
if single_component:
# One dimension is taken by the mixture axis in the single component case
ndim_batch -= 1
# The weights may imply extra batch dimensions that go beyond what is already
# implied by the component dimensions (ndim_batch)
weights_ndim_batch = max(0, weights.ndim - ndim_batch - 1)
# If weights are large enough that they would broadcast the component distributions
# we try to resize them. This in necessary to avoid duplicated values in the
# random method and for equivalency with the logp method
if weights_ndim_batch:
new_size = pt.concatenate(
[
weights.shape[:weights_ndim_batch],
components[0].shape[:ndim_batch],
]
)
components = cls._resize_components(new_size, *components)
# Extract support and batch ndims from components and weights
ndim_batch = components[0].ndim - ndim_supp
if single_component:
ndim_batch -= 1
weights_ndim_batch = max(0, weights.ndim - ndim_batch - 1)
assert weights_ndim_batch == 0
mix_axis = -ndim_supp - 1
# Stack components across mixture axis
if single_component:
# If single component, we consider it as being already "stacked"
stacked_components = components[0]
else:
stacked_components = pt.stack(components, axis=mix_axis)
# Broadcast weights to (*batched dimensions, stack dimension), ignoring support dimensions
weights_broadcast_shape = stacked_components.shape[: ndim_batch + 1]
weights_broadcasted = pt.broadcast_to(weights, weights_broadcast_shape)
# Draw mixture indexes and append (stack + ndim_supp) broadcastable dimensions to the right
mix_indexes_rng_next, mix_indexes = pt.random.categorical(
weights_broadcasted, rng=mix_indexes_rng
).owner.outputs
mix_indexes_padded = pt.shape_padright(mix_indexes, ndim_supp + 1)
# Index components and squeeze mixture dimension
mix_out = pt.take_along_axis(stacked_components, mix_indexes_padded, axis=mix_axis)
mix_out = pt.squeeze(mix_out, axis=mix_axis)
s = ",".join(f"s{i}" for i in range(components[0].owner.op.ndim_supp))
if len(components) == 1:
comp_s = ",".join((*s, "w"))
extended_signature = f"[rng],(w),({comp_s})->[rng],({s})"
else:
comps_s = ",".join(f"({s})" for _ in components)
extended_signature = f"[rng],(w),{comps_s}->[rng],({s})"
return MarginalMixtureRV(
inputs=[mix_indexes_rng, weights, *components],
outputs=[mix_indexes_rng_next, mix_out],
extended_signature=extended_signature,
)(mix_indexes_rng, weights, *components)
@classmethod
def _resize_components(cls, size, *components):
if len(components) == 1:
# If we have a single component, we need to keep the length of the mixture
# axis intact, because that's what determines the number of mixture components
mix_axis = -components[0].owner.op.ndim_supp - 1
mix_size = components[0].shape[mix_axis]
size = (*size, mix_size)
return [change_dist_size(component, size) for component in components]
def update(self, node: Apply):
# Update for the internal mix_indexes RV
return {node.inputs[0]: node.outputs[0]}
[docs]
class Mixture(Distribution):
R"""
Mixture log-likelihood.
Often used to model subpopulation heterogeneity
.. math:: f(x \mid w, \theta) = \sum_{i = 1}^n w_i f_i(x \mid \theta_i)
======== ============================================
Support :math:`\cup_{i = 1}^n \textrm{support}(f_i)`
Mean :math:`\sum_{i = 1}^n w_i \mu_i`
======== ============================================
Parameters
----------
w : tensor_like of float
w >= 0 and w <= 1
the mixture weights
comp_dists : iterable of unnamed distributions or single batched distribution
Distributions should be created via the `.dist()` API. If a single distribution
is passed, the last size dimension (not shape) determines the number of mixture
components (e.g. `pm.Poisson.dist(..., size=components)`)
:math:`f_1, \ldots, f_n`
.. warning:: comp_dists will be cloned, rendering them independent of the ones passed as input.
Examples
--------
.. code-block:: python
# Mixture of 2 Poisson variables
with pm.Model() as model:
w = pm.Dirichlet("w", a=np.array([1, 1])) # 2 mixture weights
lam1 = pm.Exponential("lam1", lam=1)
lam2 = pm.Exponential("lam2", lam=1)
# As we just need the logp, rather than add a RV to the model, we need to call `.dist()`
# These two forms are equivalent, but the second benefits from vectorization
components = [
pm.Poisson.dist(mu=lam1),
pm.Poisson.dist(mu=lam2),
]
# `shape=(2,)` indicates 2 mixture components
components = pm.Poisson.dist(mu=pm.math.stack([lam1, lam2]), shape=(2,))
like = pm.Mixture("like", w=w, comp_dists=components, observed=data)
.. code-block:: python
# Mixture of Normal and StudentT variables
with pm.Model() as model:
w = pm.Dirichlet("w", a=np.array([1, 1])) # 2 mixture weights
mu = pm.Normal("mu", 0, 1)
components = [
pm.Normal.dist(mu=mu, sigma=1),
pm.StudentT.dist(nu=4, mu=mu, sigma=1),
]
like = pm.Mixture("like", w=w, comp_dists=components, observed=data)
.. code-block:: python
# Mixture of (5 x 3) Normal variables
with pm.Model() as model:
# w is a stack of 5 independent size 3 weight vectors
# If shape was `(3,)`, the weights would be shared across the 5 replication dimensions
w = pm.Dirichlet("w", a=np.ones(3), shape=(5, 3))
# Each of the 3 mixture components has an independent mean
mu = pm.Normal("mu", mu=np.arange(3), sigma=1, shape=3)
# These two forms are equivalent, but the second benefits from vectorization
components = [
pm.Normal.dist(mu=mu[0], sigma=1, shape=(5,)),
pm.Normal.dist(mu=mu[1], sigma=1, shape=(5,)),
pm.Normal.dist(mu=mu[2], sigma=1, shape=(5,)),
]
components = pm.Normal.dist(mu=mu, sigma=1, shape=(5, 3))
# The mixture is an array of 5 elements
# Each element can be thought of as an independent scalar mixture of 3
# components with different means
like = pm.Mixture("like", w=w, comp_dists=components, observed=data)
.. code-block:: python
# Mixture of 2 Dirichlet variables
with pm.Model() as model:
w = pm.Dirichlet("w", a=np.ones(2)) # 2 mixture weights
# These two forms are equivalent, but the second benefits from vectorization
components = [
pm.Dirichlet.dist(a=[1, 10, 100], shape=(3,)),
pm.Dirichlet.dist(a=[100, 10, 1], shape=(3,)),
]
components = pm.Dirichlet.dist(a=[[1, 10, 100], [100, 10, 1]], shape=(2, 3))
# The mixture is an array of 3 elements
# Each element comes from only one of the two core Dirichlet components
like = pm.Mixture("like", w=w, comp_dists=components, observed=data)
"""
rv_type = MarginalMixtureRV
rv_op = MarginalMixtureRV.rv_op
[docs]
@classmethod
def dist(cls, w, comp_dists, **kwargs):
if not isinstance(comp_dists, tuple | list):
# comp_dists is a single component
comp_dists = [comp_dists]
elif len(comp_dists) == 1:
warnings.warn(
"Single component will be treated as a mixture across the last size dimension.\n"
"To disable this warning do not wrap the single component inside a list or tuple",
UserWarning,
)
if len(comp_dists) > 1:
if not (
all(comp_dist.dtype in continuous_types for comp_dist in comp_dists)
or all(comp_dist.dtype in discrete_types for comp_dist in comp_dists)
):
raise ValueError(
"All distributions in comp_dists must be either discrete or continuous.\n"
"See the following issue for more information: https://github.com/pymc-devs/pymc/issues/4511."
)
# Check that components are not associated with a registered variable in the model
components_ndim_supp = set()
for dist in comp_dists:
# TODO: Allow these to not be a RandomVariable as long as we can call `ndim_supp` on them
# and resize them
if not isinstance(dist, TensorVariable) or not isinstance(
dist.owner.op, RandomVariable | SymbolicRandomVariable
):
raise ValueError(
f"Component dist must be a distribution created via the `.dist()` API, got {type(dist)}"
)
check_dist_not_registered(dist)
components_ndim_supp.add(dist.owner.op.ndim_supp)
if len(components_ndim_supp) > 1:
raise ValueError(
f"Mixture components must all have the same support dimensionality, got {components_ndim_supp}"
)
w = pt.as_tensor_variable(w)
return super().dist([w, *comp_dists], **kwargs)
@_change_dist_size.register(MarginalMixtureRV)
def change_marginal_mixture_size(op, dist, new_size, expand=False):
rng, weights, *components = dist.owner.inputs
if expand:
component = components[0]
# Old size is equal to `shape[:-ndim_supp]`, with care needed for `ndim_supp == 0`
size_dims = component.ndim - component.owner.op.ndim_supp
if len(components) == 1:
# If we have a single component, new size should ignore the mixture axis
# dimension, as that is not touched by `_resize_components`
size_dims -= 1
old_size = components[0].shape[:size_dims]
new_size = tuple(new_size) + tuple(old_size)
return Mixture.rv_op(weights, *components, size=new_size)
@_logprob.register(MarginalMixtureRV)
def marginal_mixture_logprob(op, values, rng, weights, *components, **kwargs):
(value,) = values
# single component
if len(components) == 1:
# Need to broadcast value across mixture axis
mix_axis = -components[0].owner.op.ndim_supp - 1
components_logp = logp(components[0], pt.expand_dims(value, mix_axis))
else:
components_logp = pt.stack(
[logp(component, value) for component in components],
axis=-1,
)
mix_logp = pt.logsumexp(pt.log(weights) + components_logp, axis=-1)
mix_logp = check_parameters(
mix_logp,
0 <= weights,
weights <= 1,
pt.isclose(pt.sum(weights, axis=-1), 1),
msg="0 <= weights <= 1, sum(weights) == 1",
)
return mix_logp
@_logcdf.register(MarginalMixtureRV)
def marginal_mixture_logcdf(op, value, rng, weights, *components, **kwargs):
# single component
if len(components) == 1:
# Need to broadcast value across mixture axis
mix_axis = -components[0].owner.op.ndim_supp - 1
components_logcdf = _logcdf_helper(components[0], pt.expand_dims(value, mix_axis))
else:
components_logcdf = pt.stack(
[_logcdf_helper(component, value) for component in components],
axis=-1,
)
mix_logcdf = pt.logsumexp(pt.log(weights) + components_logcdf, axis=-1)
mix_logcdf = check_parameters(
mix_logcdf,
0 <= weights,
weights <= 1,
pt.isclose(pt.sum(weights, axis=-1), 1),
msg="0 <= weights <= 1, sum(weights) == 1",
)
return mix_logcdf
@_support_point.register(MarginalMixtureRV)
def marginal_mixture_support_point(op, rv, rng, weights, *components):
ndim_supp = components[0].owner.op.ndim_supp
weights = pt.shape_padright(weights, ndim_supp)
mix_axis = -ndim_supp - 1
if len(components) == 1:
support_point_components = support_point(components[0])
else:
support_point_components = pt.stack(
[support_point(component) for component in components],
axis=mix_axis,
)
mix_support_point = pt.sum(weights * support_point_components, axis=mix_axis)
if components[0].dtype in discrete_types:
mix_support_point = pt.round(mix_support_point)
return mix_support_point
# List of transforms that can be used by Mixture, either because they do not require
# special handling or because we have custom logic to enable them. If new default
# transforms are implemented, this list and function should be updated
allowed_default_mixture_transforms = (
transforms.CholeskyCovPacked,
transforms.CircularTransform,
transforms.IntervalTransform,
transforms.LogTransform,
transforms.LogExpM1,
transforms.LogOddsTransform,
transforms.Ordered,
transforms.SimplexTransform,
transforms.SumTo1,
)
class MixtureTransformWarning(UserWarning):
pass
@_default_transform.register(MarginalMixtureRV)
def marginal_mixture_default_transform(op, rv):
def transform_warning():
warnings.warn(
f"No safe default transform found for Mixture distribution {rv}. This can "
"happen when components have different supports or default transforms.\n"
"If appropriate, you can specify a custom transform for more efficient sampling.",
MixtureTransformWarning,
stacklevel=2,
)
rng, weights, *components = rv.owner.inputs
default_transforms = [
_default_transform(component.owner.op, component) for component in components
]
# If there are more than one type of default transforms, we do not apply any
if len({type(transform) for transform in default_transforms}) != 1:
transform_warning()
return None
default_transform = default_transforms[0]
if default_transform is None:
return None
if not isinstance(default_transform, allowed_default_mixture_transforms):
transform_warning()
return None
if isinstance(default_transform, IntervalTransform):
# If there are more than one component, we need to check the IntervalTransform
# of the components are actually equivalent (e.g., we don't have an
# Interval(0, 1), and an Interval(0, 2)).
if len(default_transforms) > 1:
value = rv.type()
backward_expressions = [
transform.backward(value, *component.owner.inputs)
for transform, component in zip(default_transforms, components)
]
for expr1, expr2 in itertools.pairwise(backward_expressions):
if not equal_computations([expr1], [expr2]):
transform_warning()
return None
# We need to create a new IntervalTransform that expects the Mixture inputs
args_fn = default_transform.args_fn
def mixture_args_fn(rng, weights, *components):
# We checked that the interval transforms of each component are equivalent,
# so we can just pass the inputs of the first component
return args_fn(*components[0].owner.inputs)
return IntervalTransform(args_fn=mixture_args_fn)
else:
return default_transform
[docs]
class NormalMixture:
R"""
Normal mixture log-likelihood.
.. math::
f(x \mid w, \mu, \sigma^2) = \sum_{i = 1}^n w_i N(x \mid \mu_i, \sigma^2_i)
======== =======================================
Support :math:`x \in \mathbb{R}`
Mean :math:`\sum_{i = 1}^n w_i \mu_i`
Variance :math:`\sum_{i = 1}^n w_i (\sigma^2_i + \mu_i^2) - \left(\sum_{i = 1}^n w_i \mu_i\right)^2`
======== =======================================
Parameters
----------
w : tensor_like of float
w >= 0 and w <= 1
the mixture weights
mu : tensor_like of float
the component means
sigma : tensor_like of float
the component standard deviations
tau : tensor_like of float
the component precisions
Notes
-----
You only have to pass in sigma or tau, but not both.
Examples
--------
.. code-block:: python
n_components = 3
with pm.Model() as gauss_mix:
μ = pm.Normal(
"μ",
mu=data.mean(),
sigma=10,
shape=n_components,
transform=pm.distributions.transforms.ordered,
initval=[1, 2, 3],
)
σ = pm.HalfNormal("σ", sigma=10, shape=n_components)
weights = pm.Dirichlet("w", np.ones(n_components))
y = pm.NormalMixture("y", w=weights, mu=μ, sigma=σ, observed=data)
"""
def __new__(cls, name, w, mu, sigma=None, tau=None, **kwargs):
_, sigma = get_tau_sigma(tau=tau, sigma=sigma)
return Mixture(name, w, Normal.dist(mu, sigma=sigma), **kwargs)
[docs]
@classmethod
def dist(cls, w, mu, sigma=None, tau=None, **kwargs):
_, sigma = get_tau_sigma(tau=tau, sigma=sigma)
return Mixture.dist(w, Normal.dist(mu, sigma=sigma), **kwargs)
def _zero_inflated_mixture(*, name, nonzero_p, nonzero_dist, **kwargs):
"""Create a zero-inflated mixture (helper function).
If name is `None`, this function returns an unregistered variable.
"""
nonzero_p = pt.as_tensor_variable(nonzero_p)
weights = pt.stack([1 - nonzero_p, nonzero_p], axis=-1)
comp_dists = [
DiracDelta.dist(0),
nonzero_dist,
]
if name is not None:
return Mixture(name, weights, comp_dists, **kwargs)
else:
return Mixture.dist(weights, comp_dists, **kwargs)
[docs]
class ZeroInflatedPoisson:
R"""
Zero-inflated Poisson log-likelihood.
Often used to model the number of events occurring in a fixed period
of time when the times at which events occur are independent.
The pmf of this distribution is
.. math::
f(x \mid \psi, \mu) = \left\{ \begin{array}{l}
(1-\psi) + \psi e^{-\mu}, \text{if } x = 0 \\
\psi \frac{e^{-\mu}\mu^x}{x!}, \text{if } x=1,2,3,\ldots
\end{array} \right.
.. plot::
:context: close-figs
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as st
import arviz as az
plt.style.use('arviz-darkgrid')
x = np.arange(0, 22)
psis = [0.7, 0.4]
mus = [8, 4]
for psi, mu in zip(psis, mus):
pmf = st.poisson.pmf(x, mu)
pmf[0] = (1 - psi) + pmf[0]
pmf[1:] = psi * pmf[1:]
pmf /= pmf.sum()
plt.plot(x, pmf, '-o', label='$\\psi$ = {}, $\\mu$ = {}'.format(psi, mu))
plt.xlabel('x', fontsize=12)
plt.ylabel('f(x)', fontsize=12)
plt.legend(loc=1)
plt.show()
======== ==========================
Support :math:`x \in \mathbb{N}_0`
Mean :math:`\psi\mu`
Variance :math:`\mu + \frac{1-\psi}{\psi}\mu^2`
======== ==========================
Parameters
----------
psi : tensor_like of float
Expected proportion of Poisson draws (0 < psi < 1)
mu : tensor_like of float
Expected number of occurrences during the given interval
(mu >= 0).
"""
def __new__(cls, name, psi, mu, **kwargs):
return _zero_inflated_mixture(
name=name, nonzero_p=psi, nonzero_dist=Poisson.dist(mu=mu), **kwargs
)
[docs]
@classmethod
def dist(cls, psi, mu, **kwargs):
return _zero_inflated_mixture(
name=None, nonzero_p=psi, nonzero_dist=Poisson.dist(mu=mu), **kwargs
)
[docs]
class ZeroInflatedBinomial:
R"""
Zero-inflated Binomial log-likelihood.
The pmf of this distribution is
.. math::
f(x \mid \psi, n, p) = \left\{ \begin{array}{l}
(1-\psi) + \psi (1-p)^{n}, \text{if } x = 0 \\
\psi {n \choose x} p^x (1-p)^{n-x}, \text{if } x=1,2,3,\ldots,n
\end{array} \right.
.. plot::
:context: close-figs
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as st
import arviz as az
plt.style.use('arviz-darkgrid')
x = np.arange(0, 25)
ns = [10, 20]
ps = [0.5, 0.7]
psis = [0.7, 0.4]
for n, p, psi in zip(ns, ps, psis):
pmf = st.binom.pmf(x, n, p)
pmf[0] = (1 - psi) + pmf[0]
pmf[1:] = psi * pmf[1:]
pmf /= pmf.sum()
plt.plot(x, pmf, '-o', label='n = {}, p = {}, $\\psi$ = {}'.format(n, p, psi))
plt.xlabel('x', fontsize=12)
plt.ylabel('f(x)', fontsize=12)
plt.legend(loc=1)
plt.show()
======== ==========================
Support :math:`x \in \mathbb{N}_0`
Mean :math:`\psi n p`
Variance :math:`(1-\psi) n p [1 - p(1 - \psi n)].`
======== ==========================
Parameters
----------
psi : tensor_like of float
Expected proportion of Binomial draws (0 < psi < 1)
n : tensor_like of int
Number of Bernoulli trials (n >= 0).
p : tensor_like of float
Probability of success in each trial (0 < p < 1).
"""
def __new__(cls, name, psi, n, p, **kwargs):
return _zero_inflated_mixture(
name=name, nonzero_p=psi, nonzero_dist=Binomial.dist(n=n, p=p), **kwargs
)
[docs]
@classmethod
def dist(cls, psi, n, p, **kwargs):
return _zero_inflated_mixture(
name=None, nonzero_p=psi, nonzero_dist=Binomial.dist(n=n, p=p), **kwargs
)
[docs]
class ZeroInflatedNegativeBinomial:
R"""
Zero-Inflated Negative binomial log-likelihood.
The Zero-inflated version of the Negative Binomial (NB).
The NB distribution describes a Poisson random variable
whose rate parameter is gamma distributed.
The pmf of this distribution is.
.. math::
f(x \mid \psi, \mu, \alpha) = \left\{
\begin{array}{l}
(1-\psi) + \psi \left (
\frac{\alpha}{\alpha+\mu}
\right) ^\alpha, \text{if } x = 0 \\
\psi \frac{\Gamma(x+\alpha)}{x! \Gamma(\alpha)} \left (
\frac{\alpha}{\mu+\alpha}
\right)^\alpha \left(
\frac{\mu}{\mu+\alpha}
\right)^x, \text{if } x=1,2,3,\ldots
\end{array}
\right.
.. plot::
:context: close-figs
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as st
from scipy import special
import arviz as az
plt.style.use('arviz-darkgrid')
def ZeroInfNegBinom(a, m, psi, x):
pmf = special.binom(x + a - 1, x) * (a / (m + a))**a * (m / (m + a))**x
pmf[0] = (1 - psi) + pmf[0]
pmf[1:] = psi * pmf[1:]
pmf /= pmf.sum()
return pmf
x = np.arange(0, 25)
alphas = [2, 4]
mus = [2, 8]
psis = [0.7, 0.7]
for a, m, psi in zip(alphas, mus, psis):
pmf = ZeroInfNegBinom(a, m, psi, x)
plt.plot(x, pmf, '-o', label=r'$\alpha$ = {}, $\mu$ = {}, $\psi$ = {}'.format(a, m, psi))
plt.xlabel('x', fontsize=12)
plt.ylabel('f(x)', fontsize=12)
plt.legend(loc=1)
plt.show()
======== ==========================
Support :math:`x \in \mathbb{N}_0`
Mean :math:`\psi\mu`
Var .. math::
\psi \left(\frac{{\mu^2}}{{\alpha}}\right) +\
\psi \mu + \psi \mu^2 - \psi^2 \mu^2
======== ==========================
The zero inflated negative binomial distribution can be parametrized
either in terms of mu or p, and either in terms of alpha or n.
The link between the parametrizations is given by
.. math::
\mu &= \frac{n(1-p)}{p} \\
\alpha &= n
Parameters
----------
psi : tensor_like of float
Expected proportion of NegativeBinomial draws (0 < psi < 1)
mu : tensor_like of float
Poisson distribution parameter (mu > 0).
alpha : tensor_like of float
Gamma distribution parameter (alpha > 0).
p : tensor_like of float
Alternative probability of success in each trial (0 < p < 1).
n : tensor_like of float
Alternative number of target success trials (n > 0)
"""
def __new__(cls, name, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
return _zero_inflated_mixture(
name=name,
nonzero_p=psi,
nonzero_dist=NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
**kwargs,
)
[docs]
@classmethod
def dist(cls, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
return _zero_inflated_mixture(
name=None,
nonzero_p=psi,
nonzero_dist=NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
**kwargs,
)
def _hurdle_mixture(*, name, nonzero_p, nonzero_dist, dtype, max_n_steps=10_000, **kwargs):
"""Create a hurdle mixtures (helper function).
If name is `None`, this function returns an unregistered variable
In hurdle models, the zeros come from a completely different process than the rest of the data.
In other words, the zeros are not inflated, they come from a different process.
"""
if dtype == "float":
zero = 0.0
lower = np.finfo(pytensor.config.floatX).eps
elif dtype == "int":
zero = 0
lower = 1
else:
raise ValueError("dtype must be 'float' or 'int'")
nonzero_p = pt.as_tensor_variable(nonzero_p)
weights = pt.stack([1 - nonzero_p, nonzero_p], axis=-1)
comp_dists = [
DiracDelta.dist(zero),
Truncated.dist(nonzero_dist, lower=lower, max_n_steps=max_n_steps),
]
if name is not None:
return Mixture(name, weights, comp_dists, **kwargs)
else:
return Mixture.dist(weights, comp_dists, **kwargs)
[docs]
class HurdlePoisson:
R"""
Hurdle Poisson log-likelihood.
The Poisson distribution is often used to model the number of events occurring
in a fixed period of time or space when the times or locations
at which events occur are independent.
The difference with ZeroInflatedPoisson is that the zeros are not inflated,
they come from a completely independent process.
The pmf of this distribution is
.. math::
f(x \mid \psi, \mu) =
\left\{
\begin{array}{l}
(1 - \psi) \ \text{if } x = 0 \\
\psi
\frac{\text{PoissonPDF}(x \mid \mu))}
{1 - \text{PoissonCDF}(0 \mid \mu)} \ \text{if } x=1,2,3,\ldots
\end{array}
\right.
Parameters
----------
psi : tensor_like of float
Expected proportion of Poisson draws (0 < psi < 1)
mu : tensor_like of float
Expected number of occurrences (mu >= 0).
"""
def __new__(cls, name, psi, mu, **kwargs):
return _hurdle_mixture(
name=name, nonzero_p=psi, nonzero_dist=Poisson.dist(mu=mu), dtype="int", **kwargs
)
[docs]
@classmethod
def dist(cls, psi, mu, **kwargs):
return _hurdle_mixture(
name=None, nonzero_p=psi, nonzero_dist=Poisson.dist(mu=mu), dtype="int", **kwargs
)
[docs]
class HurdleNegativeBinomial:
R"""
Hurdle Negative Binomial log-likelihood.
The negative binomial distribution describes a Poisson random variable
whose rate parameter is gamma distributed.
The difference with ZeroInflatedNegativeBinomial is that the zeros are not inflated,
they come from a completely independent process.
The pmf of this distribution is
.. math::
f(x \mid \psi, \mu, \alpha) =
\left\{
\begin{array}{l}
(1 - \psi) \ \text{if } x = 0 \\
\psi
\frac{\text{NegativeBinomialPDF}(x \mid \mu, \alpha))}
{1 - \text{NegativeBinomialCDF}(0 \mid \mu, \alpha)} \ \text{if } x=1,2,3,\ldots
\end{array}
\right.
Parameters
----------
psi : tensor_like of float
Expected proportion of Negative Binomial draws (0 < psi < 1)
alpha : tensor_like of float
Gamma distribution shape parameter (alpha > 0).
mu : tensor_like of float
Gamma distribution mean (mu > 0).
p : tensor_like of float
Alternative probability of success in each trial (0 < p < 1).
n : tensor_like of float
Alternative number of target success trials (n > 0)
"""
def __new__(cls, name, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
return _hurdle_mixture(
name=name,
nonzero_p=psi,
nonzero_dist=NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
dtype="int",
**kwargs,
)
[docs]
@classmethod
def dist(cls, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
return _hurdle_mixture(
name=None,
nonzero_p=psi,
nonzero_dist=NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
dtype="int",
**kwargs,
)
[docs]
class HurdleGamma:
R"""
Hurdle Gamma log-likelihood.
.. math::
f(x \mid \psi, \alpha, \beta) =
\left\{
\begin{array}{l}
(1 - \psi) \ \text{if } x = 0 \\
\psi
\frac{\text{GammaPDF}(x \mid \alpha, \beta))}
{1 - \text{GammaCDF}(\epsilon \mid \alpha, \beta)} \ \text{if } x=1,2,3,\ldots
\end{array}
\right.
where :math:`\epsilon` is the machine precision.
Parameters
----------
psi : tensor_like of float
Expected proportion of Gamma draws (0 < psi < 1)
alpha : tensor_like of float, optional
Shape parameter (alpha > 0).
beta : tensor_like of float, optional
Rate parameter (beta > 0).
mu : tensor_like of float, optional
Alternative shape parameter (mu > 0).
sigma : tensor_like of float, optional
Alternative scale parameter (sigma > 0).
"""
def __new__(cls, name, psi, alpha=None, beta=None, mu=None, sigma=None, **kwargs):
return _hurdle_mixture(
name=name,
nonzero_p=psi,
nonzero_dist=Gamma.dist(alpha=alpha, beta=beta, mu=mu, sigma=sigma),
dtype="float",
**kwargs,
)
[docs]
@classmethod
def dist(cls, psi, alpha=None, beta=None, mu=None, sigma=None, **kwargs):
return _hurdle_mixture(
name=None,
nonzero_p=psi,
nonzero_dist=Gamma.dist(alpha=alpha, beta=beta, mu=mu, sigma=sigma),
dtype="float",
**kwargs,
)
[docs]
class HurdleLogNormal:
R"""
Hurdle LogNormal log-likelihood.
.. math::
f(x \mid \psi, \mu, \sigma) =
\left\{
\begin{array}{l}
(1 - \psi) \ \text{if } x = 0 \\
\psi
\frac{\text{LogNormalPDF}(x \mid \mu, \sigma))}
{1 - \text{LogNormalCDF}(\epsilon \mid \mu, \sigma)} \ \text{if } x=1,2,3,\ldots
\end{array}
\right.
where :math:`\epsilon` is the machine precision.
Parameters
----------
psi : tensor_like of float
Expected proportion of LogNormal draws (0 < psi < 1)
mu : tensor_like of float, default 0
Location parameter.
sigma : tensor_like of float, optional
Standard deviation. (sigma > 0). (only required if tau is not specified).
Defaults to 1.
tau : tensor_like of float, optional
Scale parameter (tau > 0). (only required if sigma is not specified).
Defaults to 1.
"""
def __new__(cls, name, psi, mu=0, sigma=None, tau=None, **kwargs):
return _hurdle_mixture(
name=name,
nonzero_p=psi,
nonzero_dist=LogNormal.dist(mu=mu, sigma=sigma, tau=tau),
dtype="float",
**kwargs,
)
[docs]
@classmethod
def dist(cls, psi, mu=0, sigma=None, tau=None, **kwargs):
return _hurdle_mixture(
name=None,
nonzero_p=psi,
nonzero_dist=LogNormal.dist(mu=mu, sigma=sigma, tau=tau),
dtype="float",
**kwargs,
)