import logging
import warnings
from collections.abc import Callable, Sequence
from typing import Any, Literal
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
from pymc.model import modelcontext
from pymc.model.transform.optimization import freeze_dims_and_data
from pymc.util import RandomState
from pytensor.graph.basic import Variable
from pytensor.graph.replace import graph_replace
from pytensor.graph.traversal import explicit_graph_inputs
from rich.box import SIMPLE_HEAD
from rich.console import Console
from rich.table import Table
from xarray import DataTree
from pymc_extras.statespace.core.properties import (
Coord,
CoordInfo,
Data,
DataInfo,
Parameter,
ParameterInfo,
Shock,
ShockInfo,
State,
StateInfo,
SymbolicData,
SymbolicDataInfo,
SymbolicVariable,
SymbolicVariableInfo,
)
from pymc_extras.statespace.core.representation import PytensorRepresentation
from pymc_extras.statespace.filters import (
KalmanSmoother,
SquareRootFilter,
StandardFilter,
UnivariateFilter,
)
from pymc_extras.statespace.filters.distributions import (
LinearGaussianStateSpace,
SequenceMvNormal,
)
from pymc_extras.statespace.filters.utilities import stabilize
from pymc_extras.statespace.utils.constants import (
ALL_STATE_AUX_DIM,
ALL_STATE_DIM,
FILTER_OUTPUT_DIMS,
FILTER_OUTPUT_TYPES,
JITTER_DEFAULT,
MATRIX_DIMS,
MATRIX_NAMES,
OBS_STATE_DIM,
SHOCK_DIM,
SHORT_NAME_TO_LONG,
TIME_DIM,
)
from pymc_extras.statespace.utils.data_tools import register_data_with_pymc
_log = logging.getLogger("pymc.experimental.statespace")
floatX = pytensor.config.floatX
FILTER_FACTORY = {
"standard": StandardFilter,
"univariate": UnivariateFilter,
"cholesky": SquareRootFilter,
}
def _validate_filter_arg(filter_arg):
if filter_arg.lower() not in FILTER_OUTPUT_TYPES:
raise ValueError(
f"filter_output should be one of {', '.join(FILTER_OUTPUT_TYPES)}, received {filter_arg}"
)
def _verify_group(group):
if group not in ["prior", "posterior"]:
raise ValueError(f'Argument "group" must be one of "prior" or "posterior", found {group}')
def _validate_property(props, property_name, expected_type):
if isinstance(props, expected_type) or props is None:
return
elif not isinstance(props, tuple | list):
raise TypeError(
f"The {property_name} property must be a {expected_type.__name__} or a "
f"list/tuple of {expected_type.__name__} instances."
)
if not all(isinstance(prop, expected_type) for prop in props):
raise TypeError(
f"All elements of the {property_name} property must be instances of "
f"{expected_type.__name__}."
)
[docs]
class PyMCStateSpace:
r"""
Base class for Linear Gaussian Statespace models in PyMC.
Holds a ``PytensorRepresentation`` and ``KalmanFilter``, and provides a mapping between a PyMC model and the
statespace model.
Parameters
----------
k_endog : int
The number of endogenous variables (observed time series).
k_states : int
The number of state variables.
k_posdef : int
The number of shocks in the model
filter_type : str, optional
The type of Kalman filter to use. Valid options are "standard", "univariate", "single", "cholesky", and
"steady_state". For more information, see the docs for each filter. Default is "standard".
verbose : bool, optional
If True, displays information about the initialized model. Defaults to True.
measurement_error : bool, optional
If true, the model contains measurement error. Needed by post-estimation sampling methods to decide how to
compute the observation errors. If False, these errors are deterministically zero; if True, they are sampled
from a multivariate normal.
mode: str or Mode, optional
Pytensor compile mode, used in auxiliary sampling methods such as ``sample_conditional_posterior`` and
``forecast``. The mode does **not** effect calls to ``pm.sample``.
Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument
to all sampling methods.
Notes
-----
Based on the statsmodels statespace implementation https://github.com/statsmodels/statsmodels/blob/main/statsmodels/tsa/statespace/representation.py,
described in [1].
All statespace models inherit from this base class, which is responsible for providing an interface between a
PyMC model and a PytensorRepresentation of a linear statespace model. This is done via the ``update`` method,
which takes as input a vector of PyMC random variables and assigns them to their correct positions inside the
underlying ``PytensorRepresentation``. Construction of the parameter vector, called ``theta``, is done
automatically, but depend on the names provided in the ``param_names`` property.
To implement a new statespace model, one needs to:
1. Overload the ``param_names`` property to return a list of parameter names.
2. Overload the ``update`` method to put these parameters into their respective statespace matrices
In addition, a number of additional properties can be overloaded to provide users with additional resources
when writing their PyMC models. For details, see the attributes section of the docs for this class.
Finally, this class holds post-estimation methods common to all statespace models, which do not need to be
overloaded when writing a custom statespace model.
Examples
--------
The local level model is a simple statespace model. It is a Gaussian random walk with a drift term that itself also
follows a Gaussian random walk, as described by the following two equations:
.. math::
\begin{align}
y_{t} &= y_{t-1} + x_t + \nu_t \tag{1} \\
x_{t} &= x_{t-1} + \eta_t \tag{2}
\end{align}
Where :math:`y_t` is the observed data, and :math:`x_t` is an unobserved trend term. The model has two unknown
parameters, the variances on the two innovations, :math:`sigma_\nu` and :math:`sigma_\eta`. Take the hidden state
vector to be :math:`\begin{bmatrix} y_t & x_t \end{bmatrix}^T` and the shock vector
:math:`\varepsilon_t = \begin{bmatrix} \nu_t & \eta_t \end{bmatrix}^T`. Then this model can be cast into state-space
form with the following matrices:
.. math::
\begin{align}
T &= \begin{bmatrix}1 & 1 \\ 0 & 1 \end{bmatrix} &
R &= \begin{bmatrix}1 & 0 \\ 0 & 1 \end{bmatrix} &
Q &= \begin{bmatrix} \sigma_\nu & 0 \\ 0 & \sigma_\eta \end{bmatrix} &
Z &= \begin{bmatrix} 1 & 0 \end{bmatrix}
\end{align}
With the remaining statespace matrices as zero matrices of the appropriate sizes. The model has two states,
two shocks, and one observed state. Knowing all this, a very simple local level model can be implemented as
follows:
.. code:: python
from pymc_extras.statespace.core import PyMCStateSpace
import numpy as np
class LocalLevel(PyMCStateSpace):
def __init__():
# Initialize the superclass. This creates the PytensorRepresentation and the Kalman Filter
super().__init__(k_endog=1, k_states=2, k_posdef=2)
# Declare the non-zero, non-parameterized matrices
self.ssm['transition', :, :] = np.array([[1.0, 1.0], [0.0, 1.0]])
self.ssm['selection', :, :] = np.eye(2)
self.ssm['design', :, :] = np.array([[1.0, 0.0]])
@property
def param_names(self):
return ['x0', 'P0', 'sigma_nu', 'sigma_eta']
def make_symbolic_graph(self):
# Declare symbolic variables that represent parameters of the model
# In this case, we have 4: x0 (initial state), P0 (initial state covariance), sigma_nu, and sigma_eta
x0 = self.make_and_register_variable('x0', shape=(2,))
P0 = self.make_and_register_variable('P0', shape=(2,2))
sigma_mu = self.make_and_register_variable('sigma_nu')
sigma_eta = self.make_and_register_variable('sigma_eta')
# Next, use these symbolic variables to build the statespace matrices by assigning each parameter
# to its correct location in the correct matrix
self.ssm['initial_state', :] = x0
self.ssm['initial_state_cov', :, :] = P0
self.ssm['state_cov', 0, 0] = sigma_nu
self.ssm['state_cov', 1, 1] = sigma_eta
After defining priors over the named parameters ``P0``, ``x0``, ``sigma_eta``, and ``sigma_nu``, we can sample
from this model:
.. code:: python
import pymc as pm
ll = LocalLevel()
with pm.Model() as mod:
x0 = pm.Normal('x0', shape=(2,))
P0_diag = pm.Exponential('P0_diag', 1, shape=(2,))
P0 = pm.Deterministic('P0', pt.diag(P0_diag))
sigma_nu = pm.Exponential('sigma_nu', 1)
sigma_eta = pm.Exponential('sigma_eta', 1)
ll.build_statespace_graph(data = data)
idata = pm.sample()
References
----------
.. [1] Fulton, Chad. "Estimating time series models by state space methods in Python: Statsmodels." (2015).
http://www.chadfulton.com/files/fulton_statsmodels_2017_v1.pdf
"""
[docs]
def __init__(
self,
k_endog: int,
k_states: int,
k_posdef: int,
filter_type: str = "standard",
verbose: bool = True,
measurement_error: bool = False,
mode: str | None = None,
):
self._fit_coords: dict[str, Sequence[str]] | None = None
self._fit_dims: dict[str, Sequence[str]] | None = None
self._fit_data: pt.TensorVariable | None = None
self._fit_exog_data: dict[str, dict] = {}
self._shared_timestep: pt.TensorVariable | None = None
self._needs_exog_data = None
self._tensor_variable_info = SymbolicVariableInfo()
self._tensor_data_info = SymbolicDataInfo()
self.k_endog = k_endog
self.k_states = k_states
self.k_posdef = k_posdef
self.measurement_error = measurement_error
self.mode = mode
self._populate_properties()
# Placeholder for time-varying matrices that depend on data length
self._n_timesteps_placeholder = pt.iscalar("n_timesteps")
# All models contain a state space representation and a Kalman filter
self.ssm = PytensorRepresentation(k_endog, k_states, k_posdef)
# This will be populated with PyMC random matrices after calling _insert_random_variables
self.subbed_ssm: list[pt.TensorVariable] | None = None
if filter_type.lower() not in FILTER_FACTORY.keys():
raise NotImplementedError(
"The following are valid filter types: " + ", ".join(list(FILTER_FACTORY.keys()))
)
if filter_type == "single" and self.k_endog > 1:
raise ValueError('Cannot use filter_type = "single" with multiple observed time series')
self.kalman_filter = FILTER_FACTORY[filter_type.lower()]()
self.kalman_smoother = KalmanSmoother()
self.make_symbolic_graph()
self.requirement_table = None
self._populate_prior_requirements()
self._populate_data_requirements()
if verbose and self.requirement_table:
console = Console()
console.print(self.requirement_table)
def _populate_properties(self) -> None:
self._set_parameters()
self._set_states()
self._set_shocks()
self._set_coords()
self._set_data_info()
def set_parameters(self) -> Parameter | tuple[Parameter, ...] | None:
"""
Provides parameter metadata to the model.
Optional. Default implementation sets an empty ParameterInfo.
Child classes can override to define model parameters.
"""
return
def _set_parameters(self) -> None:
param_list = self.set_parameters()
self._param_info = ParameterInfo(parameters=param_list)
def set_states(self) -> State | tuple[State, ...] | None:
"""
Provide state metadata to the model.
Optional. Default implementation creates generic state names (state_0, state_1, ...).
Child classes can override to provide meaningful state names.
"""
hidden_states = [
State(name=f"hidden_state_{name}", observed=False) for name in range(self.k_states or 0)
]
observed_states = [
State(name=f"observed_state_{name}", observed=True) for name in range(self.k_endog or 0)
]
return *hidden_states, *observed_states
def _set_states(self) -> None:
states = self.set_states()
_validate_property(states, "states", State)
if isinstance(states, State):
states = (states,)
self._state_info = StateInfo(states=states)
def set_shocks(self) -> Shock | tuple[Shock, ...] | None:
"""
Provide shock metadata to the model.
Optional. Default implementation creates generic shock names (shock_0, shock_1, ...).
Child classes can override to provide meaningful shock names.
"""
return tuple(Shock(name=f"shock_{i}") for i in range(self.k_posdef))
def _set_shocks(self) -> None:
shocks = self.set_shocks()
_validate_property(shocks, "shocks", Shock)
if isinstance(shocks, Shock):
shocks = (shocks,)
self._shock_info = ShockInfo(shocks=shocks)
def default_coords(self) -> tuple[Coord, ...]:
return CoordInfo.default_coords_from_model(self).items
def set_coords(self) -> Coord | tuple[Coord, ...] | None:
"""
Provide coordinates to the model.
Optional. Default implementation sets an empty CoordInfo.
Child classes can override to provide model-specific coordinates.
"""
return self.default_coords()
def _set_coords(self) -> None:
coords = self.set_coords()
_validate_property(coords, "coords", Coord)
if isinstance(coords, Coord):
coords = (coords,)
self._coords_info = CoordInfo(coords=coords)
def set_data_info(self) -> Data | tuple[Data, ...] | None:
"""
Provide data_info metadata to the model.
Optional. Default implementation sets an empty DataInfo.
Child classes can override if the model requires exogenous data.
"""
return
def _set_data_info(self) -> None:
data_info = self.set_data_info()
_validate_property(data_info, "data_info", Data)
if isinstance(data_info, Data):
data_info = (data_info,)
self._data_info = DataInfo(data=data_info)
def _populate_prior_requirements(self) -> None:
"""
Add requirements about priors needed for the model to a rich table, including their names,
shapes, named dimensions, and any parameter constraints.
"""
if not self.param_info:
return
if self.requirement_table is None:
self._initialize_requirement_table()
for param, info in self.param_info.items():
self.requirement_table.add_row(
param, str(info["shape"]), info["constraints"], str(info["dims"])
)
def _populate_data_requirements(self) -> None:
"""
Add requirements about the data needed for the model, including their names, shapes, and named dimensions.
"""
if not self.data_info:
return
if self.requirement_table is None:
self._initialize_requirement_table()
else:
self.requirement_table.add_section()
for data, info in self.data_info.items():
self.requirement_table.add_row(data, str(info["shape"]), "pm.Data", str(info["dims"]))
def _initialize_requirement_table(self) -> None:
self.requirement_table = Table(
show_header=True,
show_edge=True,
box=SIMPLE_HEAD,
highlight=True,
)
self.requirement_table.title = "Model Requirements"
self.requirement_table.caption = (
"These parameters should be assigned priors inside a PyMC model block before "
"calling the build_statespace_graph method."
)
self.requirement_table.add_column("Variable", justify="left")
self.requirement_table.add_column("Shape", justify="left")
self.requirement_table.add_column("Constraints", justify="left")
self.requirement_table.add_column("Dimensions", justify="right")
def _unpack_statespace_with_placeholders(
self,
) -> tuple[
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
pt.TensorVariable,
]:
"""
Helper function to quickly obtain all statespace matrices in the standard order. Matrices returned by this
method will include pytensor placeholders.
"""
a0 = self.ssm["initial_state"]
P0 = self.ssm["initial_state_cov"]
c = self.ssm["state_intercept"]
d = self.ssm["obs_intercept"]
T = self.ssm["transition"]
Z = self.ssm["design"]
R = self.ssm["selection"]
H = self.ssm["obs_cov"]
Q = self.ssm["state_cov"]
return a0, P0, c, d, T, Z, R, H, Q
def unpack_statespace(self) -> list[pt.TensorVariable]:
"""
Helper function to quickly obtain all statespace matrices in the standard order.
"""
if self.subbed_ssm is None:
raise ValueError(
"Cannot unpack the complete statespace system until PyMC model variables have been "
"inserted. To build the random statespace matrices, call build_statespace_graph() inside"
"a PyMC model context. "
)
return self.subbed_ssm
@property
def n_timesteps(self) -> Variable:
"""Symbolic placeholder for the number of time steps in the data."""
return self._n_timesteps_placeholder
@property
def param_names(self) -> tuple[str, ...]:
"""
Names of model parameters
A list of all parameters expected by the model. Each parameter will be sought inside the active PyMC model
context when ``build_statespace_graph`` is invoked.
"""
return self._param_info.names
@property
def data_names(self) -> tuple[str, ...]:
"""
Names of data variables expected by the model.
This does not include the observed data series, which is automatically handled by PyMC. This property only
needs to be implemented for models that expect exogenous data.
"""
return self._data_info.exogenous_names
@property
def param_info(self) -> dict[str, dict[str, Any]]:
"""
Information about parameters needed to declare priors
A dictionary of param_name: dictionary key-value pairs. The return value is used by the
``_print_prior_requirements`` method, to print a message telling users how to define the necessary priors for
the model. Each dictionary should have the following key-value pairs:
* key: "shape", value: a tuple of integers
* key: "constraints", value: a string describing the support of the prior (positive,
positive semi-definite, etc)
* key: "dims", value: tuple of strings
"""
return self._param_info.to_dict()
@property
def data_info(self) -> dict[str, dict[str, Any]]:
"""
Information about Data variables that need to be declared in the PyMC model block.
Returns a dictionary of data_name: dictionary of property-name:property description pairs. The return value is
used by the ``_print_data_requirements`` method, to print a message telling users how to define the necessary
data for the model. Each dictionary should have the following key-value pairs:
* key: "shape", value: a tuple of integers
* key: "dims", value: tuple of strings
"""
return self._data_info.to_dict()
@property
def state_names(self) -> tuple[str, ...]:
"""
A k_states length list of strings, associated with the model's hidden states
"""
return self._state_info.unobserved_state_names
@property
def observed_states(self) -> tuple[str, ...]:
"""
A k_endog length list of strings, associated with the model's observed states
"""
return self._state_info.observed_state_names
@property
def shock_names(self) -> tuple[str, ...]:
"""
A k_posdef length list of strings, associated with the model's shock processes
"""
return self._shock_info.names
@property
def default_priors(self) -> dict[str, Callable]:
"""
Dictionary of parameter names and callable functions to construct default priors for the model
Returns a dictionary with param_name: Callable key-value pairs. Used by the ``add_default_priors()`` method
to automatically add priors to the PyMC model.
"""
raise NotImplementedError("The default_priors property has not been implemented!")
@property
def coords(self) -> dict[str, Sequence[str]]:
"""
PyMC model coordinates
Returns a dictionary of dimension: coordinate key-value pairs, to be provided to ``pm.Model``. Dimensions
should come from the default names defined in ``statespace.utils.constants`` for them to be detected by
sampling methods.
"""
return self._coords_info.to_dict()
@property
def param_dims(self) -> dict[str, tuple[str, ...]]:
"""
Dictionary of named dimensions for each model parameter
Returns a dictionary of param_name: dimension key-value pairs, to be provided to the ``dims`` argument of a
PyMC random variable. Dimensions should come from the default names defined in ``statespace.utils.constants``
for them to be detected by sampling methods.
Note: Scalar parameters (with dims=None) are not included in this dictionary.
"""
return {param.name: param.dims for param in self._param_info if param.dims is not None}
@property
def _name_to_variable(self):
return self._tensor_variable_info.to_dict()
@property
def _name_to_data(self):
return self._tensor_data_info.to_dict()
def add_default_priors(self) -> None:
"""
Add default priors to the active PyMC model context
"""
raise NotImplementedError("The add_default_priors property has not been implemented!")
def make_and_register_variable(
self, name, shape: int | tuple[int, ...] | None = None, dtype=floatX
) -> pt.TensorVariable:
"""
Helper function to create a pytensor symbolic variable and register it in the _name_to_variable dictionary
Parameters
----------
name : str
The name of the placeholder variable. Must be the name of a model parameter.
shape : int or tuple of int
Shape of the parameter
dtype : str, default pytensor.config.floatX
dtype of the parameter
Notes
-----
Symbolic pytensor variables are used in the ``make_symbolic_graph`` method as placeholders for PyMC random
variables. The change is made in the ``_insert_random_variables`` method via ``pytensor.graph_replace``. To
make the change, a dictionary mapping pytensor variables to PyMC random variables needs to be constructed.
The purpose of this method is to:
1. Create the placeholder symbolic variables
2. Register the placeholder variable in the ``_name_to_variable`` dictionary
The shape provided here will define the shape of the prior that will need to be provided by the user.
An error is raised if the provided name has already been registered, or if the name is not present in the
``param_names`` property.
"""
if name not in self.param_names:
raise ValueError(
f"{name} is not a model parameter. All placeholder variables should correspond to model "
f"parameters."
)
if name in self._tensor_variable_info:
raise ValueError(
f"{name} is already a registered placeholder variable with shape "
f"{self._tensor_variable_info[name].type.shape}"
)
placeholder = pt.tensor(name, shape=shape, dtype=dtype)
tensor_var = SymbolicVariable(name=name, symbolic_variable=placeholder)
self._tensor_variable_info = self._tensor_variable_info.add(tensor_var)
return placeholder
def make_and_register_data(
self, name: str, shape: int | tuple[int], dtype: str = floatX
) -> Variable:
r"""
Helper function to create a pytensor symbolic variable and register it in the _name_to_data dictionary
Parameters
----------
name : str
The name of the placeholder data. Must be the name of an expected data variable.
shape : int or tuple of int
Shape of the parameter
dtype : str, default pytensor.config.floatX
dtype of the parameter
Notes
-----
See docstring for make_and_register_variable for more details. This function is similar, but handles data
inputs instead of model parameters.
An error is raised if the provided name has already been registered, or if the name is not present in the
``data_names`` property.
"""
if name not in self.data_names:
raise ValueError(
f"{name} is not a model parameter. All placeholder variables should correspond to model "
f"parameters."
)
if name in self._tensor_data_info:
raise ValueError(
f"{name} is already a registered placeholder variable with shape "
f"{self._tensor_data_info[name].type.shape}"
)
placeholder = pt.tensor(name, shape=shape, dtype=dtype)
tensor_data = SymbolicData(name=name, symbolic_data=placeholder)
self._tensor_data_info = self._tensor_data_info.add(tensor_data)
return placeholder
def make_symbolic_graph(self) -> None:
"""
The purpose of the make_symbolic_graph function is to hide tedious parameter allocations from the user.
In statespace models, it is extremely rare for an entire matrix to be defined by a single prior distribution.
Instead, users expect to place priors over single entries of the matrix. The purpose of this function is to
meet that expectation.
Every statespace model needs to implement this function.
Examples
--------
As an example, consider an ARMA(2,2) model, which has five parameters (excluding the initial state distribution):
2 AR parameters (:math:`\rho_1` and :math:`\rho_2`), 2 MA parameters (:math:`\theta_1` and :math:`theta_2`),
and a single innovation covariance (:math:`\\sigma`). A common way of writing this statespace is:
..math::
\begin{align}
T &= \begin{bmatrix} \rho_1 & 1 & 0 \\
\rho_2 & 0 & 1 \\
0 & 0 & 0
\\end{bmatrix} \\
R & = \begin{bmatrix} 1 \\ \theta_1 \\ \theta_2 \\end{bmatrix} \\
Q &= \begin{bmatrix} \\sigma \\end{bmatrix}
\\end{align}
To implement this model, we begin by creating the required matrices, and fill in the "fixed" values -- the ones
at position (0, 1) and (0, 2) in the T matrix, and at position (0, 0) in the R matrix. These are then saved
to the class's PytensorRepresentation -- called ``ssm``.
.. code:: python
T = np.eye(2, k=1)
R = np.concatenate([np.ones(1,1), np.zeros((2, 1))], axis=0)
self.ssm['transition'] = T
self.ssm['selection'] = R
Next, placeholders need to be inserted for the random variables rho_1, rho_2, theta_1, theta_2, and sigma.
This can be done many ways: we could define two vectors, rho and theta, and a scalar for sigma, or five
scalars. Whatever is chosen, the choice needs to be consistent with the ``param_names`` property.
Suppose the ``param_names`` are ``[rho, theta, sigma]``, then we make one placeholder for each, and insert it
into the correct ``ssm`` matrix, at the correct location. To create placeholders, use the
``make_and_register_variable`` helper method, which will maintain an internal registry of variables.
.. code:: python
rho_parmas = self.make_and_register_variable(name='rho', shape=(2,))
theta_params = self.make_and_register_variable(name='theta', shape=(2,))
sigma = self.make_and_register_variable(name='sigma', shape=(1,))
self.ssm['transition', :, 0] = rho_params
self.ssm['selection', 1:, 0] = theta_params
self.ssm['state_cov', 0, 0] = sigma
"""
raise NotImplementedError("The make_symbolic_statespace method has not been implemented!")
def _get_matrix_shape_and_dims(self, name: str) -> tuple[tuple[int] | None, tuple[str] | None]:
"""
Get the shape and dimensions of a matrix associated with the specified name.
This method retrieves the shape and dimensions of a matrix associated with the given name. Importantly,
it will only find named dimension if they are the "default" dimension names defined in the
statespace.utils.constant.py file.
Parameters
----------
name : str
The name of the matrix whose shape and dimensions need to be retrieved.
Returns
-------
shape: tuple or None
If no named dimension are found, the shape of the requested matrix, otherwise None.
dims: tuple or None
If named dimensions are found, a tuple of strings, otherwise None
"""
pm_mod = modelcontext(None)
dims = MATRIX_DIMS.get(name, None)
dims = dims if all([dim in pm_mod.coords.keys() for dim in dims]) else None
data_len = len(self._fit_data)
if name in self.kalman_filter.seq_names:
shape = (data_len, *self.ssm[SHORT_NAME_TO_LONG[name]].type.shape)
dims = (TIME_DIM, *dims)
else:
shape = self.ssm[SHORT_NAME_TO_LONG[name]].type.shape
shape = shape if dims is None else None
return shape, dims
def _save_exogenous_data_info(self):
"""
Store exogenous data required by posterior sampling functions
"""
pymc_mod = modelcontext(None)
for data_name in self.data_names:
data = pymc_mod[data_name]
self._fit_exog_data[data_name] = {
"name": data_name,
"value": data.get_value(),
"dims": pymc_mod.named_vars_to_dims.get(data_name, None),
}
def _insert_random_variables(self):
"""
Replace pytensor symbolic variables with PyMC random variables.
Examples
--------
.. code:: python
ss_mod = pmss.BayesianSARIMAX(order=(2, 0, 2), verbose=False, stationary_initialization=True)
with pm.Model():
x0 = pm.Normal('x0', size=ss_mod.k_states)
ar_params = pm.Normal('ar_params', size=ss_mod.p)
ma_parama = pm.Normal('ma_params', size=ss_mod.q)
sigma_state = pm.Normal('sigma_state')
ss_mod._insert_random_variables()
matrics = ss_mod.unpack_statespace()
pm.draw(matrices['transition'], random_seed=RANDOM_SEED)
>>> array([[-0.90590386, 1. , 0. ],
>>> [ 1.25190143, 0. , 1. ],
>>> [ 0. , 0. , 0. ]])
pm.draw(matrices['selection'], random_seed=RANDOM_SEED)
>>> array([[ 1. ],
>>> [-2.46741039],
>>> [-0.28947689]])
pm.draw(matrices['state_cov'], random_seed=RANDOM_SEED)
>>> array([[-1.69353533]])
"""
pymc_model = modelcontext(None)
found_params = []
with pymc_model:
for param_name in self.param_names:
param = getattr(pymc_model, param_name, None)
if param is not None:
found_params.append(param.name)
missing_params = list(set(self.param_names) - set(found_params))
if len(missing_params) > 0:
raise ValueError(
"The following required model parameters were not found in the PyMC model: "
+ ", ".join(missing_params)
)
excess_params = list(set(found_params) - set(self.param_names))
if len(excess_params) > 0:
raise ValueError(
"The following parameters were found in the PyMC model but are not required by the statespace model: "
+ ", ".join(excess_params)
)
matrices = list(self._unpack_statespace_with_placeholders())
replacement_dict = {var: pymc_model[name] for name, var in self._name_to_variable.items()}
self.subbed_ssm = graph_replace(matrices, replace=replacement_dict, strict=True)
def _insert_data_variables(self):
"""
Replace symbolic pytensor variables with PyMC data containers.
Only used when models require exogenous data. The observed data is not added to the model using this method!
"""
data_names = self.data_names
if not data_names:
return
pymc_model = modelcontext(None)
found_data = []
with pymc_model:
for data_name in data_names:
data = getattr(pymc_model, data_name, None)
if data is not None:
found_data.append(data.name)
missing_data = list(set(data_names) - set(found_data))
if len(missing_data) > 0:
raise ValueError(
"The following required exogenous data were not found in the PyMC model: "
+ ", ".join(missing_data)
)
replacement_dict = {data: pymc_model[name] for name, data in self._name_to_data.items()}
self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=True)
def _insert_data_shape_into_n_timesteps(self, data):
"""
Replace any occurrence of the n_timesteps symbolic variable with the length of the data.
n_timesteps is a special symbolic variable used by graphs with time-varying matrices, whose shapes won't be
known until the user provides data. We need to collect them and replace them with a single common shared
variable to define all shapes consistently.
"""
# This method should only be called after data has been ingested and transformed into a pytensor variable.
# Otherwise, we don't get symbolic linkage between time-varying matrix shapes and the data when the user calls
# pm.set_data
assert isinstance(data, pt.TensorVariable)
matrices = (
self.subbed_ssm
if self.subbed_ssm is not None
else self._unpack_statespace_with_placeholders()
)
n_timestep_variables = tuple(
variable
for variable in explicit_graph_inputs(matrices)
if variable.name == "n_timesteps"
)
if n_timestep_variables:
self._shared_timestep = data.shape[0].astype("int32")
replacement_dict = {var: self._shared_timestep for var in n_timestep_variables}
self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=False)
def _insert_constant_timestep(self, matrices, step: int | pt.TensorVariable):
"""
Replace any occurrence of the n_timesteps symbolic variable with a constant integer.
This is used for constructing graphs for prior predictive sampling, where no data is available.
"""
step = pt.as_tensor_variable(step).astype("int32")
n_timestep_variables = tuple(
variable
for variable in explicit_graph_inputs(matrices)
if variable.name == "n_timesteps"
)
if not n_timestep_variables:
return matrices
replacement_dict = {var: step for var in n_timestep_variables}
return graph_replace(matrices, replace=replacement_dict, strict=False)
def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]:
"""
Add all statespace matrices to the PyMC model currently on the context stack as pm.Deterministic nodes, and
adds named dimensions if they are found.
Returns
-------
registered_matrices: list of pt.TensorVariable
list of statespace matrices, wrapped in pm.Deterministic
"""
pm_mod = modelcontext(None)
matrices = self.unpack_statespace()
time_varying_names = self.ssm.time_varying_names
registered_matrices = []
for i, (matrix, name) in enumerate(zip(matrices, MATRIX_NAMES)):
if not getattr(pm_mod, name, None):
shape, dims = self._get_matrix_shape_and_dims(name)
has_dims = dims is not None
if SHORT_NAME_TO_LONG[name] in time_varying_names and has_dims:
dims = (TIME_DIM, *dims)
x = pm.Deterministic(name, matrix, dims=dims)
registered_matrices.append(x)
else:
registered_matrices.append(matrices[i])
return registered_matrices
@staticmethod
def _register_kalman_filter_outputs_with_pymc_model(outputs: tuple[pt.TensorVariable]) -> None:
mod = modelcontext(None)
coords = mod.coords
states, covs = outputs[:4], outputs[4:]
state_names = [
"filtered_states",
"predicted_states",
"predicted_observed_states",
"smoothed_states",
]
cov_names = [
"filtered_covariances",
"predicted_covariances",
"predicted_observed_covariances",
"smoothed_covariances",
]
with mod:
for var, name in zip(states + covs, state_names + cov_names):
dim_names = FILTER_OUTPUT_DIMS.get(name, None)
dims = tuple([dim if dim in coords.keys() else None for dim in dim_names])
pm.Deterministic(name, var, dims=dims)
def build_statespace_graph(
self,
data: np.ndarray | pd.DataFrame | pt.TensorVariable,
register_data: bool = True,
missing_fill_value: float | None = None,
cov_jitter: float | None = JITTER_DEFAULT,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
save_kalman_filter_outputs_in_idata: bool = False,
mode: str | None = None,
) -> None:
"""
Given a parameter vector `theta`, constructs the full computational graph describing the state space model and
the associated log probability of the data. Hidden states and log probabilities are computed via the Kalman
Filter.
Parameters
----------
data : Union[np.ndarray, pd.DataFrame, pt.TensorVariable]
The observed data used to fit the state space model. It can be a NumPy array, a Pandas DataFrame,
or a Pytensor tensor variable.
register_data : bool, optional, default=True
If True, the observed data will be registered with PyMC as a pm.Data variable. In addition,
a "time" dim will be created an added to the model's coords.
mode : Optional[str], optional, default=None
The Pytensor mode used for the computation graph construction. If None, the default mode will be used.
Other options include "JAX" and "NUMBA".
missing_fill_value: float, optional, default=-9999
A value to mask in missing values. NaN values in the data need to be filled with an arbitrary value to
avoid triggering PyMC's automatic imputation machinery (missing values are instead filled by treating them
as hidden states during Kalman filtering).
In general this never needs to be set. But if by a wild coincidence your data includes the value -9999.0,
you will need to change the missing_fill_value to something else, to avoid incorrectly mark in
data as missing.
cov_jitter: float, default 1e-8 or 1e-6 if pytensor.config.floatX is float32
The Kalman filter is known to be numerically unstable, especially at half precision. This value is added to
the diagonal of every covariance matrix -- predicted, filtered, and smoothed -- at every step, to ensure
all matrices are strictly positive semi-definite.
Obviously, if this can be zero, that's best. In general:
- Having measurement error makes Kalman Filters more robust. A large source of numerical errors come
from the Filtered and Smoothed covariance matrices having a zero in the (0, 0) position, which always
occurs when there is no measurement error. You can lower this value in the presence of measurement
error.
- The Univariate Filter is more robust than other filters, and can tolerate a lower jitter value
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
save_kalman_filter_outputs_in_idata: bool, optional, default=False
If True, Kalman Filter outputs will be saved in the model as deterministics. Useful for debugging, but
should not be necessary for the majority of users.
mode: str, optional
Pytensor mode to use when compiling the graph. This will be saved as a model attribute and used when
compiling sampling functions (e.g. ``sample_conditional_prior``).
.. deprecated:: 0.2.5
The `mode` argument is deprecated and will be removed in a future version. Pass ``mode`` to the
model constructor, or manually specify ``compile_kwargs`` in sampling functions instead.
"""
if mode is not None:
warnings.warn(
"The `mode` argument is deprecated and will be removed in a future version. "
"Pass `mode` to the model constructor, or manually specify `compile_kwargs` in sampling functions"
" instead.",
DeprecationWarning,
)
self.mode = mode
pm_mod = modelcontext(None)
self._insert_random_variables()
self._save_exogenous_data_info()
self._insert_data_variables()
obs_coords = pm_mod.coords.get(OBS_STATE_DIM, None)
self._fit_data = data
data, nan_mask = register_data_with_pymc(
data,
n_obs=self.ssm.k_endog,
obs_coords=obs_coords,
register_data=register_data,
missing_fill_value=missing_fill_value,
)
# Order is important here: only call _insert_data_shape_into_n_timesteps after data has been registered.
self._insert_data_shape_into_n_timesteps(data)
filter_outputs = self.kalman_filter.build_graph(
pt.as_tensor_variable(data),
*self.unpack_statespace(),
missing_fill_value=missing_fill_value,
cov_jitter=cov_jitter,
time_varying_names=self.ssm.time_varying_names,
)
logp = filter_outputs.pop(-1)
states, covs = filter_outputs[:3], filter_outputs[3:]
filtered_states, predicted_states, observed_states = states
filtered_covariances, predicted_covariances, observed_covariances = covs
if save_kalman_filter_outputs_in_idata:
smooth_states, smooth_covariances = self._build_smoother_graph(
filtered_states, filtered_covariances, self.unpack_statespace()
)
all_kf_outputs = [*states, smooth_states, *covs, smooth_covariances]
self._register_kalman_filter_outputs_with_pymc_model(all_kf_outputs)
obs_dims = FILTER_OUTPUT_DIMS["predicted_observed_states"]
obs_dims = obs_dims if all([dim in pm_mod.coords.keys() for dim in obs_dims]) else None
SequenceMvNormal(
"obs",
mus=observed_states,
covs=observed_covariances,
logp=logp,
observed=data,
dims=obs_dims,
method=mvn_method,
)
self._fit_coords = pm_mod.coords.copy()
self._fit_dims = pm_mod.named_vars_to_dims.copy()
def _build_smoother_graph(
self,
filtered_states: pt.TensorVariable,
filtered_covariances: pt.TensorVariable,
matrices,
mode: str | None = None,
cov_jitter=JITTER_DEFAULT,
):
"""
Build the computation graph for the Kalman smoother.
This method constructs the computation graph for applying the Kalman smoother to the filtered states
and covariances obtained from the Kalman filter. The Kalman smoother is used to generate smoothed
estimates of the latent states and their covariances in a state space model.
The Kalman smoother provides a more accurate estimate of the latent states by incorporating future
information in the backward pass, resulting in smoothed state trajectories.
Parameters
----------
filtered_states : pytensor.tensor.TensorVariable
The filtered states obtained from the Kalman filter. Returned by the `build_statespace_graph` method.
filtered_covariances : pytensor.tensor.TensorVariable
The filtered state covariances obtained from the Kalman filter. Returned by the `build_statespace_graph`
method.
mode : Optional[str], default=None
The mode used by pytensor for the construction of the logp graph. If None, the mode provided to
`build_statespace_graph` will be used.
Returns
-------
Tuple[pytensor.tensor.TensorVariable, pytensor.tensor.TensorVariable]
A tuple containing TensorVariables representing the smoothed states and smoothed state covariances
obtained from the Kalman smoother.
"""
pymc_model = modelcontext(None)
with pymc_model:
*_, T, Z, R, H, Q = matrices
smooth_states, smooth_covariances = self.kalman_smoother.build_graph(
T,
R,
Q,
filtered_states,
filtered_covariances,
cov_jitter=cov_jitter,
time_varying_names=self.ssm.time_varying_names,
)
smooth_states.name = "smooth_states"
smooth_covariances.name = "smooth_covariances"
return smooth_states, smooth_covariances
def _build_dummy_graph(self) -> None:
"""
Build a dummy computation graph for the state space model matrices.
This method creates "dummy" pm.Flat variables representing the deep parameters used in the state space model.
Returns
-------
list[pm.Flat]
A list of pm.Flat variables representing all parameters estimated by the model.
"""
def infer_variable_shape(name):
shape = self._name_to_variable[name].type.shape
if not any(dim is None for dim in shape):
return shape
dim_names = self._fit_dims.get(name, None)
if dim_names is None:
raise ValueError(
f"Could not infer shape for {name}, because it was not given coords during model"
f"fitting"
)
shape_from_coords = tuple([len(self._fit_coords[dim]) for dim in dim_names])
return tuple(
[
shape[i] if shape[i] is not None else shape_from_coords[i]
for i in range(len(shape))
]
)
for name in self.param_names:
pm.Flat(
name,
shape=infer_variable_shape(name),
dims=self._fit_dims.get(name, None),
)
def _kalman_filter_outputs_from_dummy_graph(
self,
data: pt.TensorLike | None = None,
data_dims: str | tuple[str] | list[str] | None = None,
scenario: dict[str, pd.DataFrame] | pd.DataFrame | None = None,
) -> tuple[list[pt.TensorVariable], list[tuple[pt.TensorVariable, pt.TensorVariable]]]:
"""
Builds a Kalman filter graph using "dummy" pm.Flat distributions for the model variables and sorts the returns
into (mean, covariance) pairs for each of filtered, predicted, and smoothed output.
Parameters
----------
data: pt.TensorLike, optional
Observed data on which to condition the model. If not provided, the function will use the data that was
provided when the model was built.
data_dims: str or tuple of str, optional
Dimension names associated with the model data. If None, defaults to ("time", "obs_state")
scenario: dict[str, pd.DataFrame], optional
Dictionary of out-of-sample scenario dataframes. If provided, it must have values for all data variables
in the model. pm.set_data is used to replace training data with new values.
Returns
-------
matrices: list of tensors
Statespace matrices with dummy parameters.
grouped_outputs: list of tuple of tensors
A list of tuples, each containing the mean and covariance of the filtered, predicted, and smoothed states.
"""
if scenario is None:
scenario = dict()
pm_mod = modelcontext(None)
self._build_dummy_graph()
self._insert_random_variables()
for name in self.data_names:
if name not in pm_mod:
pm.Data(**self._fit_exog_data[name])
self._insert_data_variables()
for name in self.data_names:
if name in scenario.keys():
pm.set_data({name: scenario[name]})
matrices = self.unpack_statespace()
if data is None:
data = self._fit_data
# Replace n_timesteps with data length for time-varying matrices
data_len = data.shape[0] if hasattr(data, "shape") else len(data)
matrices = self._insert_constant_timestep(matrices, data_len)
x0, P0, c, d, T, Z, R, H, Q = matrices
obs_coords = pm_mod.coords.get(OBS_STATE_DIM, None)
data, nan_mask = register_data_with_pymc(
data,
n_obs=self.ssm.k_endog,
obs_coords=obs_coords,
data_dims=data_dims,
register_data=True,
)
filter_outputs = self.kalman_filter.build_graph(
data,
x0,
P0,
c,
d,
T,
Z,
R,
H,
Q,
time_varying_names=self.ssm.time_varying_names,
)
filter_outputs.pop(-1)
states, covariances = filter_outputs[:3], filter_outputs[3:]
filtered_states, predicted_states, _ = states
filtered_covariances, predicted_covariances, _ = covariances
[smoothed_states, smoothed_covariances] = self.kalman_smoother.build_graph(
T,
R,
Q,
filtered_states,
filtered_covariances,
time_varying_names=self.ssm.time_varying_names,
)
grouped_outputs = [
(filtered_states, filtered_covariances),
(predicted_states, predicted_covariances),
(smoothed_states, smoothed_covariances),
]
return [x0, P0, c, d, T, Z, R, H, Q], grouped_outputs
def _sample_conditional(
self,
idata: DataTree,
group: str,
random_seed: RandomState | None = None,
data: pt.TensorLike | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
):
"""
Common functionality shared between `sample_conditional_prior` and `sample_conditional_posterior`. See those
methods for details.
Parameters
----------
idata : DataTree
A DataTree object containing the posterior distribution over model parameters.
group : str
DataTree group from which to draw samples. Should be one of "prior" or "posterior".
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
data: pt.TensorLike, optional
Observed data on which to condition the model. If not provided, the function will use the data that was
provided when the model was built.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
A DataTree object containing sampled trajectories from the requested conditional distribution,
with data variables "filtered_{group}", "predicted_{group}", and "smoothed_{group}".
"""
if data is None and self._fit_data is None:
raise ValueError("No data provided to condition the model")
_verify_group(group)
group_idata = getattr(idata, group)
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
with pm.Model(coords=self._fit_coords) as forward_model:
(
[
x0,
P0,
c,
d,
T,
Z,
R,
H,
Q,
],
grouped_outputs,
) = self._kalman_filter_outputs_from_dummy_graph(data=data)
for name, (mu, cov) in zip(FILTER_OUTPUT_TYPES, grouped_outputs):
dummy_ll = pt.zeros_like(mu)
state_dims = (
(TIME_DIM, ALL_STATE_DIM)
if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM]])
else (None, None)
)
obs_dims = (
(TIME_DIM, OBS_STATE_DIM)
if all([dim in self._fit_coords for dim in [TIME_DIM, OBS_STATE_DIM]])
else (None, None)
)
SequenceMvNormal(
f"{name}_{group}",
mus=mu,
covs=cov,
logp=dummy_ll,
dims=state_dims,
method=mvn_method,
)
obs_mu = d + (Z @ mu[..., None]).squeeze(-1)
obs_cov = Z @ cov @ pt.swapaxes(Z, -2, -1) + H
SequenceMvNormal(
f"{name}_{group}_observed",
mus=obs_mu,
covs=obs_cov,
logp=dummy_ll,
dims=obs_dims,
method=mvn_method,
)
# TODO: Remove this after pm.Flat initial values are fixed
forward_model.rvs_to_initial_values = {
rv: None for rv in forward_model.rvs_to_initial_values.keys()
}
frozen_model = freeze_dims_and_data(forward_model)
with frozen_model:
idata_conditional = pm.sample_posterior_predictive(
group_idata,
var_names=[
f"{name}_{group}{suffix}"
for name in FILTER_OUTPUT_TYPES
for suffix in ["", "_observed"]
],
random_seed=random_seed,
compile_kwargs=compile_kwargs,
**kwargs,
)
return idata_conditional.posterior_predictive
def _sample_unconditional(
self,
idata: DataTree,
group: str,
steps: int | None = None,
use_data_time_dim: bool = False,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
):
"""
Draw unconditional sample trajectories according to state space dynamics, using random samples from the
a provided trace. The state space update equations are:
X[t+1] = T @ X[t] + R @ eta[t], eta ~ N(0, Q)
Y[t] = Z @ X[t] + nu[t], nu ~ N(0, H)
x[0] ~ N(a0, P0)
Parameters
----------
idata : DataTree
A DataTree object with a posterior group containing samples from the
posterior distribution over model parameters.
steps : Optional[int], default=None
The number of time steps to sample for the unconditional trajectories. If not provided (None),
the function will sample trajectories for the entire available time dimension in the posterior.
Otherwise, it will generate trajectories for the specified number of steps.
use_data_time_dim : bool, default=False
If True, the function uses the time dimension present in the provided `idata` object to sample
unconditional trajectories. If False, a custom time dimension is created based on the number of steps
specified, or if steps is None, it uses the entire available time dimension in the posterior.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
An Arviz InfereceData with two groups, posterior_latent and posterior_observed
- posterior_latent represents the latent state trajectories `X[t]`, which follows the dynamics:
`x[t+1] = T @ x[t] + R @ eta[t]`, where `eta ~ N(0, Q)`.
- posterior_observed represents the observed state trajectories `Y[t]`, which is obtained from
the latent state trajectories: `y[t] = Z @ x[t] + nu[t]`, where `nu ~ N(0, H)`.
"""
_verify_group(group)
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
group_idata = getattr(idata, group)
dims = None
temp_coords = self._fit_coords.copy()
if not use_data_time_dim and steps is not None:
temp_coords.update({TIME_DIM: np.arange(1 + steps, dtype="int")})
steps = len(temp_coords[TIME_DIM]) - 1
elif steps is not None:
n_dimsteps = len(temp_coords[TIME_DIM])
if n_dimsteps != steps:
raise ValueError(
f"Length of time dimension does not match specified number of steps, expected"
f" {n_dimsteps} steps, or steps=None."
)
else:
steps = len(temp_coords[TIME_DIM]) - 1
if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM]]):
dims = [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM]
with pm.Model(coords=temp_coords if dims is not None else None) as forward_model:
self._build_dummy_graph()
self._insert_random_variables()
for name in self.data_names:
pm.Data(**self._fit_exog_data[name])
self._insert_data_variables()
matrices = self._insert_constant_timestep(self.unpack_statespace(), step=steps)
x0, P0, c, d, T, Z, R, H, Q = matrices
if not self.measurement_error:
H_jittered = pm.Deterministic("H_jittered", stabilize(H))
matrices = [x0, P0, c, d, T, Z, R, H_jittered, Q]
LinearGaussianStateSpace(
group,
*matrices,
steps=steps,
dims=dims,
method=mvn_method,
sequence_names=self.kalman_filter.seq_names,
k_endog=self.k_endog,
)
# TODO: Remove this after pm.Flat has its initial_value fixed
forward_model.rvs_to_initial_values = {
rv: None for rv in forward_model.rvs_to_initial_values.keys()
}
frozen_model = freeze_dims_and_data(forward_model)
with frozen_model:
idata_unconditional = pm.sample_posterior_predictive(
group_idata,
var_names=[f"{group}_latent", f"{group}_observed"],
random_seed=random_seed,
compile_kwargs=compile_kwargs,
**kwargs,
)
return idata_unconditional.posterior_predictive
def sample_conditional_prior(
self,
idata: DataTree,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
) -> DataTree:
"""
Sample from the conditional prior; that is, given parameter draws from the prior distribution,
compute Kalman filtered trajectories. Trajectories are drawn from a single multivariate normal with mean and
covariance computed via either the Kalman filter, smoother, or predictions.
Parameters
----------
idata : DataTree
DataTree with prior samples for state space matrices x0, P0, c, d, T, Z, R, H, Q.
Obtained from `pm.sample_prior_predictive` after calling PyMCStateSpace.build_statespace_graph().
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
A DataTree object containing sampled trajectories from the conditional prior.
The trajectories are stored in the posterior_predictive group with names "filtered_prior",
"predicted_prior", and "smoothed_prior".
"""
return self._sample_conditional(
idata=idata,
group="prior",
random_seed=random_seed,
mvn_method=mvn_method,
**kwargs,
)
def sample_conditional_posterior(
self,
idata: DataTree,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
):
"""
Sample from the conditional posterior; that is, given parameter draws from the posterior distribution,
compute Kalman filtered trajectories. Trajectories are drawn from a single multivariate normal with mean and
covariance computed via either the Kalman filter, smoother, or predictions.
Parameters
----------
idata : DataTree
A DataTree object containing the posterior distribution over model parameters.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
A DataTree object containing sampled trajectories from the conditional posterior.
The trajectories are stored in the posterior_predictive group with names "filtered_posterior",
"predicted_posterior", and "smoothed_posterior".
"""
return self._sample_conditional(
idata=idata,
group="posterior",
random_seed=random_seed,
mvn_method=mvn_method,
**kwargs,
)
def sample_unconditional_prior(
self,
idata: DataTree,
steps: int | None = None,
use_data_time_dim: bool = False,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
) -> DataTree:
"""
Draw unconditional sample trajectories according to state space dynamics, using random samples from the prior
distribution over model parameters. The state space update equations are:
X[t+1] = T @ X[t] + R @ eta[t], eta ~ N(0, Q)
Y[t] = Z @ X[t] + nu[t], nu ~ N(0, H)
Parameters
----------
idata: DataTree
DataTree with prior samples for state space matrices x0, P0, c, d, T, Z, R, H, Q.
Obtained from `pm.sample_prior_predictive` after calling PyMCStateSpace.build_statespace_graph().
steps : Optional[int], default=None
The number of time steps to sample for the unconditional trajectories. If not provided (None),
the function will sample trajectories for the entire available time dimension in the posterior.
Otherwise, it will generate trajectories for the specified number of steps.
use_data_time_dim : bool, default=False
If True, the function uses the time dimension present in the provided `idata` object to sample
unconditional trajectories. If False, a custom time dimension is created based on the number of steps
specified, or if steps is None, it uses the entire available time dimension in the posterior.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
An Arviz InfereceData with two data variables, prior_latent and prior_observed
- prior_latent represents the latent state trajectories `X[t]`, which follows the dynamics:
`x[t+1] = T @ x[t] + R @ eta[t]`, where `eta ~ N(0, Q)`.
- prior_observed represents the observed state trajectories `Y[t]`, which is obtained from
the observation equation: `y[t] = Z @ x[t] + nu[t]`, where `nu ~ N(0, H)`.
"""
return self._sample_unconditional(
idata=idata,
group="prior",
steps=steps,
use_data_time_dim=use_data_time_dim,
random_seed=random_seed,
mvn_method=mvn_method,
**kwargs,
)
def sample_unconditional_posterior(
self,
idata: DataTree,
steps: int | None = None,
use_data_time_dim: bool = False,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
) -> DataTree:
"""
Draw unconditional sample trajectories according to state space dynamics, using random samples from the
posterior distribution over model parameters. The state space update equations are:
X[t+1] = T @ X[t] + R @ eta[t], eta ~ N(0, Q)
Y[t] = Z @ X[t] + nu[t], nu ~ N(0, H)
x[0] ~ N(a0, P0)
Parameters
----------
idata : DataTree
A DataTree object with a posterior group containing samples from the
posterior distribution over model parameters.
steps : Optional[int], default=None
The number of time steps to sample for the unconditional trajectories. If not provided (None),
the function will sample trajectories for the entire available time dimension in the posterior.
Otherwise, it will generate trajectories for the specified number of steps.
use_data_time_dim : bool, default=False
If True, the function uses the time dimension present in the provided `idata` object to sample
unconditional trajectories. If False, a custom time dimension is created based on the number of steps
specified, or if steps is None, it uses the entire available time dimension in the posterior.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
Returns
-------
DataTree
An Arviz InfereceData with two groups, posterior_latent and posterior_observed
- posterior_latent represents the latent state trajectories `X[t]`, which follows the dynamics:
`x[t+1] = T @ x[t] + R @ eta[t]`, where `eta ~ N(0, Q)`.
- posterior_observed represents the observed state trajectories `Y[t]`, which is obtained from
the latent state trajectories: `y[t] = Z @ x[t] + nu[t]`, where `nu ~ N(0, H)`.
"""
return self._sample_unconditional(
idata=idata,
group="posterior",
steps=steps,
use_data_time_dim=use_data_time_dim,
random_seed=random_seed,
mvn_method=mvn_method,
**kwargs,
)
def sample_statespace_matrices(
self, idata, matrix_names: str | list[str] | None, group: str = "posterior", **kwargs
):
"""
Draw samples of requested statespace matrices from provided idata
Parameters
----------
matrix_names: str, list[str], optional
Statespace matrices to be sampled. Valid names are short names: x0, P0, c, d, T, Z, R, H, Q, or
"formal" names: initial_state, initial_state_cov, state_intercept, obs_intercept, transition, design,
selection, obs_cov, state_cov
idata: DataTree
DataTree from which to sample
group: str, one of "posterior" or "prior"
Whether to sample from priors or posteriors
kwargs:
Additional keyword arguments are passed to ``pymc.sample_posterior_predictive``
Returns
-------
idata_matrices: az.InterenceData
"""
_verify_group(group)
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
if matrix_names is None:
matrix_names = MATRIX_NAMES
elif isinstance(matrix_names, str):
matrix_names = [matrix_names]
with pm.Model(coords=self._fit_coords) as forward_model:
self._build_dummy_graph()
self._insert_random_variables()
for name in self.data_names:
pm.Data(**self.data_info[name])
self._insert_data_variables()
matrices = self.unpack_statespace()
for short_name, matrix in zip(MATRIX_NAMES, matrices):
long_name = SHORT_NAME_TO_LONG[short_name]
if (long_name in matrix_names) or (short_name in matrix_names):
name = long_name if long_name in matrix_names else short_name
dims = [x if x in self._fit_coords else None for x in MATRIX_DIMS[short_name]]
pm.Deterministic(name, matrix, dims=dims)
# TODO: Remove this after pm.Flat has its initial_value fixed
forward_model.rvs_to_initial_values = {
rv: None for rv in forward_model.rvs_to_initial_values.keys()
}
frozen_model = freeze_dims_and_data(forward_model)
with frozen_model:
matrix_idata = pm.sample_posterior_predictive(
idata if group == "posterior" else idata["prior"],
var_names=matrix_names,
extend_inferencedata=False,
compile_kwargs=compile_kwargs,
**kwargs,
)
return matrix_idata
def sample_filter_outputs(
self,
idata,
filter_output_names: str | list[str] | None = None,
group: str = "posterior",
**kwargs,
):
if isinstance(filter_output_names, str):
filter_output_names = [filter_output_names]
if filter_output_names is None:
filter_output_names = list(FILTER_OUTPUT_DIMS.keys())
else:
unknown_filter_output_names = np.setdiff1d(
filter_output_names, list(FILTER_OUTPUT_DIMS.keys())
)
if unknown_filter_output_names.size > 0:
raise ValueError(f"{unknown_filter_output_names} not a valid filter output name!")
filter_output_names = [x for x in FILTER_OUTPUT_DIMS.keys() if x in filter_output_names]
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
with pm.Model(coords=self.coords) as m:
self._build_dummy_graph()
self._insert_random_variables()
if self.data_names:
for name in self.data_names:
pm.Data(**self._fit_exog_data[name])
self._insert_data_variables()
x0, P0, c, d, T, Z, R, H, Q = self.unpack_statespace()
data = self._fit_data
obs_coords = m.coords.get(OBS_STATE_DIM, None)
data, nan_mask = register_data_with_pymc(
data,
n_obs=self.ssm.k_endog,
obs_coords=obs_coords,
register_data=True,
)
filter_outputs = self.kalman_filter.build_graph(
data,
x0,
P0,
c,
d,
T,
Z,
R,
H,
Q,
time_varying_names=self.ssm.time_varying_names,
)
smoother_outputs = self.kalman_smoother.build_graph(
T,
R,
Q,
filter_outputs[0],
filter_outputs[3],
time_varying_names=self.ssm.time_varying_names,
)
filter_outputs = filter_outputs[:-1] + list(smoother_outputs)
for output in filter_outputs:
if output.name in filter_output_names:
dims = FILTER_OUTPUT_DIMS[output.name]
pm.Deterministic(output.name, output, dims=dims)
with freeze_dims_and_data(m):
return pm.sample_posterior_predictive(
idata if group == "posterior" else idata["prior"],
var_names=filter_output_names,
compile_kwargs=compile_kwargs,
**kwargs,
)
@staticmethod
def _validate_forecast_args(
time_index: pd.RangeIndex | pd.DatetimeIndex,
start: int | pd.Timestamp,
periods: int | None = None,
end: int | pd.Timestamp = None,
scenario: pd.DataFrame | np.ndarray | None = None,
use_scenario_index: bool = False,
verbose: bool = True,
):
if isinstance(start, pd.Timestamp) and start not in time_index:
raise ValueError("Datetime start must be in the data index used to fit the model.")
elif isinstance(start, int):
if abs(start) > len(time_index):
raise ValueError(
"Integer start must be within the range of the data index used to fit the model."
)
if periods is None and end is None and not use_scenario_index:
raise ValueError(
"Must specify one of either periods or end unless use_scenario_index=True"
)
if periods is not None and end is not None:
raise ValueError("Must specify exactly one of either periods or end")
if scenario is None and use_scenario_index:
raise ValueError("use_scenario_index=True requires a scenario to be provided.")
if scenario is not None and use_scenario_index:
if isinstance(scenario, dict):
first_df = next(
(df for df in scenario.values() if isinstance(df, pd.DataFrame | pd.Series)),
None,
)
if first_df is None:
raise ValueError(
"use_scenario_index=True requires a scenario to be a DataFrame or Series."
)
elif not isinstance(scenario, pd.DataFrame | pd.Series):
raise ValueError(
"use_scenario_index=True requires a scenario to be a DataFrame or Series."
)
if use_scenario_index and any(arg is not None for arg in [start, end, periods]) and verbose:
_log.warning(
"start, end, and periods arguments are ignored when use_scenario_index is True. Pass only "
"one or the other to avoid this warning, or pass verbose = False."
)
def _get_fit_time_index(self) -> pd.RangeIndex | pd.DatetimeIndex:
time_index = self._fit_coords.get(TIME_DIM, None) if self._fit_coords is not None else None
if time_index is None:
raise ValueError(
"No time dimension found on coordinates used to fit the model. Has this model been fit?"
)
if isinstance(time_index[0], pd.Timestamp):
time_index = pd.DatetimeIndex(time_index)
time_index.freq = time_index.inferred_freq
else:
time_index = np.array(time_index)
return time_index
def _validate_scenario_data(
self,
scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None,
name: str | None = None,
verbose=True,
):
"""
Validate the scenario data provided to the forecast method by checking that it has the correct shape and
dimensions.
Parameters
----------
scenario
name
verbose
Returns
-------
scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray]
Scenario data, validated and potentially modified.
"""
if not self._needs_exog_data:
return scenario
var_to_dims = {key: info["dims"][1:] for key, info in self.data_info.items()}
if any(len(dims) > 1 for dims in var_to_dims.values()):
raise NotImplementedError(">2d exogenous data is not yet supported.")
coords = {
var: self._fit_coords[dim[0]]
for var, dim in var_to_dims.items()
if dim[0] in self._fit_coords
}
if self._needs_exog_data and scenario is None:
exog_str = ",".join(self.data_names)
suffix = "s" if len(exog_str) > 1 else ""
raise ValueError(
f"This model was fit using exogenous data. Forecasting cannot be performed without "
f"providing scenario data for the following variable{suffix}: {exog_str}"
)
if isinstance(scenario, dict):
for name, data in scenario.items():
if name not in self.data_names:
raise ValueError(
f"Scenario data provided for variable '{name}', which is not an exogenous variable "
f"used to fit the model."
)
# Recursively call this function to trigger the non-dictionary branch of the checks on each object
# inside the dictionary
scenario[name] = self._validate_scenario_data(data, name)
# The provided dictionary might be a mix of numpy arrays and dataframes if the user is truly horrible.
# For checking shapes, the first object will always be good enough. But we also need to make sure all the
# indices agree, so we grab the first dataframe (which might not exist, but that's OK)
first_scenario = next(iter(scenario.values()))
first_df = next((df for df in scenario.values() if isinstance(df, pd.DataFrame)), None)
if not all(data.shape[0] == first_scenario.shape[0] for data in scenario.values()):
raise ValueError(
"Scenario data must have the same number of time steps for all variables."
)
if first_df is not None and not all(
df.index.equals(first_df.index)
for df in scenario.values()
if isinstance(df, pd.DataFrame)
):
raise ValueError("Scenario data must have the same index for all variables.")
return scenario
elif isinstance(scenario, pd.Series | pd.DataFrame | np.ndarray | list | tuple):
# A user might be lazy and pass a simple list when there is only one exogenous variable.
if isinstance(scenario, list | tuple) or (
isinstance(scenario, np.ndarray) and scenario.ndim == 1
):
scenario = np.array(scenario).reshape(-1, 1)
if name is None:
# name should only be None on the first non-recursive call. We only arrive to this branch in that case
# if a non-dictionary was passed, which in turn should only happen if only a single exogenous data
# needs to be set.
if len(self.data_names) > 1:
raise ValueError(
"Multiple exogenous variables were used to fit the model. Provide a dictionary of "
"scenario data instead."
)
name = self.data_names[0]
# Omit dataframe from this basic shape check so we can give more detailed information about missing columns
# in the next check
if not isinstance(scenario, pd.DataFrame | pd.Series) and scenario.shape[1] != len(
coords[name]
):
raise ValueError(
f"Scenario data for variable '{name}' has the wrong number of columns. Expected "
f"{len(coords[name])}, got {scenario.shape[1]}"
)
if isinstance(scenario, pd.Series):
if len(coords[name]) > 1:
raise ValueError(
f"Scenario data for variable '{name}' has the wrong number of columns. Expected "
f"{len(coords[name])}, got 1"
)
if isinstance(scenario, pd.DataFrame):
expected_cols = coords[name]
cols = scenario.columns
missing_columns = sorted(list(set(expected_cols) - set(cols)))
if len(missing_columns) > 0:
suffix = "s" if len(missing_columns) > 1 else ""
raise ValueError(
f"Scenario data for variable '{name}' is missing the following column{suffix}: "
f"{', '.join(missing_columns)}"
)
extra_columns = sorted(list(set(cols) - set(expected_cols)))
if len(extra_columns) > 0:
suffix = "s" if len(extra_columns) > 1 else ""
verb = "is" if len(extra_columns) == 1 else "are"
raise ValueError(
f"Scenario data for variable '{name}' contains the following extra column{suffix} "
f"that {verb} not used by the model: "
f"{', '.join(extra_columns)}"
)
if not (a == b for a, b in zip(expected_cols, cols)) and verbose:
_log.warning(
f"Scenario data for {name} has a different column order than the data used to fit the "
f"model. Columns will be automatically re-ordered. Ensure consistent ordering to avoid "
f"silent errors."
)
scenario = scenario[expected_cols]
return scenario
@staticmethod
def _build_forecast_index(
time_index: pd.RangeIndex | pd.DatetimeIndex,
start: int | pd.Timestamp | None = None,
end: int | pd.Timestamp = None,
periods: int | None = None,
use_scenario_index: bool = False,
scenario: pd.DataFrame | np.ndarray | None = None,
) -> tuple[int | pd.Timestamp, pd.RangeIndex | pd.DatetimeIndex]:
"""
Construct a pandas Index for the requested forecast horizon.
Parameters
----------
time_index: pd.RangeIndex or pd.DatetimeIndex
Index of the data used to fit the model
start: int or pd.Timestamp, optional
Date from which to begin forecasting. If using a datetime index, integer start will be interpreted
as a positional index. Otherwise, start must be found inside the time_index
end: int or pd.Timestamp, optional
Date at which to end forecasting. If using a datetime index, end must be a timestamp.
periods: int, optional
Number of periods to forecast
scenario: pd.DataFrame, np.ndarray, optional
Scenario data to use for forecasting. If provided, the index of the scenario data will be used as the
forecast index. If provided, start, end, and periods will be ignored.
use_scenario_index: bool, default False
If True, the index of the scenario data will be used as the forecast index.
Returns
-------
start: int | pd.TimeStamp
The starting date index or time step from which to generate the forecasts.
forecast_index: pd.DatetimeIndex or pd.RangeIndex
Index for the forecast results
"""
def get_or_create_index(x, time_index, start=None):
if isinstance(x, pd.DataFrame | pd.Series):
return x.index
elif isinstance(x, dict):
return get_or_create_index(next(iter(x.values())), time_index, start)
elif isinstance(x, np.ndarray | list | tuple):
if start is None:
raise ValueError(
"Provided scenario has no index and no start date was provided. This combination "
"is ambiguous. Please provide a start date, or add an index to the scenario."
)
is_datetime_index = isinstance(time_index, pd.DatetimeIndex)
n = x.shape[0] if isinstance(x, np.ndarray) else len(x)
if isinstance(start, int):
start = time_index[start]
if is_datetime_index:
return pd.date_range(start, periods=n, freq=time_index.freq)
return pd.RangeIndex(start, n + start, step=1, dtype="int")
else:
raise ValueError(f"{type(x)} is not a valid type for scenario data.")
x0_idx = None
if use_scenario_index:
forecast_index = get_or_create_index(scenario, time_index, start)
is_datetime = isinstance(forecast_index, pd.DatetimeIndex)
# If the user provided an index, we want to take it as-is (without removing the start value). Instead,
# step one back and use this as the start value.
delta = forecast_index.freq if is_datetime else 1
x0_idx = forecast_index[0] - delta
else:
# Otherwise, build an index. It will be a DateTime index if we have all the necessary information, otherwise
# use a range index.
is_datetime = isinstance(time_index, pd.DatetimeIndex)
forecast_index = None
if is_datetime:
freq = time_index.freq
if isinstance(start, int):
start = time_index[start]
if isinstance(end, int):
raise ValueError(
"end must be a timestamp if using a datetime index. To specify a number of "
"timesteps from the start date, use the periods argument instead."
)
if end is not None:
forecast_index = pd.date_range(start, end=end, freq=freq)
if periods is not None:
# date_range includes both the start and end date, but we're going to pop off the start later
# (it will be interpreted as x0). So we need to add 1 to the periods so the user gets "periods"
# number of forecasts back
forecast_index = pd.date_range(start, periods=periods + 1, freq=freq)
else:
# If the user provided a positive integer as start, directly interpret it as the start time. If its
# negative, interpret it as a positional index.
if start < 0:
start = time_index[start]
if end is not None:
forecast_index = pd.RangeIndex(start, end, step=1, dtype="int")
if periods is not None:
forecast_index = pd.RangeIndex(start, start + periods + 1, step=1, dtype="int")
if is_datetime:
if forecast_index.freq != time_index.freq:
raise ValueError(
"The frequency of the forecast index must match the frequency on the data used "
f"to fit the model. Got {forecast_index.freq}, expected {time_index.freq}"
)
if x0_idx is None:
x0_idx, forecast_index = forecast_index[0], forecast_index[1:]
if x0_idx in forecast_index:
raise ValueError("x0_idx should not be in the forecast index")
if x0_idx not in time_index:
raise ValueError("start must be in the data index used to fit the model.")
# The starting value should not be included in the forecast index. It will be used only to define x0 and P0,
# and no forecast will be associated with it.
return x0_idx, forecast_index
def _finalize_scenario_initialization(
self,
scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None,
forecast_index: pd.RangeIndex | pd.DatetimeIndex,
name=None,
):
if not self.data_info:
return scenario
var_to_dims = {key: info["dims"][1:] for key, info in self.data_info.items()}
if any(len(dims) > 1 for dims in var_to_dims.values()):
raise NotImplementedError(">2d exogenous data is not yet supported.")
coords = {
var: self._fit_coords[dim[0]]
for var, dim in var_to_dims.items()
if dim[0] in self._fit_coords
}
if scenario is None:
return scenario
if isinstance(scenario, dict):
for name, data in scenario.items():
scenario[name] = self._finalize_scenario_initialization(data, forecast_index, name)
return scenario
# This was already checked as valid
name = self.data_names[0] if name is None else name
# Small tidying up in the case we just have a single scenario that's already a dataframe.
if isinstance(scenario, pd.DataFrame | pd.Series):
if isinstance(scenario, pd.Series):
scenario = scenario.to_frame(name=coords[name][0])
if not scenario.index.equals(forecast_index):
scenario.index = forecast_index
# lists and tuples were handled during validation, along with shape check, so just cast arrays to dataframes
# with the correct index and columns
if isinstance(scenario, np.ndarray):
scenario = pd.DataFrame(scenario, index=forecast_index, columns=coords[name])
return scenario
def _build_forecast_model(
self, time_index, t0, forecast_index, scenario, filter_output, mvn_method
):
filter_time_dim = TIME_DIM
temp_coords = self._fit_coords.copy()
dims = None
if all([dim in temp_coords for dim in [filter_time_dim, ALL_STATE_DIM, OBS_STATE_DIM]]):
dims = [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM]
t0_idx = np.flatnonzero(time_index == t0)[0]
temp_coords["data_time"] = time_index
temp_coords[TIME_DIM] = forecast_index
mu_dims, cov_dims = None, None
if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM, ALL_STATE_AUX_DIM]]):
mu_dims = ["data_time", ALL_STATE_DIM]
cov_dims = ["data_time", ALL_STATE_DIM, ALL_STATE_AUX_DIM]
with pm.Model(coords=temp_coords) as forecast_model:
_, grouped_outputs = self._kalman_filter_outputs_from_dummy_graph(
data_dims=["data_time", OBS_STATE_DIM],
)
group_idx = FILTER_OUTPUT_TYPES.index(filter_output)
mu, cov = grouped_outputs[group_idx]
sub_dict = {
data_var: pt.as_tensor_variable(data_var.get_value(), name="data")
for data_var in forecast_model.data_vars
}
missing_data_vars = np.setdiff1d(
ar1=[*self.data_names, "data"], ar2=[k.name for k, _ in sub_dict.items()]
)
if missing_data_vars.size > 0:
raise ValueError(f"{missing_data_vars} data used for fitting not found!")
mu_frozen, cov_frozen = graph_replace([mu, cov], replace=sub_dict, strict=True)
x0 = pm.Deterministic(
"x0_slice", mu_frozen[t0_idx], dims=mu_dims[1:] if mu_dims is not None else None
)
P0 = pm.Deterministic(
"P0_slice", cov_frozen[t0_idx], dims=cov_dims[1:] if cov_dims is not None else None
)
# Get fresh matrices with n_timesteps placeholder still intact.
# Build for the full timeline (training + forecast) so that time-varying matrices
# continue at the correct phase, then slice to keep only the forecast portion.
n_train = len(time_index)
n_total = n_train + len(forecast_index)
full_matrices = self._insert_constant_timestep(self.unpack_statespace(), n_total)
_, _, *forecast_matrices = full_matrices
# For exogenous-data-driven matrices the time dimension comes from the
# data shared variable, not from the n_timesteps symbolic. Replace the
# shared variables with concatenated training + scenario tensors so the
# [n_train:] slice below yields the correct forecast portion.
# TODO: Is there a way to handle this in a fully symbolic way, without having to
# run the full scan on training data to get the system's state at the start date?
if scenario is not None and self._needs_exog_data:
exog_replace = {}
for name in self.data_names:
if name not in scenario:
continue
forecast_data = scenario[name]
train_val = self._fit_exog_data[name]["value"]
fc_val = (
forecast_data.values
if isinstance(forecast_data, pd.DataFrame)
else np.asarray(forecast_data)
)
combined = np.concatenate([train_val, fc_val], axis=0)
exog_replace[forecast_model[name]] = pt.as_tensor_variable(combined, name=name)
if exog_replace:
forecast_matrices = graph_replace(
forecast_matrices, replace=exog_replace, strict=False
)
forecast_names = MATRIX_NAMES[2:] # c, d, T, Z, R, H, Q
time_varying_names = self.ssm.time_varying_names
forecast_matrices = [
m[n_train:] if SHORT_NAME_TO_LONG[name] in time_varying_names else m
for m, name in zip(forecast_matrices, forecast_names)
]
_ = LinearGaussianStateSpace(
"forecast",
x0,
P0,
*forecast_matrices,
steps=len(forecast_index),
dims=dims,
sequence_names=self.kalman_filter.seq_names,
k_endog=self.k_endog,
append_x0=False,
method=mvn_method,
)
return forecast_model
def forecast(
self,
idata: DataTree,
start: int | pd.Timestamp | None = None,
periods: int | None = None,
end: int | pd.Timestamp = None,
scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None = None,
use_scenario_index: bool = False,
filter_output="smoothed",
random_seed: RandomState | None = None,
verbose: bool = True,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
) -> DataTree:
"""
Generate forecasts of state space model trajectories into the future.
This function combines posterior parameter samples in the provided idata with model dynamics to generate
forecasts for out-of-sample data. The trajectory is initialized using the filter output specified in
the filter_output argument.
Parameters
----------
idata : DataTree
A DataTree object containing the posterior distribution over model parameters.
start : int or pd.Timestamp, optional
The starting date index or time step from which to generate the forecasts. If the data provided to
`PyMCStateSpace.build_statespace_graph` had a datetime index, `start` should be a datetime.
If using integer time series, `start` should be an integer indicating the starting time step. In either
case, `start` should be in the data index used to build the statespace graph.
If start is None, the last value on the data's index will be used.
periods : int, optional
The number of time steps to forecast into the future. If `periods` is specified, the `end`
parameter will be ignored. If `None`, then the `end` parameter must be provided.
end : int or pd.Timestamp, optional
The ending date index or time step up to which to generate the forecasts. If the data provided to
`PyMCStateSpace.build_statespace_graph` had a datetime index, `start` should be a datetime.
If using integer time series, `end` should be an integer indicating the ending time step.
If `end` is provided, the `periods` parameter will be ignored.
scenario: pd.Dataframe or np.ndarray, optional
Exogenous variables to use for scenario-based forecasting. Must be a 2d array-like, with second dimension
equal to the number of exogenous variables. If start, end, or periods are specified, the first dimension
must conform with these settings. Otherwise, the index of the scenario data will be used to set the
number of forecast steps. If the index of the forecast scenairo is a pandas DateTimeIndex, its frequency
must match the frequency of the data used to fit the model. Otherwise, dates will be based on the number
of forecast steps and the data.
use_scenario_index: bool, default False
If True, the index of the scenario data will be used to determine the forecast period. In this case,
the start, end, and periods arguments will be ignored. If True, the scenario data must be a DataFrame,
otherwise an error will be raised.
filter_output : str, default="smoothed"
The type of Kalman Filter output used to initialize the forecasts. The 0th timestep of the forecast will
be sampled from x[0] ~ N(filter_output_mean[start], filter_output_covariance[start]). Default is "smoothed",
which uses past and future data to make the best possible hidden state estimate.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
verbose: bool, default=True
Whether to print diagnostic information about forecasting.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
A DataTree object containing forecast samples for the latent and observed state
trajectories of the state space model, named "forecast_latent" and "forecast_observed".
- forecast_latent represents the latent state trajectories `X[t]`, which follows the dynamics:
`x[t+1] = T @ x[t] + R @ eta[t]`, where `eta ~ N(0, Q)`.
- forecast_observed represents the observed state trajectories `Y[t]`, which is obtained from
the latent state trajectories: `y[t] = Z @ x[t] + nu[t]`, where `nu ~ N(0, H)`.
"""
_validate_filter_arg(filter_output)
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
time_index = self._get_fit_time_index()
if start is None and verbose:
_log.warning(
"No start date provided. Using the last date in the data index. To silence this warning, "
"explicitly pass a start date or set verbose = False"
)
start = time_index[-1]
if self._needs_exog_data and not isinstance(scenario, dict):
if len(self.data_names) > 1:
raise ValueError(
"Model needs more than one exogenous data to do forecasting. In this case, you must "
"pass a dictionary of scenario data."
)
[data_name] = self.data_names
scenario = {data_name: scenario}
scenario: dict = self._validate_scenario_data(scenario, verbose=verbose)
self._validate_forecast_args(
time_index=time_index,
start=start,
end=end,
periods=periods,
scenario=scenario,
use_scenario_index=use_scenario_index,
verbose=verbose,
)
t0, forecast_index = self._build_forecast_index(
time_index=time_index,
start=start,
end=end,
periods=periods,
scenario=scenario,
use_scenario_index=use_scenario_index,
)
scenario = self._finalize_scenario_initialization(scenario, forecast_index)
forecast_model = self._build_forecast_model(
time_index=time_index,
t0=t0,
forecast_index=forecast_index,
scenario=scenario,
filter_output=filter_output,
mvn_method=mvn_method,
)
with forecast_model:
if scenario is not None:
dummy_obs_data = np.zeros((len(forecast_index), self.k_endog))
pm.set_data(
scenario | {"data": dummy_obs_data},
coords={"data_time": np.arange(len(forecast_index))},
)
forecast_model.rvs_to_initial_values = {
k: None for k in forecast_model.rvs_to_initial_values.keys()
}
frozen_model = freeze_dims_and_data(forecast_model)
with frozen_model:
idata_forecast = pm.sample_posterior_predictive(
idata,
var_names=["forecast_latent", "forecast_observed"],
random_seed=random_seed,
compile_kwargs=compile_kwargs,
**kwargs,
)
return idata_forecast.posterior_predictive
def impulse_response_function(
self,
idata,
n_steps: int = 40,
use_posterior_cov: bool = True,
shock_size: float | np.ndarray | None = None,
shock_cov: np.ndarray | None = None,
shock_trajectory: np.ndarray | None = None,
orthogonalize_shocks: bool = False,
random_seed: RandomState | None = None,
mvn_method: Literal["cholesky", "eigh", "svd"] = "svd",
**kwargs,
):
"""
Generate impulse response functions (IRF) from state space model dynamics.
An impulse response function represents the dynamic response of the state space model
to an instantaneous shock applied to the system. This function calculates the IRF
based on either provided shock specifications or the posterior state covariance matrix.
Parameters
----------
idata : DataTree
A DataTree object containing the posterior distribution over model parameters.
n_steps: int
The number of time steps to calculate the impulse response. Default is 40.
If `shock_trajectory` is provided, the length of the shock trajectory will override this value.
use_posterior_cov: bool, default=True
Whether to use the covariance matrix of the posterior distribution to generate the impulse response.
Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified.
shock_size : Optional[Union[float, np.ndarray]], default=None
The size of the shock applied to the system. If specified, it will create a covariance
matrix for the shock with diagonal elements equal to `shock_size`. If float, the diagonal will be filled
with `shock_size`. If an array, `shock_size` must match the number of shocks in the state space model.
Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified.
shock_cov : Optional[np.ndarray], default=None
A user-specified covariance matrix for the shocks. It should be a 2D numpy array with
dimensions (n_shocks, n_shocks), where n_shocks is the number of shocks in the state space model.
Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified.
shock_trajectory : Optional[np.ndarray], default=None
A pre-defined trajectory of shocks applied to the system. It should be a 2D numpy array
with dimensions (n, n_shocks), where n is the number of time steps and k_posdef is the
number of shocks in the state space model.
Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified.
orthogonalize_shocks : bool, default=False
If True, orthogonalize the shocks using Cholesky decomposition when generating the impulse
response. This option is ignored if `shock_trajectory` or `shock_size` are used.
random_seed : int, RandomState or Generator, optional
Seed for the random number generator.
mvn_method: str, default "svd"
Method used to invert the covariance matrix when calculating the pdf of a multivariate normal
(or when generating samples). One of "cholesky", "eigh", or "svd". "cholesky" is fastest, but least robust
to ill-conditioned matrices, while "svd" is slow but extremely robust.
In general, if your model has measurement error, "cholesky" will be safe to use. Otherwise, "svd" is
recommended. "eigh" can also be tried if sampling with "svd" is very slow, but it is not as robust as "svd".
kwargs:
Additional keyword arguments are passed to pymc.sample_posterior_predictive
Returns
-------
DataTree
A DataTree object containing impulse response function in a variable named "irf".
Notes
-----
For models with time-varying transition matrices, the IRF is computed starting at phase 0 of the
time-varying cycle. This means the response represents the effect of a shock occurring at the first
modeled state, T(0).
"""
options = [shock_size, shock_cov, shock_trajectory]
n_options = sum(x is not None for x in options)
Q = None # No covariance matrix needed if a trajectory is provided. Will be overwritten later if needed.
compile_kwargs = kwargs.pop("compile_kwargs", {})
compile_kwargs.setdefault("mode", self.mode)
if n_options > 1:
raise ValueError("Specify exactly 0 or 1 of shock_size, shock_cov, or shock_trajectory")
elif n_options == 1:
# If the user passed an alternative parameterization for the shocks of the IRF, don't use the posterior
use_posterior_cov = False
if shock_trajectory is not None:
# Validate the shock trajectory
n, k = shock_trajectory.shape
steps = n
if k != self.k_posdef:
raise ValueError(
"If shock_trajectory is provided, there must be a trajectory provided for each shock. "
f"Model has {self.k_posdef} shocks, but shock_trajectory has only {k} columns"
)
if steps is not None and steps != n:
_log.warning(
"Both steps and shock_trajectory were provided but do not agree. Length of "
"shock_trajectory will take priority, and steps will be ignored."
)
n_steps = n # Overwrite steps with the length of the shock trajectory
shock_trajectory = pt.as_tensor_variable(shock_trajectory)
simulation_coords = self._fit_coords.copy()
simulation_coords[TIME_DIM] = np.arange(n_steps, dtype="int")
with pm.Model(coords=simulation_coords):
self._build_dummy_graph()
self._insert_random_variables()
matrices = self._insert_constant_timestep(self.unpack_statespace(), step=n_steps)
P0, _, c, d, T, Z, R, H, post_Q = matrices
x0 = pm.Deterministic("x0_new", pt.zeros(self.k_states), dims=[ALL_STATE_DIM])
if use_posterior_cov:
Q = post_Q
if orthogonalize_shocks:
Q = pt.linalg.cholesky(Q) / pt.diag(Q)
elif shock_cov is not None:
Q = pt.as_tensor_variable(shock_cov)
if orthogonalize_shocks:
Q = pt.linalg.cholesky(Q) / pt.diag(Q)
if shock_trajectory is None:
shock_trajectory = pt.zeros((n_steps, self.k_posdef))
if Q is not None:
init_shock = pm.MvNormal(
"initial_shock", mu=0, cov=Q, dims=[SHOCK_DIM], method=mvn_method
)
else:
init_shock = pm.Deterministic(
"initial_shock",
pt.as_tensor_variable(np.atleast_1d(shock_size)),
dims=[SHOCK_DIM],
)
shock_trajectory = pt.set_subtensor(shock_trajectory[0], init_shock)
else:
shock_trajectory = pt.as_tensor_variable(shock_trajectory)
time_varying_T = "transition" in self.ssm.time_varying_names
def irf_step(*args):
if time_varying_T:
shock, T, x, c, R = args
else:
shock, x, c, T, R = args
next_x = c + T @ x + R @ shock
return next_x
sequences = [shock_trajectory, T] if time_varying_T else [shock_trajectory]
non_sequences = [c, R] if time_varying_T else [c, T, R]
irf = pytensor.scan(
irf_step,
sequences=sequences,
outputs_info=[x0],
non_sequences=non_sequences,
n_steps=n_steps,
strict=True,
return_updates=False,
)
pm.Deterministic("irf", irf, dims=[TIME_DIM, ALL_STATE_DIM])
irf_idata = pm.sample_posterior_predictive(
idata,
var_names=["irf"],
random_seed=random_seed,
compile_kwargs=compile_kwargs,
**kwargs,
)
return irf_idata["posterior_predictive"]
def _sort_obs_inputs_by_time_varying(self, d, Z):
seqs = []
non_seqs = []
for matrix, name in zip([d, Z], ["d", "Z"]):
if name in self.kalman_filter.seq_names:
seqs.append(matrix)
else:
non_seqs.append(matrix)
return seqs, non_seqs
@staticmethod
def _sort_obs_scan_args(args):
args = list(args)
# If a matrix is time-varying, pytensor will put a [t] on the name
arg_names = [x.name.replace("[t]", "") for x in args]
ordered_args = []
for name in ["d", "Z"]:
idx = arg_names.index(name)
ordered_args.append(args[idx])
return ordered_args