Source code for pymc.distributions.timeseries

#   Copyright 2020 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import abc
import warnings

from abc import ABCMeta
from typing import Callable, Optional

import aesara
import aesara.tensor as at
import numpy as np

from aeppl.logprob import _logprob
from aesara.graph.basic import Node, clone_replace
from aesara.tensor import TensorVariable
from aesara.tensor.random.op import RandomVariable

from pymc.aesaraf import constant_fold, floatX, intX
from pymc.distributions.continuous import Normal, get_tau_sigma
from pymc.distributions.distribution import (
    Distribution,
    SymbolicRandomVariable,
    _moment,
    moment,
)
from pymc.distributions.logprob import ignore_logprob, logp
from pymc.distributions.multivariate import MvNormal, MvStudentT
from pymc.distributions.shape_utils import (
    _change_dist_size,
    change_dist_size,
    get_support_shape,
    get_support_shape_1d,
)
from pymc.exceptions import NotConstantValueError
from pymc.util import check_dist_not_registered

__all__ = [
    "RandomWalk",
    "GaussianRandomWalk",
    "MvGaussianRandomWalk",
    "MvStudentTRandomWalk",
    "AR",
    "GARCH11",
    "EulerMaruyama",
]


class RandomWalkRV(SymbolicRandomVariable):
    """RandomWalk Variable"""

    default_output = 0
    _print_name = ("RandomWalk", "\\operatorname{RandomWalk}")


