# Copyright 2020 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import sys
import warnings
from abc import ABC
from typing import Dict, cast
import numpy as np
import pytensor.tensor as at
from pytensor.graph.replace import clone_replace
from scipy.special import logsumexp
from scipy.stats import multivariate_normal
from pymc.backends.ndarray import NDArray
from pymc.blocking import DictToArrayBijection
from pymc.initial_point import make_initial_point_expression
from pymc.model import Point, modelcontext
from pymc.pytensorf import (
compile_pymc,
floatX,
join_nonshared_inputs,
make_shared_replacements,
)
from pymc.sampling.forward import draw
from pymc.step_methods.metropolis import MultivariateNormalProposal
from pymc.vartypes import discrete_types


class SMC_KERNEL(ABC):
"""Base class for the Sequential Monte Carlo kernels.
To create a new SMC kernel you should subclass from this.
Before sampling, the following methods are called once in order:
initialize_population
Choose initial population of SMC particles. Should return a dictionary
with {var.name : numpy array of size (draws, var.size)}. Defaults
to sampling from the prior distribution. This method is only called
if `start` is not specified.
_initialize_kernel : default
Creates initial population of particles in the variable
`self.tempered_posterior` and populates the `self.var_info` dictionary
with information about model variables shape and size as
{var.name : (var.shape, var.size)}.
The functions `self.prior_logp_func` and `self.likelihood_logp_func` are
created in this step. These expect a 1D numpy array with the summed
sizes of each raveled model variable (in the order specified in
:meth:`pymc.Model.initial_point`).
Finally, this method computes the log prior and log likelihood for
the initial particles, and saves them in `self.prior_logp` and
`self.likelihood_logp`.
This method should not be modified.
setup_kernel : optional
May include any logic that should be performed before sampling
starts.
During each sampling stage the following methods are called in order:
update_beta_and_weights : default
The inverse temperature self.beta is updated based on the `self.likelihood_logp`
and `threshold` parameter.
The importance `self.weights` of each particle are computed from the old and newly
selected inverse temperature.
The iteration number stored in `self.iteration` is updated by this method.
Finally the model `log_marginal_likelihood` of the tempered posterior
is updated from these weights.
resample : default
The particles in `self.posterior` are sampled with replacement based
on `self.weights`, and the used resampling indexes are saved in
`self.resampling_indexes`.
The arrays `self.prior_logp` and `self.likelihood_logp` are rearranged according
to the order of the resampled particles. `self.tempered_posterior_logp`
is computed from these and the current `self.beta`.
tune : optional
May include logic that should be performed before every mutation step.
mutate : REQUIRED
Mutate particles in `self.tempered_posterior`.
This method is further responsible to update the `self.prior_logp`,
`self.likelihod_logp` and `self.tempered_posterior_logp`, corresponding
to each mutated particle.
sample_stats : default
Returns important sampling_stats at the end of each stage in a dictionary
format. This will be saved in the final InferenceData object under `sample_stats`.
Finally, at the end of sampling the following methods are called:
_posterior_to_trace : default
Convert final population of particles to a posterior trace object.
This method should not be modified.
sample_settings : default
Returns important sample_settings at the end of sampling in a dictionary
format. This will be saved in the final InferenceData object under `sample_stats`.
"""

    def __init__(
self,
draws=2000,
start=None,
model=None,
random_seed=None,
threshold=0.5,
):
"""
Initialize the SMC_kernel class.
Parameters
----------
draws : int, default 2000
The number of samples to draw from the posterior (i.e. last stage). Also the number of
independent chains. Defaults to 2000.
start : dict, or array of dict, default None
Starting point in parameter space. It should be a list of dict with length `chains`.
When None (default) the starting point is sampled from the prior distribution.
model : Model (optional if in ``with`` context).
random_seed : int, array_like of int, RandomState or Generator, optional
Value used to initialize the random number generator.
threshold : float, default 0.5
Determines the change of beta from stage to stage, i.e.indirectly the number of stages,
the higher the value of `threshold` the higher the number of stages. Defaults to 0.5.
It should be between 0 and 1.
Attributes
----------
self.var_info : dict
Dictionary that contains information about model variables shape and size.
"""
self.draws = draws
self.start = start
if threshold < 0 or threshold > 1:
raise ValueError(f"Threshold value {threshold} must be between 0 and 1")
self.threshold = threshold
self.model = model
self.rng = np.random.default_rng(seed=random_seed)
self.model = modelcontext(model)
self.variables = self.model.value_vars
self.var_info = {}
self.tempered_posterior = None
self.prior_logp = None
self.likelihood_logp = None
self.tempered_posterior_logp = None
self.prior_logp_func = None
self.likelihood_logp_func = None
self.log_marginal_likelihood = 0
self.beta = 0
self.iteration = 0
self.resampling_indexes = None
self.weights = np.ones(self.draws) / self.draws

    def initialize_population(self) -> Dict[str, np.ndarray]:
"""Create an initial population from the prior distribution"""
sys.stdout.write(" ") # see issue #5828
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", category=UserWarning, message="The effect of Potentials"
)
model = self.model
prior_expression = make_initial_point_expression(
free_rvs=model.free_RVs,
rvs_to_transforms=model.rvs_to_transforms,
initval_strategies={},
default_strategy="prior",
return_transformed=True,
)
prior_values = draw(prior_expression, draws=self.draws, random_seed=self.rng)
names = [model.rvs_to_values[rv].name for rv in model.free_RVs]
dict_prior = {k: np.stack(v) for k, v in zip(names, prior_values)}
return cast(Dict[str, np.ndarray], dict_prior)

    def _initialize_kernel(self):
        """Create the variables and logp functions necessary to run the SMC kernel.

        This method should not be overwritten. If needed, use `setup_kernel`
        instead.
        """
# Create dictionary that stores original variables shape and size
initial_point = self.model.initial_point(random_seed=self.rng.integers(2**30))
for v in self.variables:
self.var_info[v.name] = (initial_point[v.name].shape, initial_point[v.name].size)
# Create particles bijection map
if self.start:
init_rnd = self.start
else:
init_rnd = self.initialize_population()
population = []
for i in range(self.draws):
point = Point({v.name: init_rnd[v.name][i] for v in self.variables}, model=self.model)
population.append(DictToArrayBijection.map(point).data)
self.tempered_posterior = np.array(floatX(population))
# Initialize prior and likelihood log probabilities
shared = make_shared_replacements(initial_point, self.variables, self.model)
self.prior_logp_func = _logp_forw(
initial_point, [self.model.varlogp], self.variables, shared
)
self.likelihood_logp_func = _logp_forw(
initial_point, [self.model.datalogp], self.variables, shared
)
priors = [self.prior_logp_func(sample) for sample in self.tempered_posterior]
likelihoods = [self.likelihood_logp_func(sample) for sample in self.tempered_posterior]
self.prior_logp = np.array(priors).squeeze()
self.likelihood_logp = np.array(likelihoods).squeeze()

    def setup_kernel(self):
"""Setup logic performed once before sampling starts"""
pass

    def update_beta_and_weights(self):
        """Calculate the next inverse temperature (beta).

        The importance weights based on two successive tempered likelihoods (i.e.
        two successive values of beta) are computed, and the marginal likelihood
        estimate is updated.

        The effective sample size (ESS) used to select beta is calculated as in
        BDA 3rd ed., eq. 10.4.
        """
self.iteration += 1
low_beta = old_beta = self.beta
up_beta = 2.0
rN = int(len(self.likelihood_logp) * self.threshold)
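        # Bisection search for the largest new beta such that the effective sample
        # size of the importance weights (ESS = 1 / sum(w_i**2), computed in log
        # space below) stays close to `threshold * draws`.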
while up_beta - low_beta > 1e-6:
new_beta = (low_beta + up_beta) / 2.0
log_weights_un = (new_beta - old_beta) * self.likelihood_logp
log_weights = log_weights_un - logsumexp(log_weights_un)
ESS = int(np.exp(-logsumexp(log_weights * 2)))
if ESS == rN:
break
elif ESS < rN:
up_beta = new_beta
else:
low_beta = new_beta
if new_beta >= 1:
new_beta = 1
log_weights_un = (new_beta - old_beta) * self.likelihood_logp
log_weights = log_weights_un - logsumexp(log_weights_un)
self.beta = new_beta
self.weights = np.exp(log_weights)
# We normalize again to correct for small numerical errors that might build up
self.weights /= self.weights.sum()
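        # logsumexp(log_weights_un) - log(draws) is the log of the average unnormalized
        # incremental weight; summing it over stages gives the SMC estimate of the
        # log marginal likelihood.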
self.log_marginal_likelihood += logsumexp(log_weights_un) - np.log(self.draws)

    def resample(self):
"""Resample particles based on importance weights"""
self.resampling_indexes = systematic_resampling(self.weights, self.rng)
self.tempered_posterior = self.tempered_posterior[self.resampling_indexes]
self.prior_logp = self.prior_logp[self.resampling_indexes]
self.likelihood_logp = self.likelihood_logp[self.resampling_indexes]
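        # Tempered posterior logp: log prior + beta * log likelihood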
self.tempered_posterior_logp = self.prior_logp + self.likelihood_logp * self.beta

    def tune(self):
"""Tuning logic performed before every mutation step"""
pass

    @abc.abstractmethod
def mutate(self):
"""Apply kernel-specific perturbation to the particles once per stage"""
pass

    def sample_stats(self) -> Dict:
"""Stats to be saved at the end of each stage
These stats will be saved under `sample_stats` in the final InferenceData object.
"""
return {
"log_marginal_likelihood": self.log_marginal_likelihood if self.beta == 1 else np.nan,
"beta": self.beta,
}

    def sample_settings(self) -> Dict:
"""SMC_kernel settings to be saved once at the end of sampling.
These stats will be saved under `sample_stats` in the final InferenceData object.
"""
return {
"_n_draws": self.draws, # Default property name used in `SamplerReport`
"threshold": self.threshold,
}

    def _posterior_to_trace(self, chain=0) -> NDArray:
        """Save results into a PyMC trace.

        This method should not be overwritten.
        """
length_pos = len(self.tempered_posterior)
varnames = [v.name for v in self.variables]
with self.model:
strace = NDArray(name=self.model.name)
strace.setup(length_pos, chain)
for i in range(length_pos):
value = []
size = 0
for var in self.variables:
shape, new_size = self.var_info[var.name]
var_samples = self.tempered_posterior[i][size : size + new_size]
                # Round discrete variable samples. The rounded values were the ones
                # actually used in the logp evaluations (see _logp_forw)
if var.dtype in discrete_types:
var_samples = np.round(var_samples).astype(var.dtype)
value.append(var_samples.reshape(shape))
size += new_size
strace.record(point={k: v for k, v in zip(varnames, value)})
return strace


class IMH(SMC_KERNEL):
"""Independent Metropolis-Hastings SMC_kernel"""

    def __init__(self, *args, correlation_threshold=0.01, **kwargs):
"""
Parameters
----------
correlation_threshold : float, default 0.01
The lower the value, the higher the number of IMH steps computed automatically.
Defaults to 0.01. It should be between 0 and 1.
**kwargs : dict, optional
Keyword arguments passed to the SMC_kernel. Refer to SMC_kernel documentation for a
list of all possible arguments.
"""
super().__init__(*args, **kwargs)
self.correlation_threshold = correlation_threshold
self.proposal_dist = None
self.acc_rate = None

    def tune(self):
# Update MVNormal proposal based on the mean and covariance of the
# tempered posterior.
cov = np.cov(self.tempered_posterior, ddof=0, rowvar=0)
cov = np.atleast_2d(cov)
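        # Add a small jitter to the diagonal to keep the covariance positive definite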
cov += 1e-6 * np.eye(cov.shape[0])
if np.isnan(cov).any() or np.isinf(cov).any():
raise ValueError('Sample covariances not valid! Likely "draws" is too small!')
mean = np.average(self.tempered_posterior, axis=0)
self.proposal_dist = multivariate_normal(mean, cov)

    def mutate(self):
"""Independent Metropolis-Hastings perturbation."""
self.n_steps = 1
old_corr = 2
corr = Pearson(self.tempered_posterior)
ac_ = []
while True:
log_R = np.log(self.rng.random(self.draws))
            # The proposal is independent of the current point, so the Metropolis-Hastings
            # acceptance ratio has to account for the proposal density itself: we need the
            # logp of proposing the new points (forward) and the logp of proposing a
            # transition back to the current points (backward).
            proposal = floatX(self.proposal_dist.rvs(size=self.draws, random_state=self.rng))
            proposal = proposal.reshape(len(proposal), -1)
            # logp of moving to the proposed points
            forward_logp = self.proposal_dist.logpdf(proposal)
            # logp of moving back to the current points
            backward_logp = self.proposal_dist.logpdf(self.tempered_posterior)
ll = np.array([self.likelihood_logp_func(prop) for prop in proposal])
pl = np.array([self.prior_logp_func(prop) for prop in proposal])
proposal_logp = pl + ll * self.beta
accepted = log_R < (
(proposal_logp + backward_logp) - (self.tempered_posterior_logp + forward_logp)
)
self.tempered_posterior[accepted] = proposal[accepted]
self.tempered_posterior_logp[accepted] = proposal_logp[accepted]
self.prior_logp[accepted] = pl[accepted]
self.likelihood_logp[accepted] = ll[accepted]
ac_.append(accepted)
self.n_steps += 1
pearson_r = corr.get(self.tempered_posterior)
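            # Keep taking IMH steps while the correlation with the particles at the start
            # of mutate() is still dropping by more than `correlation_threshold` for most
            # (>90% of) dimensions, i.e. while the particles are still moving appreciably.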
if np.mean((old_corr - pearson_r) > self.correlation_threshold) > 0.9:
old_corr = pearson_r
else:
break
self.acc_rate = np.mean(ac_)

    def sample_stats(self):
stats = super().sample_stats()
stats.update(
{
"accept_rate": self.acc_rate,
}
)
return stats

    def sample_settings(self):
stats = super().sample_settings()
stats.update(
{
"_n_tune": self.n_steps, # Default property name used in `SamplerReport`
"correlation_threshold": self.correlation_threshold,
}
)
return stats


class Pearson:
    """Helper that computes the absolute Pearson correlation, per dimension,
    between a fixed reference set of particles and an updated set of particles."""

    def __init__(self, a):
        self.l = a.shape[0]
        # Center the reference particles and precompute their norms
        self.am = a - np.sum(a, axis=0) / self.l
        self.aa = np.sum(self.am**2, axis=0) ** 0.5

    def get(self, b):
        # Absolute Pearson correlation between the reference particles and `b`,
        # computed independently for each dimension
        bm = b - np.sum(b, axis=0) / self.l
        bb = np.sum(bm**2, axis=0) ** 0.5
        ab = np.sum(self.am * bm, axis=0)
        return np.abs(ab / (self.aa * bb))


class MH(SMC_KERNEL):
"""Metropolis-Hastings SMC_kernel"""

    def __init__(self, *args, correlation_threshold=0.01, **kwargs):
"""
Parameters
----------
correlation_threshold : float, default 0.01
The lower the value, the higher the number of MH steps computed automatically.
Defaults to 0.01. It should be between 0 and 1.
**kwargs : dict, optional
Keyword arguments passed to the SMC_kernel. Refer to SMC_kernel documentation for a
list of all possible arguments.
"""
super().__init__(*args, **kwargs)
self.correlation_threshold = correlation_threshold
self.proposal_dist = None
self.proposal_scales = None
self.chain_acc_rate = None

    def setup_kernel(self):
        """Set up the per-particle proposal scales.

        The multivariate normal proposal distribution itself is built in `self.tune()`
        from the covariance of the tempered posterior; `self.proposal_scales` provides
        an additional per-particle scaling that is also adapted in `self.tune()`.
        """
ndim = self.tempered_posterior.shape[1]
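        # 2.38**2 / ndim is the classic optimal scaling for random-walk Metropolis
        # proposals (Roberts, Gelman & Gilks, 1997), capped here at 1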
self.proposal_scales = np.full(self.draws, min(1, 2.38**2 / ndim))

    def resample(self):
super().resample()
if self.iteration > 1:
self.proposal_scales = self.proposal_scales[self.resampling_indexes]
self.chain_acc_rate = self.chain_acc_rate[self.resampling_indexes]

    def tune(self):
"""Update proposal scales for each particle dimension and update number of MH steps"""
if self.iteration > 1:
# Rescale based on distance to 0.234 acceptance rate
chain_scales = np.exp(np.log(self.proposal_scales) + (self.chain_acc_rate - 0.234))
# Interpolate between individual and population scales
self.proposal_scales = 0.5 * (chain_scales + chain_scales.mean())
# Update MVNormal proposal based on the covariance of the tempered posterior.
cov = np.cov(self.tempered_posterior, ddof=0, rowvar=0)
cov = np.atleast_2d(cov)
cov += 1e-6 * np.eye(cov.shape[0])
if np.isnan(cov).any() or np.isinf(cov).any():
raise ValueError('Sample covariances not valid! Likely "draws" is too small!')
self.proposal_dist = MultivariateNormalProposal(cov)

    def mutate(self):
"""Metropolis-Hastings perturbation."""
self.n_steps = 1
old_corr = 2
corr = Pearson(self.tempered_posterior)
ac_ = []
while True:
log_R = np.log(self.rng.random(self.draws))
proposal = floatX(
self.tempered_posterior
+ self.proposal_dist(num_draws=self.draws, rng=self.rng)
* self.proposal_scales[:, None]
)
ll = np.array([self.likelihood_logp_func(prop) for prop in proposal])
pl = np.array([self.prior_logp_func(prop) for prop in proposal])
proposal_logp = pl + ll * self.beta
accepted = log_R < (proposal_logp - self.tempered_posterior_logp)
self.tempered_posterior[accepted] = proposal[accepted]
self.prior_logp[accepted] = pl[accepted]
self.likelihood_logp[accepted] = ll[accepted]
self.tempered_posterior_logp[accepted] = proposal_logp[accepted]
ac_.append(accepted)
self.n_steps += 1
pearson_r = corr.get(self.tempered_posterior)
if np.mean((old_corr - pearson_r) > self.correlation_threshold) > 0.9:
old_corr = pearson_r
else:
break
self.chain_acc_rate = np.mean(ac_, axis=0)

    def sample_stats(self):
stats = super().sample_stats()
stats.update(
{
"mean_accept_rate": self.chain_acc_rate.mean(),
"mean_proposal_scale": self.proposal_scales.mean(),
}
)
return stats

    def sample_settings(self):
stats = super().sample_settings()
stats.update(
{
"_n_tune": self.n_steps, # Default property name used in `SamplerReport`
"correlation_threshold": self.correlation_threshold,
}
)
return stats


def systematic_resampling(weights, rng):
    """Systematic resampling.

    Parameters
    ----------
    weights : array_like
        Normalized importance weights. They should be probabilities that sum to 1.
    rng : numpy.random.Generator
        Random number generator used to draw the single uniform offset.

    Returns
    -------
    new_indices : array of int
        Resampled particle indices in the interval 0, ..., len(weights) - 1.
    """
lnw = len(weights)
arange = np.arange(lnw)
uniform = (rng.random(1) + arange) / lnw
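    # A single uniform draw is stratified into `lnw` evenly spaced points in [0, 1);
    # each point selects the particle whose cumulative-weight interval contains it.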
idx = 0
weight_accu = weights[0]
new_indices = np.empty(lnw, dtype=int)
for i in arange:
while uniform[i] > weight_accu:
idx += 1
weight_accu += weights[idx]
new_indices[i] = idx
return new_indices
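
# Quick illustrative check (not part of the module): if all of the weight sits on a
# single particle, systematic resampling returns that particle's index in every slot,
# e.g. systematic_resampling(np.array([0.0, 1.0, 0.0]), np.random.default_rng(1))
# gives array([1, 1, 1]).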


def _logp_forw(point, out_vars, in_vars, shared):
    """Compile a PyTensor function from the model's value variables to the given outputs.

    Parameters
    ----------
    point : dict
        Initial point, used to determine the shapes of the model's value variables.
    out_vars : list of TensorVariable
        Output variables of the graph (e.g. the model's joint prior or likelihood logp).
    in_vars : list of TensorVariable
        Input value variables of the model.
    shared : list of TensorVariable
        Shared variables that the graph depends on.
    """
# Replace integer inputs with rounded float inputs
if any(var.dtype in discrete_types for var in in_vars):
replace_int_input = {}
new_in_vars = []
for in_var in in_vars:
if in_var.dtype in discrete_types:
float_var = at.TensorType("floatX", in_var.type.shape)(in_var.name)
new_in_vars.append(float_var)
replace_int_input[in_var] = at.round(float_var).astype(in_var.dtype)
else:
new_in_vars.append(in_var)
out_vars = clone_replace(out_vars, replace_int_input, rebuild_strict=False)
in_vars = new_in_vars
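
    # Ravel and join all value variables into a single flat vector input, so the
    # compiled function can be called with one 1D array per particle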
out_list, inarray0 = join_nonshared_inputs(
point=point, outputs=out_vars, inputs=in_vars, shared_inputs=shared
)
f = compile_pymc([inarray0], out_list[0])
f.trust_input = True
return f