Source code for pymc.distributions.simulator

#   Copyright 2020 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

import numpy as np
import pytensor
import pytensor.tensor as at

from pytensor.graph.op import Apply, Op
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.var import TensorVariable
from scipy.spatial import cKDTree

from pymc.distributions.distribution import Distribution, _moment
from pymc.logprob.abstract import _logprob
from pymc.pytensorf import floatX

__all__ = ["Simulator"]

_log = logging.getLogger("pymc")


class SimulatorRV(RandomVariable):
    """
    Base class for SimulatorRVs

    This should be subclassed when defining custom Simulator objects.
    """

    name = "SimulatorRV"
    ndim_supp = None
    ndims_params = None
    dtype = "floatX"
    _print_name = ("Simulator", "\\operatorname{Simulator}")

    fn = None
    _distance = None
    _sum_stat = None
    epsilon = None

    @classmethod
    def rng_fn(cls, *args, **kwargs):
        return cls.fn(*args, **kwargs)

    @classmethod
    def distance(cls, *args, **kwargs):
        return cls._distance(*args, **kwargs)

    @classmethod
    def sum_stat(cls, *args, **kwargs):
        return cls._sum_stat(*args, **kwargs)


[docs]class Simulator(Distribution): r""" Simulator distribution, used for Approximate Bayesian Inference (ABC) with Sequential Monte Carlo (SMC) sampling via :func:`~pymc.sample_smc`. Simulator distributions have a stochastic pseudo-loglikelihood defined by a distance metric between the observed and simulated data, and tweaked by a hyper-parameter ``epsilon``. Parameters ---------- fn : callable Python random simulator function. Should expect the following signature ``(rng, arg1, arg2, ... argn, size)``, where rng is a ``numpy.random.Generator`` and ``size`` defines the size of the desired sample. *unnamed_params : list of TensorVariable Parameters used by the Simulator random function. Each parameter can be passed by order after fn, for example ``param1, param2, ..., paramN``. params can also be passed with keyword argument "params". params : list of TensorVariable Keyword form of ''unnamed_params''. One of unnamed_params or params must be provided. If passed both unnamed_params and params, an error is raised. class_name : str Name for the RandomVariable class which will wrap the Simulator methods. When not specified, it will be given the name of the variable. .. warning:: New Simulators created with the same class_name will override the methods dispatched onto the previous classes. If using Simulators with different methods across separate models, be sure to use distinct class_names. distance : PyTensor_Op, callable or str, default "gaussian" Distance function. Available options are ``"gaussian"``, ``"laplace"``, ``"kullback_leibler"`` or a user defined function (or PyTensor_Op) that takes ``epsilon``, the summary statistics of observed_data and the summary statistics of simulated_data as input. ``gaussian``: :math:`-0.5 \left(\left(\frac{xo - xs}{\epsilon}\right)^2\right)` ``laplace``: :math:`-{\left(\frac{|xo - xs|}{\epsilon}\right)}` ``kullback_leibler``: :math:`\frac{d}{n} \frac{1}{\epsilon} \sum_i^n -\log \left( \frac{{\nu_d}_i}{{\rho_d}_i} \right) + \log_r` [1]_ ``distance="gaussian"`` + ``sum_stat="sort"`` is equivalent to the 1D 2-wasserstein distance ``distance="laplace"`` + ``sum_stat="sort"`` is equivalent to the the 1D 1-wasserstein distance sum_stat : PyTensor_Op, callable or str, default "identity" Summary statistic function. Available options are ``"identity"``, ``"sort"``, ``"mean"``, ``"median"``. If a callable (or PyTensor_Op) is defined, it should return a 1d numpy array (or PyTensor vector). epsilon : tensor_like of float, default 1.0 Scaling parameter for the distance functions. It should be a float or an array of the same size of the output of ``sum_stat``. ndim_supp : int, default 0 Number of dimensions of the SimulatorRV (0 for scalar, 1 for vector, etc.) ndims_params : list of int, optional Number of minimum dimensions of each parameter of the RV. For example, if the Simulator accepts two scalar inputs, it should be ``[0, 0]``. Default to list of 0 with length equal to the number of parameters. Examples -------- .. code-block:: python def simulator_fn(rng, loc, scale, size): return rng.normal(loc, scale, size=size) with pm.Model() as m: loc = pm.Normal("loc", 0, 1) scale = pm.HalfNormal("scale", 1) simulator = pm.Simulator("simulator", simulator_fn, loc, scale, observed=data) idata = pm.sample_smc() References ---------- .. [1] PĂ©rez-Cruz, F. (2008, July). Kullback-Leibler divergence estimation of continuous distributions. In 2008 IEEE international symposium on information theory (pp. 1666-1670). IEEE. `link <https://ieeexplore.ieee.org/document/4595271>`__ """ rv_type = SimulatorRV def __new__(cls, name, *args, **kwargs): kwargs.setdefault("class_name", name) return super().__new__(cls, name, *args, **kwargs)
[docs] @classmethod def dist( # type: ignore cls, fn, *unnamed_params, params=None, class_name: str, distance="gaussian", sum_stat="identity", epsilon=1, ndim_supp=0, ndims_params=None, dtype="floatX", **kwargs, ): if not isinstance(distance, Op): if distance == "gaussian": distance = gaussian elif distance == "laplace": distance = laplace elif distance == "kullback_leibler": raise NotImplementedError("KL not refactored yet") # TODO: Wrap KL in pytensor OP # distance = KullbackLeibler(observed) # if sum_stat != "identity": # _log.info(f"Automatically setting sum_stat to identity as expected by {distance}") # sum_stat = "identity" elif callable(distance): distance = create_distance_op_from_fn(distance) else: raise ValueError(f"The distance metric {distance} is not implemented") if not isinstance(sum_stat, Op): if sum_stat == "identity": sum_stat = identity elif sum_stat == "sort": sum_stat = at.sort elif sum_stat == "mean": sum_stat = at.mean elif sum_stat == "median": # Missing in PyTensor, see pytensor/issues/525 sum_stat = create_sum_stat_op_from_fn(np.median) elif callable(sum_stat): sum_stat = create_sum_stat_op_from_fn(sum_stat) else: raise ValueError(f"The summary statistic {sum_stat} is not implemented") epsilon = at.as_tensor_variable(floatX(epsilon)) if params is None: params = unnamed_params else: if unnamed_params: raise ValueError("Cannot pass both unnamed parameters and `params`") # Assume scalar ndims_params if ndims_params is None: ndims_params = [0] * len(params) return super().dist( params, class_name=class_name, fn=fn, ndim_supp=ndim_supp, ndims_params=ndims_params, dtype=dtype, distance=distance, sum_stat=sum_stat, epsilon=epsilon, **kwargs, )
[docs] @classmethod def rv_op( cls, *params, class_name, fn, ndim_supp, ndims_params, dtype, distance, sum_stat, epsilon, **kwargs, ): sim_op = type( f"Simulator_{class_name}", (SimulatorRV,), dict( name=f"Simulator_{class_name}", ndim_supp=ndim_supp, ndims_params=ndims_params, dtype=dtype, inplace=False, fn=fn, _distance=distance, _sum_stat=sum_stat, epsilon=epsilon, ), )() return sim_op(*params, **kwargs)
@_moment.register(SimulatorRV) # type: ignore def simulator_moment(op, rv, *inputs): sim_inputs = inputs[3:] # Take the mean of 10 draws multiple_sim = rv.owner.op(*sim_inputs, size=at.concatenate([[10], rv.shape])) return at.mean(multiple_sim, axis=0) @_logprob.register(SimulatorRV) def simulator_logp(op, values, *inputs, **kwargs): (value,) = values # Use a new rng to avoid non-randomness in parallel sampling # TODO: Model rngs should be updated prior to multiprocessing split, # in which case this would not be needed. However, that would have to be # done for every sampler that may accommodate Simulators rng = pytensor.shared(np.random.default_rng(), name="simulator_rng") # Create a new simulatorRV with identical inputs as the original one sim_value = op.make_node(rng, *inputs[1:]).default_output() sim_value.name = "simulator_value" return op.distance( op.epsilon, op.sum_stat(value), op.sum_stat(sim_value), ) def identity(x): """Identity function, used as a summary statistics.""" return x def gaussian(epsilon, obs_data, sim_data): """Gaussian kernel.""" return -0.5 * ((obs_data - sim_data) / epsilon) ** 2 def laplace(epsilon, obs_data, sim_data): """Laplace kernel.""" return -at.abs((obs_data - sim_data) / epsilon) class KullbackLeibler: """Approximate Kullback-Leibler.""" def __init__(self, obs_data): if obs_data.ndim == 1: obs_data = obs_data[:, None] n, d = obs_data.shape rho_d, _ = cKDTree(obs_data).query(obs_data, 2) self.rho_d = rho_d[:, 1] self.d_n = d / n self.log_r = np.log(n / (n - 1)) self.obs_data = obs_data def __call__(self, epsilon, obs_data, sim_data): if sim_data.ndim == 1: sim_data = sim_data[:, None] nu_d, _ = cKDTree(sim_data).query(self.obs_data, 1) return self.d_n * np.sum(-np.log(nu_d / self.rho_d) / epsilon) + self.log_r def create_sum_stat_op_from_fn(fn): vectorX = at.dvector if pytensor.config.floatX == "float64" else at.fvector # Check if callable returns TensorVariable with dummy inputs try: res = fn(vectorX()) if isinstance(res, TensorVariable): return fn except Exception: pass # Otherwise, automatically wrap in PyTensor Op class SumStat(Op): def make_node(self, x): x = at.as_tensor_variable(x) return Apply(self, [x], [vectorX()]) def perform(self, node, inputs, outputs): (x,) = inputs outputs[0][0] = np.atleast_1d(fn(x)).astype(pytensor.config.floatX) return SumStat() def create_distance_op_from_fn(fn): scalarX = at.dscalar if pytensor.config.floatX == "float64" else at.fscalar vectorX = at.dvector if pytensor.config.floatX == "float64" else at.fvector # Check if callable returns TensorVariable with dummy inputs try: res = fn(scalarX(), vectorX(), vectorX()) if isinstance(res, TensorVariable): return fn except Exception: pass # Otherwise, automatically wrap in PyTensor Op class Distance(Op): def make_node(self, epsilon, obs_data, sim_data): epsilon = at.as_tensor_variable(epsilon) obs_data = at.as_tensor_variable(obs_data) sim_data = at.as_tensor_variable(sim_data) return Apply(self, [epsilon, obs_data, sim_data], [vectorX()]) def perform(self, node, inputs, outputs): eps, obs_data, sim_data = inputs outputs[0][0] = np.atleast_1d(fn(eps, obs_data, sim_data)).astype( pytensor.config.floatX ) return Distance()