Source code for pymc.step_methods.slicer

#   Copyright 2024 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

# Modified from original implementation by Dominik Wabersich (2013)


import numpy as np
import numpy.random as nr

from pymc.blocking import RaveledVars, StatsType
from pymc.model import modelcontext
from pymc.pytensorf import compile_pymc, join_nonshared_inputs, make_shared_replacements
from pymc.step_methods.arraystep import ArrayStepShared
from pymc.step_methods.compound import Competence
from pymc.util import get_value_vars_from_user_vars
from pymc.vartypes import continuous_types

__all__ = ["Slice"]

LOOP_ERR_MSG = "max slicer iters %d exceeded"


[docs] class Slice(ArrayStepShared): """ Univariate slice sampler step method. Parameters ---------- vars : list, optional List of value variables for sampler. w : float, default 1.0 Initial width of slice. tune : bool, default True Flag for tuning. model : Model, optional Optional model for sampling step. It will be taken from the context if not provided. iter_limit : int, default np.inf Maximum number of iterations for the slice sampler. """ name = "slice" default_blocked = False stats_dtypes_shapes = { "tune": (bool, []), "nstep_out": (int, []), "nstep_in": (int, []), }
[docs] def __init__(self, vars=None, w=1.0, tune=True, model=None, iter_limit=np.inf, **kwargs): model = modelcontext(model) self.w = np.asarray(w).copy() self.tune = tune self.n_tunes = 0.0 self.iter_limit = iter_limit if vars is None: vars = model.continuous_value_vars else: vars = get_value_vars_from_user_vars(vars, model) point = model.initial_point() shared = make_shared_replacements(point, vars, model) [logp], raveled_inp = join_nonshared_inputs( point=point, outputs=[model.logp()], inputs=vars, shared_inputs=shared ) self.logp = compile_pymc([raveled_inp], logp) self.logp.trust_input = True super().__init__(vars, shared)
[docs] def astep(self, apoint: RaveledVars) -> tuple[RaveledVars, StatsType]: # The arguments are determined by the list passed via `super().__init__(..., fs, ...)` q0_val = apoint.data if q0_val.shape != self.w.shape: self.w = np.resize(self.w, len(q0_val)) # this is a repmat nstep_out = nstep_in = 0 q = np.copy(q0_val) ql = np.copy(q0_val) # l for left boundary qr = np.copy(q0_val) # r for right boundary logp = self.logp for i, wi in enumerate(self.w): # uniformly sample from 0 to p(q), but in log space y = logp(q) - nr.standard_exponential() # Create initial interval ql[i] = q[i] - nr.uniform() * wi # q[i] + r * w qr[i] = ql[i] + wi # Equivalent to q[i] + (1-r) * w # Stepping out procedure cnt = 0 while y <= logp(ql): # changed lt to leq for locally uniform posteriors ql[i] -= wi cnt += 1 if cnt > self.iter_limit: raise RuntimeError(LOOP_ERR_MSG % self.iter_limit) nstep_out += cnt cnt = 0 while y <= logp(qr): qr[i] += wi cnt += 1 if cnt > self.iter_limit: raise RuntimeError(LOOP_ERR_MSG % self.iter_limit) nstep_out += cnt cnt = 0 q[i] = nr.uniform(ql[i], qr[i]) while y > logp(q): # Changed leq to lt, to accommodate for locally flat posteriors # Sample uniformly from slice if q[i] > q0_val[i]: qr[i] = q[i] elif q[i] < q0_val[i]: ql[i] = q[i] q[i] = nr.uniform(ql[i], qr[i]) cnt += 1 if cnt > self.iter_limit: raise RuntimeError(LOOP_ERR_MSG % self.iter_limit) nstep_in += cnt if self.tune: # I was under impression from MacKays lectures that slice width can be tuned without # breaking markovianness. Can we do it regardless of self.tune?(@madanh) self.w[i] = wi * (self.n_tunes / (self.n_tunes + 1)) + (qr[i] - ql[i]) / ( self.n_tunes + 1 ) # Set qr and ql to the accepted points (they matter for subsequent iterations) qr[i] = ql[i] = q[i] if self.tune: self.n_tunes += 1 stats = { "tune": self.tune, "nstep_out": nstep_out, "nstep_in": nstep_in, } return RaveledVars(q, apoint.point_map_info), [stats]
[docs] @staticmethod def competence(var, has_grad): if var.dtype in continuous_types: if not has_grad and var.ndim == 0: return Competence.PREFERRED return Competence.COMPATIBLE return Competence.INCOMPATIBLE