Source code for pymc.distributions.censored

#   Copyright 2024 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import numpy as np
import pytensor.tensor as pt

from pytensor.tensor import TensorVariable
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.random.utils import normalize_size_param

from pymc.distributions.distribution import (
    Distribution,
    SymbolicRandomVariable,
    _support_point,
)
from pymc.distributions.shape_utils import (
    _change_dist_size,
    change_dist_size,
    implicit_size_from_params,
    rv_size_is_none,
)
from pymc.util import check_dist_not_registered


class CensoredRV(SymbolicRandomVariable):
    """Censored random variable."""

    inline_logprob = True
    extended_signature = "(),(),()->()"
    _print_name = ("Censored", "\\operatorname{Censored}")

    @classmethod
    def rv_op(cls, dist, lower, upper, *, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        lower = pt.constant(-np.inf) if lower is None else pt.as_tensor(lower)
        upper = pt.constant(np.inf) if upper is None else pt.as_tensor(upper)
        size = normalize_size_param(size)

        if rv_size_is_none(size):
            size = implicit_size_from_params(dist, lower, upper, ndims_params=cls.ndims_params)

        # Censoring is achieved by clipping the base distribution between lower and upper
        dist = change_dist_size(dist, size)
        censored_rv = pt.clip(dist, lower, upper)

        return CensoredRV(
            inputs=[dist, lower, upper],
            outputs=[censored_rv],
        )(dist, lower, upper)


[docs] class Censored(Distribution): r""" Censored distribution. The pdf of a censored distribution is .. math:: \begin{cases} 0 & \text{for } x < lower, \\ \text{CDF}(lower, dist) & \text{for } x = lower, \\ \text{PDF}(x, dist) & \text{for } lower < x < upper, \\ 1-\text{CDF}(upper, dist) & \text {for} x = upper, \\ 0 & \text{for } x > upper, \end{cases} Parameters ---------- dist : unnamed_distribution Univariate distribution which will be censored. This distribution must have a logcdf method implemented for sampling. .. warning:: dist will be cloned, rendering it independent of the one passed as input. lower : float or None Lower (left) censoring point. If `None` the distribution will not be left censored upper : float or None Upper (right) censoring point. If `None`, the distribution will not be right censored. Warnings -------- Continuous censored distributions should only be used as likelihoods. Continuous censored distributions are a form of discrete-continuous mixture and as such cannot be sampled properly without a custom step sampler. If you wish to sample such a distribution, you can add the latent uncensored distribution to the model and then wrap it in a :class:`~pymc.Deterministic` :func:`~pymc.math.clip`. Examples -------- .. code-block:: python with pm.Model(): normal_dist = pm.Normal.dist(mu=0.0, sigma=1.0) censored_normal = pm.Censored("censored_normal", normal_dist, lower=-1, upper=1) """ rv_type = CensoredRV rv_op = CensoredRV.rv_op
[docs] @classmethod def dist(cls, dist, lower=-np.inf, upper=np.inf, **kwargs): if not isinstance(dist, TensorVariable) or not isinstance( dist.owner.op, RandomVariable | SymbolicRandomVariable ): raise ValueError( f"Censoring dist must be a distribution created via the `.dist()` API, got {type(dist)}" ) if dist.owner.op.ndim_supp > 0: raise NotImplementedError( "Censoring of multivariate distributions has not been implemented yet" ) check_dist_not_registered(dist) return super().dist([dist, lower, upper], **kwargs)
@_change_dist_size.register(CensoredRV) def change_censored_size(cls, dist, new_size, expand=False): uncensored_dist, lower, upper = dist.owner.inputs if expand: new_size = tuple(new_size) + tuple(uncensored_dist.shape) return Censored.rv_op(uncensored_dist, lower, upper, size=new_size) @_support_point.register(CensoredRV) def support_point_censored(op, rv, dist, lower, upper): support_point = pt.switch( pt.eq(lower, -np.inf), pt.switch( pt.isinf(upper), # lower = -inf, upper = inf 0, # lower = -inf, upper = x upper - 1, ), pt.switch( pt.eq(upper, np.inf), # lower = x, upper = inf lower + 1, # lower = x, upper = x (lower + upper) / 2, ), ) support_point = pt.full_like(dist, support_point) return support_point