# Copyright 2020 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import numpy as np
import pytensor
import pytensor.tensor as at

from pytensor.graph.op import Apply, Op
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.var import TensorVariable
from scipy.spatial import cKDTree

from pymc.distributions.distribution import Distribution, _moment
from pymc.logprob.abstract import _logprob
from pymc.pytensorf import floatX

__all__ = ["Simulator"]

_log = logging.getLogger("pymc")


class SimulatorRV(RandomVariable):
"""
Base class for SimulatorRVs
This should be subclassed when defining custom Simulator objects.
"""
name = "SimulatorRV"
ndim_supp = None
ndims_params = None
dtype = "floatX"
_print_name = ("Simulator", "\\operatorname{Simulator}")
fn = None
_distance = None
_sum_stat = None
epsilon = None

    @classmethod
    def rng_fn(cls, *args, **kwargs):
        return cls.fn(*args, **kwargs)

    @classmethod
    def distance(cls, *args, **kwargs):
        return cls._distance(*args, **kwargs)

    @classmethod
    def sum_stat(cls, *args, **kwargs):
        return cls._sum_stat(*args, **kwargs)
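

# A minimal sketch of the kind of subclass that ``Simulator.rv_op`` creates
# dynamically below. All names here are illustrative assumptions for
# documentation purposes, not objects used elsewhere in this module:
#
#     class NormalSimulatorRV(SimulatorRV):
#         name = "NormalSimulatorRV"
#         ndim_supp = 0
#         ndims_params = [0, 0]
#         dtype = "floatX"
#         fn = staticmethod(lambda rng, loc, scale, size: rng.normal(loc, scale, size=size))
#         _distance = staticmethod(gaussian)
#         _sum_stat = staticmethod(identity)
#         epsilon = 1.0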

class Simulator(Distribution):
r"""
Simulator distribution, used for Approximate Bayesian Inference (ABC)
with Sequential Monte Carlo (SMC) sampling via :func:`~pymc.sample_smc`.
Simulator distributions have a stochastic pseudo-loglikelihood defined by
a distance metric between the observed and simulated data, and tweaked
by a hyper-parameter ``epsilon``.

    Parameters
    ----------
    fn : callable
        Python random simulator function. Should expect the following signature
        ``(rng, arg1, arg2, ... argn, size)``, where ``rng`` is a ``numpy.random.Generator``
        and ``size`` defines the size of the desired sample.
    *unnamed_params : list of TensorVariable
        Parameters used by the Simulator random function. Each parameter can be
        passed by order after ``fn``, for example ``param1, param2, ..., paramN``.
        Alternatively, parameters can be passed via the keyword argument ``params``.
    params : list of TensorVariable
        Keyword form of ``unnamed_params``.
        Exactly one of ``unnamed_params`` or ``params`` must be provided;
        passing both raises an error.
    class_name : str
        Name for the RandomVariable class which will wrap the Simulator methods.
        When not specified, it will be given the name of the variable.

        .. warning:: New Simulators created with the same ``class_name`` will
            override the methods dispatched onto the previous classes. If using
            Simulators with different methods across separate models, be sure to
            use distinct class names.
    distance : PyTensor Op, callable or str, default "gaussian"
        Distance function. Available options are ``"gaussian"``, ``"laplace"``,
        ``"kullback_leibler"`` or a user defined function (or PyTensor Op) that
        takes ``epsilon``, the summary statistics of ``observed_data`` and the
        summary statistics of ``simulated_data`` as input.

        ``gaussian``: :math:`-0.5 \left(\frac{x_o - x_s}{\epsilon}\right)^2`

        ``laplace``: :math:`-\left|\frac{x_o - x_s}{\epsilon}\right|`

        ``kullback_leibler``: :math:`\frac{d}{n} \frac{1}{\epsilon} \sum_{i}^{n} -\log \left( \frac{{\nu_d}_i}{{\rho_d}_i} \right) + \log r` [1]_

        ``distance="gaussian"`` + ``sum_stat="sort"`` is equivalent to the 1D 2-Wasserstein distance.

        ``distance="laplace"`` + ``sum_stat="sort"`` is equivalent to the 1D 1-Wasserstein distance.
    sum_stat : PyTensor Op, callable or str, default "identity"
        Summary statistic function. Available options are ``"identity"``,
        ``"sort"``, ``"mean"``, ``"median"``. If a callable (or PyTensor Op) is
        defined, it should return a 1d numpy array (or PyTensor vector).
    epsilon : tensor_like of float, default 1.0
        Scaling parameter for the distance functions. It should be a float or
        an array of the same size as the output of ``sum_stat``.
    ndim_supp : int, default 0
        Number of dimensions of the SimulatorRV (0 for scalar, 1 for vector, etc.)
    ndims_params : list of int, optional
        Number of minimum dimensions of each parameter of the RV. For example,
        if the Simulator accepts two scalar inputs, it should be ``[0, 0]``.
        Defaults to a list of zeros with length equal to the number of parameters.

    Examples
    --------
    .. code-block:: python

        import numpy as np
        import pymc as pm

        def simulator_fn(rng, loc, scale, size):
            return rng.normal(loc, scale, size=size)

        data = np.random.normal(loc=0, scale=1, size=1000)

        with pm.Model() as m:
            loc = pm.Normal("loc", 0, 1)
            scale = pm.HalfNormal("scale", 1)
            simulator = pm.Simulator("simulator", simulator_fn, loc, scale, observed=data)
            idata = pm.sample_smc()
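
    A custom distance can also be given as a plain callable, which is wrapped
    in a PyTensor Op automatically. A minimal sketch, reusing ``simulator_fn``
    and ``data`` from the previous example (``custom_distance`` is an assumed
    name for illustration):

    .. code-block:: python

        def custom_distance(epsilon, obs_sum_stat, sim_sum_stat):
            # Laplace-like pseudo-loglikelihood over summary statistics
            return -np.abs((obs_sum_stat - sim_sum_stat) / epsilon)

        with pm.Model() as m:
            loc = pm.Normal("loc", 0, 1)
            scale = pm.HalfNormal("scale", 1)
            simulator = pm.Simulator(
                "simulator",
                simulator_fn,
                loc,
                scale,
                distance=custom_distance,
                sum_stat="sort",
                observed=data,
            )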

    References
    ----------
    .. [1] Pérez-Cruz, F. (2008, July). Kullback-Leibler divergence
        estimation of continuous distributions. In 2008 IEEE International
        Symposium on Information Theory (pp. 1666-1670). IEEE.
        `link <https://ieeexplore.ieee.org/document/4595271>`__
"""
rv_type = SimulatorRV
def __new__(cls, name, *args, **kwargs):
kwargs.setdefault("class_name", name)
return super().__new__(cls, name, *args, **kwargs)

    @classmethod
def dist( # type: ignore
cls,
fn,
*unnamed_params,
params=None,
class_name: str,
distance="gaussian",
sum_stat="identity",
epsilon=1,
ndim_supp=0,
ndims_params=None,
dtype="floatX",
**kwargs,
):
if not isinstance(distance, Op):
if distance == "gaussian":
distance = gaussian
elif distance == "laplace":
distance = laplace
elif distance == "kullback_leibler":
raise NotImplementedError("KL not refactored yet")
# TODO: Wrap KL in pytensor OP
# distance = KullbackLeibler(observed)
# if sum_stat != "identity":
# _log.info(f"Automatically setting sum_stat to identity as expected by {distance}")
# sum_stat = "identity"
elif callable(distance):
distance = create_distance_op_from_fn(distance)
else:
raise ValueError(f"The distance metric {distance} is not implemented")
if not isinstance(sum_stat, Op):
if sum_stat == "identity":
sum_stat = identity
elif sum_stat == "sort":
sum_stat = at.sort
elif sum_stat == "mean":
sum_stat = at.mean
elif sum_stat == "median":
# Missing in PyTensor, see pytensor/issues/525
sum_stat = create_sum_stat_op_from_fn(np.median)
elif callable(sum_stat):
sum_stat = create_sum_stat_op_from_fn(sum_stat)
else:
raise ValueError(f"The summary statistic {sum_stat} is not implemented")
epsilon = at.as_tensor_variable(floatX(epsilon))
if params is None:
params = unnamed_params
else:
if unnamed_params:
raise ValueError("Cannot pass both unnamed parameters and `params`")
# Assume scalar ndims_params
if ndims_params is None:
ndims_params = [0] * len(params)
return super().dist(
params,
class_name=class_name,
fn=fn,
ndim_supp=ndim_supp,
ndims_params=ndims_params,
dtype=dtype,
distance=distance,
sum_stat=sum_stat,
epsilon=epsilon,
**kwargs,
)
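
    # An illustrative direct use of ``dist`` (outside ``pm.Model``), assuming
    # ``simulator_fn``, ``loc`` and ``scale`` defined as in the class docstring:
    #
    #     sim = Simulator.dist(simulator_fn, loc, scale, class_name="sim")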

    @classmethod
def rv_op(
cls,
*params,
class_name,
fn,
ndim_supp,
ndims_params,
dtype,
distance,
sum_stat,
epsilon,
**kwargs,
):
        # Build a new SimulatorRV subclass carrying the user-supplied callables,
        # then instantiate it and apply it to the parameters
        sim_op = type(
f"Simulator_{class_name}",
(SimulatorRV,),
dict(
name=f"Simulator_{class_name}",
ndim_supp=ndim_supp,
ndims_params=ndims_params,
dtype=dtype,
inplace=False,
fn=fn,
_distance=distance,
_sum_stat=sum_stat,
epsilon=epsilon,
),
)()
return sim_op(*params, **kwargs)


@_moment.register(SimulatorRV)  # type: ignore
def simulator_moment(op, rv, *inputs):
    # Skip the rng, size and dtype inputs; keep only the simulator parameters
    sim_inputs = inputs[3:]
    # Take the mean of 10 draws
    multiple_sim = rv.owner.op(*sim_inputs, size=at.concatenate([[10], rv.shape]))
    return at.mean(multiple_sim, axis=0)


@_logprob.register(SimulatorRV)
def simulator_logp(op, values, *inputs, **kwargs):
(value,) = values
# Use a new rng to avoid non-randomness in parallel sampling
# TODO: Model rngs should be updated prior to multiprocessing split,
# in which case this would not be needed. However, that would have to be
# done for every sampler that may accommodate Simulators
rng = pytensor.shared(np.random.default_rng(), name="simulator_rng")
# Create a new simulatorRV with identical inputs as the original one
sim_value = op.make_node(rng, *inputs[1:]).default_output()
sim_value.name = "simulator_value"
    # The pseudo-loglikelihood is the distance between the summary statistics
    # of the observed value and those of a fresh simulated draw
    return op.distance(
        op.epsilon,
        op.sum_stat(value),
        op.sum_stat(sim_value),
    )


def identity(x):
    """Identity function, used as a summary statistic."""
    return x


def gaussian(epsilon, obs_data, sim_data):
    """Gaussian kernel."""
    return -0.5 * ((obs_data - sim_data) / epsilon) ** 2


def laplace(epsilon, obs_data, sim_data):
    """Laplace kernel."""
    return -at.abs((obs_data - sim_data) / epsilon)


class KullbackLeibler:
    """Approximate Kullback-Leibler divergence between two samples (Pérez-Cruz, 2008)."""

    def __init__(self, obs_data):
        if obs_data.ndim == 1:
            obs_data = obs_data[:, None]
        n, d = obs_data.shape
        # Distance from each observed point to its nearest neighbor within the
        # observed sample (the first hit is the point itself, so take the second)
        rho_d, _ = cKDTree(obs_data).query(obs_data, 2)
        self.rho_d = rho_d[:, 1]
        self.d_n = d / n
        self.log_r = np.log(n / (n - 1))
        self.obs_data = obs_data

    def __call__(self, epsilon, obs_data, sim_data):
        if sim_data.ndim == 1:
            sim_data = sim_data[:, None]
        # Distance from each observed point to its nearest simulated neighbor
        nu_d, _ = cKDTree(sim_data).query(self.obs_data, 1)
        return self.d_n * np.sum(-np.log(nu_d / self.rho_d) / epsilon) + self.log_r
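

# A minimal usage sketch for the estimator above (all values illustrative).
# Note that the ``"kullback_leibler"`` option is not wired into
# ``Simulator.dist`` yet, so this class is currently only usable directly:
#
#     observed = np.random.normal(0, 1, size=1000)
#     simulated = np.random.normal(0.5, 1, size=1000)
#     kl = KullbackLeibler(observed)
#     pseudo_logp = kl(1.0, None, simulated)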


def create_sum_stat_op_from_fn(fn):
    vectorX = at.dvector if pytensor.config.floatX == "float64" else at.fvector

# Check if callable returns TensorVariable with dummy inputs
try:
res = fn(vectorX())
if isinstance(res, TensorVariable):
return fn
except Exception:
pass

    # Otherwise, automatically wrap in a PyTensor Op
    class SumStat(Op):
        def make_node(self, x):
            x = at.as_tensor_variable(x)
            return Apply(self, [x], [vectorX()])

        def perform(self, node, inputs, outputs):
            (x,) = inputs
            outputs[0][0] = np.atleast_1d(fn(x)).astype(pytensor.config.floatX)

    return SumStat()
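

# Illustrative example: wrap a NumPy-only summary statistic (here, a set of
# quantiles; ``quantiles_op`` is an assumed name for this sketch). The
# resulting Op can be passed as ``sum_stat`` to ``pm.Simulator``:
#
#     quantiles_op = create_sum_stat_op_from_fn(
#         lambda x: np.quantile(x, [0.1, 0.5, 0.9])
#     )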


def create_distance_op_from_fn(fn):
    scalarX = at.dscalar if pytensor.config.floatX == "float64" else at.fscalar
    vectorX = at.dvector if pytensor.config.floatX == "float64" else at.fvector

# Check if callable returns TensorVariable with dummy inputs
try:
res = fn(scalarX(), vectorX(), vectorX())
if isinstance(res, TensorVariable):
return fn
except Exception:
pass

    # Otherwise, automatically wrap in a PyTensor Op
    class Distance(Op):
        def make_node(self, epsilon, obs_data, sim_data):
            epsilon = at.as_tensor_variable(epsilon)
            obs_data = at.as_tensor_variable(obs_data)
            sim_data = at.as_tensor_variable(sim_data)
            return Apply(self, [epsilon, obs_data, sim_data], [vectorX()])

        def perform(self, node, inputs, outputs):
            eps, obs_data, sim_data = inputs
            outputs[0][0] = np.atleast_1d(fn(eps, obs_data, sim_data)).astype(
                pytensor.config.floatX
            )

    return Distance()
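

# Illustrative example: wrap a custom distance (``sse_distance`` is an assumed
# name for this sketch). Because the callable below already returns a
# TensorVariable when given symbolic test inputs, it is used directly; a
# NumPy-only callable would instead be wrapped in the ``Distance`` Op:
#
#     sse_distance = create_distance_op_from_fn(
#         lambda epsilon, obs, sim: -(((obs - sim) / epsilon) ** 2)
#     )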