# Copyright 2023 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pymc as pm
from pymc.distributions.dist_math import betaln, check_parameters, factln, logpow
from pymc.distributions.shape_utils import rv_size_is_none
from pytensor import tensor as pt
from pytensor.tensor.random.op import RandomVariable
def log1mexp(x):
    """Compute log(1 - exp(x)) in a numerically stable way for x <= 0.

    Switches between log1p(-exp(x)) (when x < log(0.5)) and log(-expm1(x))
    (otherwise) to avoid loss of precision at either extreme.
    """
    cond = x < np.log(0.5)
    return np.piecewise(
        x,
        [cond, ~cond],
        [lambda x: np.log1p(-np.exp(x)), lambda x: np.log(-np.expm1(x))],
    )
class GeneralizedPoissonRV(RandomVariable):
name = "generalized_poisson"
ndim_supp = 0
ndims_params = [0, 0]
dtype = "int64"
_print_name = ("GeneralizedPoisson", "\\operatorname{GeneralizedPoisson}")
@classmethod
def rng_fn(cls, rng, theta, lam, size):
theta = np.asarray(theta)
lam = np.asarray(lam)
if size is not None:
dist_size = size
else:
dist_size = np.broadcast_shapes(theta.shape, lam.shape)
        # A mix of 2 algorithms described by Famoye (1997) is used, depending on the sign of lam:
        # - Inverse (CDF-inversion) method, computed on the log scale. Used when lam < 0.
        # - Branching method. Used when lam >= 0.
        x = np.empty(dist_size)
        # Fill each entry using the algorithm that matches the sign of its lam value
        idxs_mask = np.broadcast_to(lam < 0, dist_size)
if np.any(idxs_mask):
x[idxs_mask] = cls._inverse_rng_fn(rng, theta, lam, dist_size, idxs_mask=idxs_mask)[
idxs_mask
]
idxs_mask = ~idxs_mask
if np.any(idxs_mask):
x[idxs_mask] = cls._branching_rng_fn(rng, theta, lam, dist_size, idxs_mask=idxs_mask)[
idxs_mask
]
return x
@classmethod
def _inverse_rng_fn(cls, rng, theta, lam, dist_size, idxs_mask):
# We handle x/0 and log(0) issues with branching
with np.errstate(divide="ignore", invalid="ignore"):
log_u = np.log(rng.uniform(size=dist_size))
pos_lam = lam > 0
abs_log_lam = np.log(np.abs(lam))
theta_m_lam = theta - lam
log_s = -theta
log_p = log_s.copy()
x_ = 0
x = np.zeros(dist_size)
        below_cutpoint = log_s < log_u
        # CDF-inversion loop, run entirely on the log scale: keep incrementing x
        # while the running log-CDF (log_s) is still below the log-uniform draw
        while np.any(below_cutpoint[idxs_mask]):
x_ += 1
x[below_cutpoint] += 1
log_c = np.log(theta_m_lam + lam * x_)
# Compute log(1 + lam / C)
log1p_lam_m_C = np.where(
pos_lam,
np.log1p(np.exp(abs_log_lam - log_c)),
log1mexp(abs_log_lam - log_c),
)
log_p = log_c + log1p_lam_m_C * (x_ - 1) + log_p - np.log(x_) - lam
log_s = np.logaddexp(log_s, log_p)
below_cutpoint = log_s < log_u
return x
@classmethod
def _branching_rng_fn(cls, rng, theta, lam, dist_size, idxs_mask):
        lam_ = np.abs(lam)  # This algorithm is only valid for positive lam
        # Total progeny of a Poisson branching process: start with Poisson(theta)
        # individuals; each generation contributes Poisson(lam * previous generation)
        y = rng.poisson(theta, size=dist_size)
x = y.copy()
higher_than_zero = y > 0
while np.any(higher_than_zero[idxs_mask]):
y = rng.poisson(lam_ * y)
x[higher_than_zero] = x[higher_than_zero] + y[higher_than_zero]
higher_than_zero = y > 0
return x
generalized_poisson = GeneralizedPoissonRV()
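# The op behaves like any PyTensor RandomVariable and can be evaluated eagerly,
# which is handy for sanity-checking the sampler (parameter values below are
# illustrative only):
#   generalized_poisson(2.0, 0.5, size=(1000,)).eval()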
class GeneralizedPoisson(pm.distributions.Discrete):
R"""
    Generalized Poisson distribution.

    Used to model count data that can be either overdispersed or underdispersed.
    It offers greater flexibility than the standard Poisson, which assumes
    equidispersion (mean equal to variance).
The pmf of this distribution is
.. math:: f(x \mid \mu, \lambda) =
\frac{\mu (\mu + \lambda x)^{x-1} e^{-\mu - \lambda x}}{x!}
======== ======================================
Support :math:`x \in \mathbb{N}_0`
Mean :math:`\frac{\mu}{1 - \lambda}`
Variance :math:`\frac{\mu}{(1 - \lambda)^3}`
======== ======================================
Parameters
----------
mu : tensor_like of float
Mean parameter (mu > 0).
lam : tensor_like of float
Dispersion parameter (max(-1, -mu/4) <= lam <= 1).
Notes
-----
When lam = 0, the Generalized Poisson reduces to the standard Poisson with the same mu.
When lam < 0, the mean is greater than the variance (underdispersion).
When lam > 0, the mean is less than the variance (overdispersion).
References
----------
    The PMF is taken from [1]_ and the random generator function is adapted from [2]_.

    .. [1] Consul, P. C., and Felix Famoye. "Generalized Poisson regression model."
       Communications in Statistics - Theory and Methods 21.1 (1992): 89-109.
.. [2] Famoye, Felix. "Generalized Poisson random variate generation." American
Journal of Mathematical and Management Sciences 17.3-4 (1997): 219-237.
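
    Examples
    --------
    A minimal usage sketch (the parameter values are illustrative only):

    .. code-block:: python

        import pymc as pm

        with pm.Model():
            # lam < 0 yields underdispersed counts, lam > 0 overdispersed ones
            x = GeneralizedPoisson("x", mu=2.0, lam=-0.2)
            idata = pm.sample_prior_predictive()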
"""
rv_op = generalized_poisson
@classmethod
def dist(cls, mu, lam, **kwargs):
mu = pt.as_tensor_variable(mu)
lam = pt.as_tensor_variable(lam)
return super().dist([mu, lam], **kwargs)
def support_point(rv, size, mu, lam):
mean = pt.floor(mu / (1 - lam))
if not rv_size_is_none(size):
mean = pt.full(size, mean)
return mean
def logp(value, mu, lam):
mu_lam_value = mu + lam * value
        logprob = pt.log(mu) + logpow(mu_lam_value, value - 1) - mu_lam_value - factln(value)
# Probability is 0 when value > m, where m is the largest positive integer for
# which mu + m * lam > 0 (when lam < 0).
logprob = pt.switch(
pt.or_(
mu_lam_value < 0,
value < 0,
),
-np.inf,
logprob,
)
return check_parameters(
logprob,
0 < mu,
pt.abs(lam) <= 1,
(-mu / 4) <= lam,
msg="0 < mu, max(-1, -mu/4)) <= lam <= 1",
)
class BetaNegativeBinomial:
R"""
Beta Negative Binomial distribution.
The pmf of this distribution is
.. math::
f(x \mid \alpha, \beta, r) = \frac{B(r + x, \alpha + \beta)}{B(r, \alpha)} \frac{\Gamma(x + \beta)}{x! \Gamma(\beta)}
where :math:`B` is the Beta function and :math:`\Gamma` is the Gamma function.
For more information, see https://en.wikipedia.org/wiki/Beta_negative_binomial_distribution.
.. plot::
:context: close-figs
import matplotlib.pyplot as plt
import numpy as np
from scipy.special import betaln, gammaln
def factln(x):
return gammaln(x + 1)
def logp(x, alpha, beta, r):
return (
betaln(r + x, alpha + beta)
- betaln(r, alpha)
+ gammaln(x + beta)
- factln(x)
- gammaln(beta)
)
plt.style.use('arviz-darkgrid')
x = np.arange(0, 25)
params = [
(1, 1, 1),
(1, 1, 10),
(1, 10, 1),
(1, 10, 10),
(10, 10, 10),
]
for alpha, beta, r in params:
pmf = np.exp(logp(x, alpha, beta, r))
            plt.plot(x, pmf, "-o", label=r'$\alpha$ = {}, $\beta$ = {}, $r$ = {}'.format(alpha, beta, r))
plt.xlabel('x', fontsize=12)
plt.ylabel('f(x)', fontsize=12)
plt.legend(loc=1)
plt.show()
======== ======================================
Support :math:`x \in \mathbb{N}_0`
    Mean     :math:`\begin{cases} \frac{r\beta}{\alpha - 1} & \text{if } \alpha > 1 \\ \infty & \text{otherwise} \end{cases}`
    Variance :math:`\begin{cases} \frac{r\beta (r + \alpha - 1)(\beta + \alpha - 1)}{(\alpha - 2)(\alpha - 1)^{2}} & \text{if } \alpha > 2 \\ \infty & \text{otherwise} \end{cases}`
======== ======================================
Parameters
----------
alpha : tensor_like of float
shape of the beta distribution (alpha > 0).
beta : tensor_like of float
shape of the beta distribution (beta > 0).
r : tensor_like of float
        Number of successes until the experiment is stopped (integer, but can be extended to real values).
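
    Examples
    --------
    A minimal usage sketch (the parameter values are illustrative only):

    .. code-block:: python

        import pymc as pm

        with pm.Model():
            x = BetaNegativeBinomial("x", alpha=2.0, beta=2.0, r=10)
            idata = pm.sample_prior_predictive()

        # Draws can also be taken without registering the variable in a model
        samples = pm.draw(BetaNegativeBinomial.dist(alpha=2.0, beta=2.0, r=10), draws=100)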
"""
@staticmethod
def beta_negative_binomial_dist(alpha, beta, r, size):
if rv_size_is_none(size):
alpha, beta, r = pt.broadcast_arrays(alpha, beta, r)
        p = pm.Beta.dist(alpha, beta, size=size)
        # NegativeBinomial is parameterized by (mu, alpha) or (p, n); pass keywords
        # explicitly so p and r are not misread as mu and alpha
        return pm.NegativeBinomial.dist(p=p, n=r, size=size)
@staticmethod
def beta_negative_binomial_logp(value, alpha, beta, r):
res = (
betaln(r + value, alpha + beta)
- betaln(r, alpha)
+ pt.gammaln(value + beta)
- factln(value)
- pt.gammaln(beta)
)
res = pt.switch(
pt.lt(value, 0),
-np.inf,
res,
)
return check_parameters(
res,
alpha > 0,
beta > 0,
r > 0,
msg="alpha > 0, beta > 0, r > 0",
)
def __new__(cls, name, alpha, beta, r, **kwargs):
return pm.CustomDist(
name,
alpha,
beta,
r,
dist=cls.beta_negative_binomial_dist,
logp=cls.beta_negative_binomial_logp,
class_name="BetaNegativeBinomial",
**kwargs,
)
@classmethod
def dist(cls, alpha, beta, r, **kwargs):
return pm.CustomDist.dist(
alpha,
beta,
r,
dist=cls.beta_negative_binomial_dist,
logp=cls.beta_negative_binomial_logp,
class_name="BetaNegativeBinomial",
**kwargs,
)
class Skellam:
R"""
Skellam distribution.
The Skellam distribution is the distribution of the difference of two
Poisson random variables.
The pmf of this distribution is
.. math::
        f(x \mid \mu_1, \mu_2) = e^{-(\mu_1 + \mu_2)} \left( \frac{\mu_1}{\mu_2} \right)^{x/2} I_x\left( 2 \sqrt{\mu_1 \mu_2} \right)
where :math:`I_{x}` is the modified Bessel function of the first kind of order :math:`x`.
Read more about the Skellam distribution at https://en.wikipedia.org/wiki/Skellam_distribution
.. plot::
:context: close-figs
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as st
import arviz as az
plt.style.use('arviz-darkgrid')
x = np.arange(-15, 15)
params = [
(1, 1),
(5, 5),
(5, 1),
]
for mu1, mu2 in params:
pmf = st.skellam.pmf(x, mu1, mu2)
plt.plot(x, pmf, "-o", label=r'$\mu_1$ = {}, $\mu_2$ = {}'.format(mu1, mu2))
plt.xlabel('x', fontsize=12)
plt.ylabel('f(x)', fontsize=12)
plt.legend(loc=1)
plt.show()
======== ======================================
Support :math:`x \in \mathbb{Z}`
Mean :math:`\mu_{1} - \mu_{2}`
Variance :math:`\mu_{1} + \mu_{2}`
======== ======================================
Parameters
----------
mu1 : tensor_like of float
Mean parameter (mu1 >= 0).
mu2 : tensor_like of float
Mean parameter (mu2 >= 0).
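
    Examples
    --------
    A minimal sketch of fitting observed count differences (the data and priors
    are illustrative only):

    .. code-block:: python

        import pymc as pm

        differences = [1, -2, 0, 3, 1]
        with pm.Model():
            mu1 = pm.Exponential("mu1", 1.0)
            mu2 = pm.Exponential("mu2", 1.0)
            Skellam("diff", mu1=mu1, mu2=mu2, observed=differences)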
"""
@staticmethod
def skellam_dist(mu1, mu2, size):
if rv_size_is_none(size):
mu1, mu2 = pt.broadcast_arrays(mu1, mu2)
return pm.Poisson.dist(mu=mu1, size=size) - pm.Poisson.dist(mu=mu2, size=size)
@staticmethod
def skellam_logp(value, mu1, mu2):
res = (
-mu1
- mu2
+ 0.5 * value * (pt.log(mu1) - pt.log(mu2))
+ pt.log(pt.iv(value, 2 * pt.sqrt(mu1 * mu2)))
)
return check_parameters(
res,
mu1 >= 0,
mu2 >= 0,
msg="mu1 >= 0, mu2 >= 0",
)
def __new__(cls, name, mu1, mu2, **kwargs):
return pm.CustomDist(
name,
mu1,
mu2,
dist=cls.skellam_dist,
logp=cls.skellam_logp,
class_name="Skellam",
**kwargs,
)
@classmethod
def dist(cls, mu1, mu2, **kwargs):
return pm.CustomDist.dist(
mu1,
mu2,
dist=cls.skellam_dist,
logp=cls.skellam_logp,
class_name="Skellam",
**kwargs,
)