class RandomWalk(Distribution):
    r"""RandomWalk Distribution

    TODO: Expand docstrings
    """

    rv_type = RandomWalkRV

    def __new__(cls, *args, innovation_dist, steps=None, **kwargs):
        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims"),
            observed=kwargs.get("observed"),
        )

        return super().__new__(cls, *args, innovation_dist=innovation_dist, steps=steps, **kwargs)

    @classmethod
    def dist(cls, init_dist, innovation_dist, steps=None, **kwargs) -> at.TensorVariable:
        if not (
            isinstance(init_dist, at.TensorVariable)
            and init_dist.owner is not None
            and isinstance(init_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
        ):
            raise TypeError("init_dist must be a distribution variable")
        check_dist_not_registered(init_dist)

        if not (
            isinstance(innovation_dist, at.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
        ):
            raise TypeError("innovation_dist must be a distribution variable")
        check_dist_not_registered(innovation_dist)

        if init_dist.owner.op.ndim_supp != innovation_dist.owner.op.ndim_supp:
            raise TypeError(
                "init_dist and innovation_dist must have the same support dimensionality"
            )

        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=kwargs.get("shape"),
            dims=None,
            observed=None,
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = at.as_tensor_variable(intX(steps))

        return super().dist([init_dist, innovation_dist, steps], **kwargs)

    @classmethod
    def get_steps(cls, innovation_dist, steps, shape, dims, observed):
        # We need to know the ndim_supp of the innovation_dist
        if not (
            isinstance(innovation_dist, at.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
        ):
            raise TypeError("innovation_dist must be a distribution variable")

        dist_ndim_supp = innovation_dist.owner.op.ndim_supp
        dist_shape = tuple(innovation_dist.shape)
        support_shape = None
        if steps is not None:
            support_shape = (steps,) + (dist_shape[len(dist_shape) - dist_ndim_supp :])
        support_shape = get_support_shape(
            support_shape=support_shape,
            shape=shape,
            dims=dims,
            observed=observed,
            support_shape_offset=1,
            ndim_supp=dist_ndim_supp + 1,
        )
        if support_shape is not None:
            steps = support_shape[-dist_ndim_supp - 1]
        return steps

    @classmethod
    def rv_op(cls, init_dist, innovation_dist, steps, size=None):
        if not steps.ndim == 0 or not steps.dtype.startswith("int"):
            raise ValueError("steps must be an integer scalar (ndim=0).")

        dist_ndim_supp = init_dist.owner.op.ndim_supp
        init_dist_shape = tuple(init_dist.shape)
        init_dist_batch_shape = init_dist_shape[: len(init_dist_shape) - dist_ndim_supp]
        innovation_dist_shape = tuple(innovation_dist.shape)
        innovation_batch_shape = innovation_dist_shape[
            : len(innovation_dist_shape) - dist_ndim_supp
        ]

        ndim_supp = dist_ndim_supp + 1

        # If not explicit, size is determined by the shapes of the input distributions
        if size is None:
            size = at.broadcast_shape(
                init_dist_batch_shape, innovation_batch_shape, arrays_are_shapes=True
            )

        # Resize input distributions. We will size them to (T, B, S) in order
        # to safely take random draws. We later swap the steps dimension so
        # that the final distribution will follow (B, T, S)
        # init_dist must have shape (1, B, S)
        init_dist = change_dist_size(init_dist, (1, *size))
        # innovation_dist must have shape (T-1, B, S)
        innovation_dist = change_dist_size(innovation_dist, (steps, *size))

        # Create SymbolicRV
        init_dist_, innovation_dist_, steps_ = (
            init_dist.type(),
            innovation_dist.type(),
            steps.type(),
        )
        # Aeppl can only infer the logp of a dimshuffled variables, if the dimshuffle is
        # done directly on top of a RandomVariable. Because of this we dimshuffle the
        # distributions and only then concatenate them, instead of the other way around.
        # shape = (B, 1, S)
        init_dist_dimswapped_ = at.moveaxis(init_dist_, 0, -ndim_supp)
        # shape = (B, T-1, S)
        innovation_dist_dimswapped_ = at.moveaxis(innovation_dist_, 0, -ndim_supp)
        # shape = (B, T, S)
        grw_ = at.concatenate([init_dist_dimswapped_, innovation_dist_dimswapped_], axis=-ndim_supp)
        grw_ = at.cumsum(grw_, axis=-ndim_supp)
        return RandomWalkRV(
            [init_dist_, innovation_dist_, steps_],
            # We pass steps_ through just so we can keep a reference to it, even though
            # it's no longer needed at this point
            [grw_, steps_],
            ndim_supp=ndim_supp,
        )(init_dist, innovation_dist, steps)


@_change_dist_size.register(RandomWalkRV)
def change_random_walk_size(op, dist, new_size, expand):
    init_dist, innovation_dist, steps = dist.owner.inputs
    if expand:
        old_shape = tuple(dist.shape)
        old_size = old_shape[: len(old_shape) - op.ndim_supp]
        new_size = tuple(new_size) + tuple(old_size)
    return RandomWalk.rv_op(init_dist, innovation_dist, steps, size=new_size)


@_moment.register(RandomWalkRV)
def random_walk_moment(op, rv, init_dist, innovation_dist, steps):
    # shape = (1, B, S)
    init_moment = moment(init_dist)
    # shape = (T-1, B, S)
    innovation_moment = moment(innovation_dist)
    # shape = (T, B, S)
    grw_moment = at.concatenate([init_moment, innovation_moment], axis=0)
    grw_moment = at.cumsum(grw_moment, axis=0)
    # shape = (B, T, S)
    grw_moment = at.moveaxis(grw_moment, 0, -op.ndim_supp)
    return grw_moment


@_logprob.register(RandomWalkRV)
def random_walk_logp(op, values, *inputs, **kwargs):
    # Although Aeppl can derive the logprob of random walks, it does not collapse
    # what PyMC considers the core dimension of steps. We do it manually here.
    (value,) = values
    # Recreate RV and obtain inner graph
    rv_node = op.make_node(*inputs)
    rv = clone_replace(
        op.inner_outputs, replace={u: v for u, v in zip(op.inner_inputs, rv_node.inputs)}
    )[op.default_output]
    # Obtain logp via Aeppl of inner graph and collapse steps dimension
    return logp(rv, value).sum(axis=-1)


class PredefinedRandomWalk(ABCMeta):
    """Base class for predefined RandomWalk distributions"""

    def __new__(cls, name, *args, **kwargs):
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk(name, init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    def dist(cls, *args, **kwargs) -> at.TensorVariable:
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    @abc.abstractmethod
    def get_dists(cls, *args, **kwargs):
        pass


[docs]class GaussianRandomWalk(PredefinedRandomWalk): r"""Random Walk with Normal innovations. Parameters ---------- mu : tensor_like of float, default 0 innovation drift sigma : tensor_like of float, default 1 sigma > 0, innovation standard deviation. init_dist : Distribution Unnamed univariate distribution of the initial value. Unnamed refers to distributions created with the ``.dist()`` API. .. warning:: init_dist will be cloned, rendering them independent of the ones passed as input. steps : int, optional Number of steps in Gaussian Random Walk (steps > 0). Only needed if shape is not provided. """
[docs] @classmethod def get_dists(cls, mu=0.0, sigma=1.0, *, init_dist=None, **kwargs): if "init" in kwargs: warnings.warn( "init parameter is now called init_dist. Using init will raise an error in a future release.", FutureWarning, ) init_dist = kwargs.pop("init") if init_dist is None: warnings.warn( "Initial distribution not specified, defaulting to `Normal.dist(0, 100)`." "You can specify an init_dist manually to suppress this warning.", UserWarning, ) init_dist = Normal.dist(0, 100) mu = at.as_tensor_variable(mu) sigma = at.as_tensor_variable(sigma) innovation_dist = Normal.dist(mu=mu, sigma=sigma) return init_dist, innovation_dist, kwargs
[docs]class MvGaussianRandomWalk(PredefinedRandomWalk): r"""Random Walk with Multivariate Normal innovations Parameters ---------- mu: tensor_like of float innovation drift cov: tensor_like of float pos def matrix, innovation covariance matrix tau: tensor_like of float pos def matrix, inverse covariance matrix chol: tensor_like of float Cholesky decomposition of covariance matrix lower : bool, default=True Whether the cholesky fatcor is given as a lower triangular matrix. init_dist: distribution Unnamed multivariate distribution of the initial value. Unnamed refers to distributions created with the ``.dist()`` API. .. warning:: init_dist will be cloned, rendering them independent of the ones passed as input. steps : int, optional Number of steps in Random Walk (steps > 0). Only needed if shape is not provided. Notes ----- Only one of cov, tau or chol is required. """
[docs] @classmethod def get_dists(cls, mu, *, cov=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs): if "init" in kwargs: warnings.warn( "init parameter is now called init_dist. Using init will raise an error " "in a future release.", FutureWarning, ) init_dist = kwargs.pop("init") if init_dist is None: warnings.warn( "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`." "You can specify an init_dist manually to suppress this warning.", UserWarning, ) init_dist = MvNormal.dist(at.zeros_like(mu.shape[-1]), at.eye(mu.shape[-1]) * 100) innovation_dist = MvNormal.dist(mu=mu, cov=cov, tau=tau, chol=chol, lower=lower) return init_dist, innovation_dist, kwargs
[docs]class MvStudentTRandomWalk(PredefinedRandomWalk): r"""Multivariate Random Walk with StudentT innovations Parameters ---------- nu: int degrees of freedom mu: tensor_like of float innovation drift scale: tensor_like of float pos def matrix, innovation covariance matrix tau: tensor_like of float pos def matrix, inverse covariance matrix chol: tensor_like of float Cholesky decomposition of covariance matrix lower : bool, default=True Whether the cholesky fatcor is given as a lower triangular matrix. init_dist: distribution Unnamed multivariate distribution of the initial value. Unnamed refers to distributions created with the ``.dist()`` API. .. warning:: init_dist will be cloned, rendering them independent of the ones passed as input. steps : int, optional Number of steps in Random Walk (steps > 0). Only needed if shape is not provided. Notes ----- Only one of cov, tau or chol is required. """
[docs] @classmethod def get_dists( cls, *, nu, mu, scale=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs ): if "init" in kwargs: warnings.warn( "init parameter is now called init_dist. Using init will raise an error " "in a future release.", FutureWarning, ) init_dist = kwargs.pop("init") if init_dist is None: warnings.warn( "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`." "You can specify an init_dist manually to suppress this warning.", UserWarning, ) init_dist = MvNormal.dist(at.zeros_like(mu.shape[-1]), at.eye(mu.shape[-1]) * 100) innovation_dist = MvStudentT.dist( nu=nu, mu=mu, scale=scale, tau=tau, chol=chol, lower=lower, cov=kwargs.pop("cov", None) ) return init_dist, innovation_dist, kwargs
class AutoRegressiveRV(SymbolicRandomVariable): """A placeholder used to specify a log-likelihood for an AR sub-graph.""" default_output = 1 ar_order: int constant_term: bool _print_name = ("AR", "\\operatorname{AR}") def __init__(self, *args, ar_order, constant_term, **kwargs): self.ar_order = ar_order self.constant_term = constant_term super().__init__(*args, **kwargs) def update(self, node: Node): """Return the update mapping for the noise RV.""" # Since noise is a shared variable it shows up as the last node input return {node.inputs[-1]: node.outputs[0]}
[docs]class AR(Distribution): r"""Autoregressive process with p lags. .. math:: x_t = \rho_0 + \rho_1 x_{t-1} + \ldots + \rho_p x_{t-p} + \epsilon_t, \epsilon_t \sim N(0,\sigma^2) The innovation can be parameterized either in terms of precision or standard deviation. The link between the two parametrizations is given by .. math:: \tau = \dfrac{1}{\sigma^2} Parameters ---------- rho : tensor_like of float Tensor of autoregressive coefficients. The n-th entry in the last dimension is the coefficient for the n-th lag. sigma : tensor_like of float, default 1 Standard deviation of innovation (sigma > 0). Only required if tau is not specified. tau : tensor_like of float, optional Precision of innovation (tau > 0). constant : bool, default False Whether the first element of rho should be used as a constant term in the AR process. init_dist : unnamed distribution, optional Scalar or vector distribution for initial values. Unnamed refers to distributions created with the ``.dist()`` API. Distributions should have shape (*shape[:-1], ar_order). If not, it will be automatically resized. Defaults to pm.Normal.dist(0, 100, shape=...). .. warning:: init_dist will be cloned, rendering it independent of the one passed as input. ar_order : int, optional Order of the AR process. Inferred from length of the last dimension of rho, if possible. ar_order = rho.shape[-1] if constant else rho.shape[-1] - 1 steps : int, optional Number of steps in AR process (steps > 0). Only needed if shape is not provided. Notes ----- The init distribution will be cloned, rendering it distinct from the one passed as input. Examples -------- .. code-block:: python # Create an AR of order 3, with a constant term with pm.Model() as AR3: # The first coefficient will be the constant term coefs = pm.Normal("coefs", 0, size=4) # We need one init variable for each lag, hence size=3 init = pm.Normal.dist(5, size=3) ar3 = pm.AR("ar3", coefs, sigma=1.0, init_dist=init, constant=True, steps=500) """ rv_type = AutoRegressiveRV def __new__(cls, name, rho, *args, steps=None, constant=False, ar_order=None, **kwargs): rhos = at.atleast_1d(at.as_tensor_variable(floatX(rho))) ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order) steps = get_support_shape_1d( support_shape=steps, shape=None, # Shape will be checked in `cls.dist` dims=kwargs.get("dims", None), observed=kwargs.get("observed", None), support_shape_offset=ar_order, ) return super().__new__( cls, name, rhos, *args, steps=steps, constant=constant, ar_order=ar_order, **kwargs )
[docs] @classmethod def dist( cls, rho, sigma=None, tau=None, *, init_dist=None, steps=None, constant=False, ar_order=None, **kwargs, ): _, sigma = get_tau_sigma(tau=tau, sigma=sigma) sigma = at.as_tensor_variable(floatX(sigma)) rhos = at.atleast_1d(at.as_tensor_variable(floatX(rho))) if "init" in kwargs: warnings.warn( "init parameter is now called init_dist. Using init will raise an error in a future release.", FutureWarning, ) init_dist = kwargs.pop("init") ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order) steps = get_support_shape_1d( support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=ar_order ) if steps is None: raise ValueError("Must specify steps or shape parameter") steps = at.as_tensor_variable(intX(steps), ndim=0) if init_dist is not None: if not isinstance(init_dist, TensorVariable) or not isinstance( init_dist.owner.op, (RandomVariable, SymbolicRandomVariable) ): raise ValueError( f"Init dist must be a distribution created via the `.dist()` API, " f"got {type(init_dist)}" ) check_dist_not_registered(init_dist) if init_dist.owner.op.ndim_supp > 1: raise ValueError( "Init distribution must have a scalar or vector support dimension, ", f"got ndim_supp={init_dist.owner.op.ndim_supp}.", ) else: warnings.warn( "Initial distribution not specified, defaulting to " "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist " "manually to suppress this warning.", UserWarning, ) init_dist = Normal.dist(0, 100, shape=(*sigma.shape, ar_order)) # Tell Aeppl to ignore init_dist, as it will be accounted for in the logp term init_dist = ignore_logprob(init_dist) return super().dist([rhos, sigma, init_dist, steps, ar_order, constant], **kwargs)
@classmethod def _get_ar_order(cls, rhos: TensorVariable, ar_order: Optional[int], constant: bool) -> int: """Compute ar_order given inputs If ar_order is not specified we do constant folding on the shape of rhos to retrieve it. For example, this will detect that Normal(size=(5, 3)).shape[-1] == 3, which is not known by Aesara before. Raises ------ ValueError If inferred ar_order cannot be inferred from rhos or if it is less than 1 """ if ar_order is None: try: (folded_shape,) = constant_fold((rhos.shape[-1],)) except NotConstantValueError: raise ValueError( "Could not infer ar_order from last dimension of rho. Pass it " "explictily or make sure rho have a static shape" ) ar_order = int(folded_shape) - int(constant) if ar_order < 1: raise ValueError( "Inferred ar_order is smaller than 1. Increase the last dimension " "of rho or remove constant_term" ) return ar_order
[docs] @classmethod def ndim_supp(cls, *args): return 1
[docs] @classmethod def rv_op(cls, rhos, sigma, init_dist, steps, ar_order, constant_term, size=None): # Init dist should have shape (*size, ar_order) if size is not None: batch_size = size else: # In this case the size of the init_dist depends on the parameters shape # The last dimension of rho and init_dist does not matter batch_size = at.broadcast_shape(sigma, rhos[..., 0], at.atleast_1d(init_dist)[..., 0]) if init_dist.owner.op.ndim_supp == 0: init_dist_size = (*batch_size, ar_order) else: # In this case the support dimension must cover for ar_order init_dist_size = batch_size init_dist = change_dist_size(init_dist, init_dist_size) # Create OpFromGraph representing random draws from AR process # Variables with underscore suffix are dummy inputs into the OpFromGraph init_ = init_dist.type() rhos_ = rhos.type() sigma_ = sigma.type() steps_ = steps.type() rhos_bcast_shape_ = init_.shape if constant_term: # In this case init shape is one unit smaller than rhos in the last dimension rhos_bcast_shape_ = (*rhos_bcast_shape_[:-1], rhos_bcast_shape_[-1] + 1) rhos_bcast_ = at.broadcast_to(rhos_, rhos_bcast_shape_) noise_rng = aesara.shared(np.random.default_rng()) def step(*args): *prev_xs, reversed_rhos, sigma, rng = args if constant_term: mu = reversed_rhos[-1] + at.sum(prev_xs * reversed_rhos[:-1], axis=0) else: mu = at.sum(prev_xs * reversed_rhos, axis=0) next_rng, new_x = Normal.dist(mu=mu, sigma=sigma, rng=rng).owner.outputs return new_x, {rng: next_rng} # We transpose inputs as scan iterates over first dimension innov_, innov_updates_ = aesara.scan( fn=step, outputs_info=[{"initial": init_.T, "taps": range(-ar_order, 0)}], non_sequences=[rhos_bcast_.T[::-1], sigma_.T, noise_rng], n_steps=steps_, strict=True, ) (noise_next_rng,) = tuple(innov_updates_.values()) ar_ = at.concatenate([init_, innov_.T], axis=-1) ar_op = AutoRegressiveRV( inputs=[rhos_, sigma_, init_, steps_], outputs=[noise_next_rng, ar_], ar_order=ar_order, constant_term=constant_term, ndim_supp=1, ) ar = ar_op(rhos, sigma, init_dist, steps) return ar
@_change_dist_size.register(AutoRegressiveRV) def change_ar_size(op, dist, new_size, expand=False): if expand: old_size = dist.shape[:-1] new_size = tuple(new_size) + tuple(old_size) return AR.rv_op( *dist.owner.inputs[:-1], ar_order=op.ar_order, constant_term=op.constant_term, size=new_size, ) @_logprob.register(AutoRegressiveRV) def ar_logp(op, values, rhos, sigma, init_dist, steps, noise_rng, **kwargs): (value,) = values ar_order = op.ar_order constant_term = op.constant_term # Convolve rhos with values if constant_term: expectation = at.add( rhos[..., 0, None], *( rhos[..., i + 1, None] * value[..., ar_order - (i + 1) : -(i + 1)] for i in range(ar_order) ), ) else: expectation = at.add( *( rhos[..., i, None] * value[..., ar_order - (i + 1) : -(i + 1)] for i in range(ar_order) ) ) # Compute and collapse logp across time dimension innov_logp = at.sum( logp(Normal.dist(0, sigma[..., None]), value[..., ar_order:] - expectation), axis=-1 ) init_logp = logp(init_dist, value[..., :ar_order]) if init_dist.owner.op.ndim_supp == 0: init_logp = at.sum(init_logp, axis=-1) return init_logp + innov_logp @_moment.register(AutoRegressiveRV) def ar_moment(op, rv, rhos, sigma, init_dist, steps, noise_rng): # Use last entry of init_dist moment as the moment for the whole AR return at.full_like(rv, moment(init_dist)[..., -1, None]) class GARCH11RV(SymbolicRandomVariable): """A placeholder used to specify a GARCH11 graph.""" default_output = 1 _print_name = ("GARCH11", "\\operatorname{GARCH11}") def update(self, node: Node): """Return the update mapping for the noise RV.""" # Since noise is a shared variable it shows up as the last node input return {node.inputs[-1]: node.outputs[0]}
[docs]class GARCH11(Distribution): r""" GARCH(1,1) with Normal innovations. The model is specified by .. math:: y_t \sim N(0, \sigma_t^2) .. math:: \sigma_t^2 = \omega + \alpha_1 * y_{t-1}^2 + \beta_1 * \sigma_{t-1}^2 where \sigma_t^2 (the error variance) follows a ARMA(1, 1) model. Parameters ---------- omega: tensor omega > 0, mean variance alpha_1: tensor alpha_1 >= 0, autoregressive term coefficient beta_1: tensor beta_1 >= 0, alpha_1 + beta_1 < 1, moving average term coefficient initial_vol: tensor initial_vol >= 0, initial volatility, sigma_0 """ rv_type = GARCH11RV def __new__(cls, *args, steps=None, **kwargs): steps = get_support_shape_1d( support_shape=steps, shape=None, # Shape will be checked in `cls.dist` dims=kwargs.get("dims", None), observed=kwargs.get("observed", None), support_shape_offset=1, ) return super().__new__(cls, *args, steps=steps, **kwargs)
[docs] @classmethod def dist(cls, omega, alpha_1, beta_1, initial_vol, *, steps=None, **kwargs): steps = get_support_shape_1d( support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1 ) if steps is None: raise ValueError("Must specify steps or shape parameter") steps = at.as_tensor_variable(intX(steps), ndim=0) omega = at.as_tensor_variable(omega) alpha_1 = at.as_tensor_variable(alpha_1) beta_1 = at.as_tensor_variable(beta_1) initial_vol = at.as_tensor_variable(initial_vol) init_dist = Normal.dist(0, initial_vol) # Tell Aeppl to ignore init_dist, as it will be accounted for in the logp term init_dist = ignore_logprob(init_dist) return super().dist([omega, alpha_1, beta_1, initial_vol, init_dist, steps], **kwargs)
[docs] @classmethod def rv_op(cls, omega, alpha_1, beta_1, initial_vol, init_dist, steps, size=None): if size is not None: batch_size = size else: # In this case the size of the init_dist depends on the parameters shape batch_size = at.broadcast_shape(omega, alpha_1, beta_1, initial_vol) init_dist = change_dist_size(init_dist, batch_size) # initial_vol = initial_vol * at.ones(batch_size) # Create OpFromGraph representing random draws from GARCH11 process # Variables with underscore suffix are dummy inputs into the OpFromGraph init_ = init_dist.type() initial_vol_ = initial_vol.type() omega_ = omega.type() alpha_1_ = alpha_1.type() beta_1_ = beta_1.type() steps_ = steps.type() noise_rng = aesara.shared(np.random.default_rng()) def step(prev_y, prev_sigma, omega, alpha_1, beta_1, rng): new_sigma = at.sqrt( omega + alpha_1 * at.square(prev_y) + beta_1 * at.square(prev_sigma) ) next_rng, new_y = Normal.dist(mu=0, sigma=new_sigma, rng=rng).owner.outputs return (new_y, new_sigma), {rng: next_rng} (y_t, _), innov_updates_ = aesara.scan( fn=step, outputs_info=[init_, initial_vol_ * at.ones(batch_size)], non_sequences=[omega_, alpha_1_, beta_1_, noise_rng], n_steps=steps_, strict=True, ) (noise_next_rng,) = tuple(innov_updates_.values()) garch11_ = at.concatenate([init_[None, ...], y_t], axis=0).dimshuffle( tuple(range(1, y_t.ndim)) + (0,) ) garch11_op = GARCH11RV( inputs=[omega_, alpha_1_, beta_1_, initial_vol_, init_, steps_], outputs=[noise_next_rng, garch11_], ndim_supp=1, ) garch11 = garch11_op(omega, alpha_1, beta_1, initial_vol, init_dist, steps) return garch11
@_change_dist_size.register(GARCH11RV) def change_garch11_size(op, dist, new_size, expand=False): if expand: old_size = dist.shape[:-1] new_size = tuple(new_size) + tuple(old_size) return GARCH11.rv_op( *dist.owner.inputs[:-1], size=new_size, ) @_logprob.register(GARCH11RV) def garch11_logp( op, values, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng, **kwargs ): (value,) = values # Move the time axis to the first dimension value_dimswapped = value.dimshuffle((value.ndim - 1,) + tuple(range(0, value.ndim - 1))) initial_vol = initial_vol * at.ones_like(value_dimswapped[0]) def volatility_update(x, vol, w, a, b): return at.sqrt(w + a * at.square(x) + b * at.square(vol)) vol, _ = aesara.scan( fn=volatility_update, sequences=[value_dimswapped[:-1]], outputs_info=[initial_vol], non_sequences=[omega, alpha_1, beta_1], strict=True, ) sigma_t = at.concatenate([[initial_vol], vol]) # Compute and collapse logp across time dimension innov_logp = at.sum(logp(Normal.dist(0, sigma_t), value_dimswapped), axis=0) return innov_logp @_moment.register(GARCH11RV) def garch11_moment(op, rv, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng): # GARCH(1,1) mean is zero return at.zeros_like(rv) class EulerMaruyamaRV(SymbolicRandomVariable): """A placeholder used to specify a log-likelihood for a EulerMaruyama sub-graph.""" default_output = 1 dt: float sde_fn: Callable _print_name = ("EulerMaruyama", "\\operatorname{EulerMaruyama}") def __init__(self, *args, dt, sde_fn, **kwargs): self.dt = dt self.sde_fn = sde_fn super().__init__(*args, **kwargs) def update(self, node: Node): """Return the update mapping for the noise RV.""" # Since noise is a shared variable it shows up as the last node input return {node.inputs[-1]: node.outputs[0]}
[docs]class EulerMaruyama(Distribution): r""" Stochastic differential equation discretized with the Euler-Maruyama method. Parameters ---------- dt : float time step of discretization sde_fn : callable function returning the drift and diffusion coefficients of SDE sde_pars : tuple parameters of the SDE, passed as ``*args`` to ``sde_fn`` init_dist : unnamed distribution, optional Scalar distribution for initial values. Unnamed refers to distributions created with the ``.dist()`` API. Distributions should have shape (*shape[:-1]). If not, it will be automatically resized. Defaults to pm.Normal.dist(0, 100, shape=...). .. warning:: init_dist will be cloned, rendering it independent of the one passed as input. """ rv_type = EulerMaruyamaRV def __new__(cls, name, dt, sde_fn, *args, steps=None, **kwargs): dt = at.as_tensor_variable(floatX(dt)) steps = get_support_shape_1d( support_shape=steps, shape=None, # Shape will be checked in `cls.dist` dims=kwargs.get("dims", None), observed=kwargs.get("observed", None), support_shape_offset=1, ) return super().__new__(cls, name, dt, sde_fn, *args, steps=steps, **kwargs)
[docs] @classmethod def dist(cls, dt, sde_fn, sde_pars, *, init_dist=None, steps=None, **kwargs): steps = get_support_shape_1d( support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1 ) if steps is None: raise ValueError("Must specify steps or shape parameter") steps = at.as_tensor_variable(intX(steps), ndim=0) dt = at.as_tensor_variable(floatX(dt)) sde_pars = [at.as_tensor_variable(x) for x in sde_pars] if init_dist is not None: if not isinstance(init_dist, TensorVariable) or not isinstance( init_dist.owner.op, (RandomVariable, SymbolicRandomVariable) ): raise ValueError( f"Init dist must be a distribution created via the `.dist()` API, " f"got {type(init_dist)}" ) check_dist_not_registered(init_dist) if init_dist.owner.op.ndim_supp > 0: raise ValueError( "Init distribution must have a scalar support dimension, ", f"got ndim_supp={init_dist.owner.op.ndim_supp}.", ) else: warnings.warn( "Initial distribution not specified, defaulting to " "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist " "manually to suppress this warning.", UserWarning, ) init_dist = Normal.dist(0, 100, shape=sde_pars[0].shape) # Tell Aeppl to ignore init_dist, as it will be accounted for in the logp term init_dist = ignore_logprob(init_dist) return super().dist([init_dist, steps, sde_pars, dt, sde_fn], **kwargs)
[docs] @classmethod def rv_op(cls, init_dist, steps, sde_pars, dt, sde_fn, size=None): # Init dist should have shape (*size,) if size is not None: batch_size = size else: batch_size = at.broadcast_shape(*sde_pars, init_dist) init_dist = change_dist_size(init_dist, batch_size) # Create OpFromGraph representing random draws from SDE process # Variables with underscore suffix are dummy inputs into the OpFromGraph init_ = init_dist.type() sde_pars_ = [x.type() for x in sde_pars] steps_ = steps.type() noise_rng = aesara.shared(np.random.default_rng()) def step(*prev_args): prev_y, *prev_sde_pars, rng = prev_args f, g = sde_fn(prev_y, *prev_sde_pars) mu = prev_y + dt * f sigma = at.sqrt(dt) * g next_rng, next_y = Normal.dist(mu=mu, sigma=sigma, rng=rng).owner.outputs return next_y, {rng: next_rng} y_t, innov_updates_ = aesara.scan( fn=step, outputs_info=[init_], non_sequences=sde_pars_ + [noise_rng], n_steps=steps_, strict=True, ) (noise_next_rng,) = tuple(innov_updates_.values()) sde_out_ = at.concatenate([init_[None, ...], y_t], axis=0).dimshuffle( tuple(range(1, y_t.ndim)) + (0,) ) eulermaruyama_op = EulerMaruyamaRV( inputs=[init_, steps_] + sde_pars_, outputs=[noise_next_rng, sde_out_], dt=dt, sde_fn=sde_fn, ndim_supp=1, ) eulermaruyama = eulermaruyama_op(init_dist, steps, *sde_pars) return eulermaruyama
@_change_dist_size.register(EulerMaruyamaRV) def change_eulermaruyama_size(op, dist, new_size, expand=False): if expand: old_size = dist.shape[:-1] new_size = tuple(new_size) + tuple(old_size) init_dist, steps, *sde_pars, _ = dist.owner.inputs return EulerMaruyama.rv_op( init_dist, steps, sde_pars, dt=op.dt, sde_fn=op.sde_fn, size=new_size, ) @_logprob.register(EulerMaruyamaRV) def eulermaruyama_logp(op, values, init_dist, steps, *sde_pars_noise_arg, **kwargs): (x,) = values # noise arg is unused, but is needed to make the logp signature match the rv_op signature *sde_pars, _ = sde_pars_noise_arg # sde_fn is user provided and likely not broadcastable to additional time dimension, # since the input x is now [..., t], we need to broadcast each input to [..., None] # below as best effort attempt to make it work sde_pars_broadcast = [x[..., None] for x in sde_pars] xtm1 = x[..., :-1] xt = x[..., 1:] f, g = op.sde_fn(xtm1, *sde_pars_broadcast) mu = xtm1 + op.dt * f sigma = at.sqrt(op.dt) * g # Compute and collapse logp across time dimension sde_logp = at.sum(logp(Normal.dist(mu, sigma), xt), axis=-1) init_logp = logp(init_dist, x[..., 0]) return init_logp + sde_logp