Source code for pymc_extras.statespace.models.structural.components.autoregressive

import warnings

import numpy as np
import pytensor.tensor as pt

from pymc_extras.statespace.core.properties import (
    Coord,
    Parameter,
    Shock,
    State,
)
from pymc_extras.statespace.models.structural.core import Component
from pymc_extras.statespace.models.structural.utils import order_to_mask


[docs] class Autoregressive(Component): r""" Autoregressive timeseries component Parameters ---------- order: int or sequence of int If int, the number of lags to include in the model. If a sequence, an array-like of zeros and ones indicating which lags to include in the model. name: str, default "auto_regressive" A name for this autoregressive component. Used to label dimensions and coordinates. observed_state_names: list[str] | None, default None List of strings for observed state labels. If None, defaults to ["data"]. share_states: bool, default False Whether latent states are shared across the observed states. If True, there will be only one set of latent states, which are observed by all observed states. If False, each observed state has its own set of latent states. This argument has no effect if `k_endog` is 1. Notes ----- An autoregressive component can be thought of as a way o introducing serially correlated errors into the model. The process is modeled: .. math:: x_t = \sum_{i=1}^p \rho_i x_{t-i} Where ``p``, the number of autoregressive terms to model, is the order of the process. By default, all lags up to ``p`` are included in the model. To disable lags, pass a list of zeros and ones to the ``order`` argumnet. For example, ``order=[1, 1, 0, 1]`` would become: .. math:: x_t = \rho_1 x_{t-1} + \rho_2 x_{t-1} + \rho_4 x_{t-1} The coefficient :math:`\rho_3` has been constrained to zero. .. warning:: This class is meant to be used as a component in a structural time series model. For modeling of stationary processes with ARIMA, use ``statespace.BayesianSARIMAX``. Examples -------- Model a timeseries as an AR(2) process with non-zero mean: .. code:: python from pymc_extras.statespace import structural as st import pymc as pm import pytensor.tensor as pt trend = st.LevelTrend(order=1, innovations_order=0) ar = st.Autoregressive(2) ss_mod = (trend + ar).build() with pm.Model(coords=ss_mod.coords) as model: P0 = pm.Deterministic('P0', pt.eye(ss_mod.k_states) * 10, dims=ss_mod.param_dims['P0']) intitial_trend = pm.Normal('initial_trend', sigma=10, dims=ss_mod.param_dims['initial_trend']) ar_params = pm.Normal('ar_params', dims=ss_mod.param_dims['ar_params']) sigma_ar = pm.Exponential('sigma_ar', 1, dims=ss_mod.param_dims['sigma_ar']) ss_mod.build_statespace_graph(data) idata = pm.sample(nuts_sampler='numpyro') """
[docs] def __init__( self, order: int = 1, name: str = "auto_regressive", observed_state_names: list[str] | None = None, share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] k_endog = len(observed_state_names) k_endog_effective = k_posdef = 1 if share_states else k_endog order = order_to_mask(order) ar_lags = np.flatnonzero(order).ravel().astype(int) + 1 k_states = len(order) self.share_states = share_states self.order = order self.ar_lags = ar_lags state_names = [f"L{i + 1}_{name}" for i in range(k_states)] super().__init__( name=name, k_endog=k_endog, k_states=k_states * k_endog_effective, k_posdef=k_posdef, measurement_error=True, combine_hidden_states=True, base_observed_state_names=observed_state_names, base_state_names=state_names, obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog_effective), share_states=share_states, )
def set_states(self) -> State | tuple[State, ...] | None: base_names = self.base_state_names observed_state_names = self.base_observed_state_names if self.share_states: state_names = [f"{name}[shared]" for name in base_names] else: state_names = [ f"{name}[{state_name}]" for state_name in observed_state_names for name in base_names ] hidden_states = [State(name=name, observed=False, shared=True) for name in state_names] observed_states = [ State(name=name, observed=True, shared=False) for name in observed_state_names ] return *hidden_states, *observed_states def set_parameters(self) -> Parameter | tuple[Parameter, ...] | None: k_endog = self.k_endog k_endog_effective = 1 if self.share_states else k_endog k_states = self.k_states // k_endog_effective # this is also the number of AR lags ar_param = Parameter( name=f"params_{self.name}", shape=(k_endog_effective, k_states) if k_endog_effective > 1 else (k_states,), dims=(f"lag_{self.name}",) if k_endog_effective == 1 else ( f"endog_{self.name}", f"lag_{self.name}", ), constraints=None, ) sigma_param = Parameter( name=f"sigma_{self.name}", shape=(k_endog_effective,) if k_endog_effective > 1 else (), dims=(f"endog_{self.name}",) if k_endog_effective > 1 else None, constraints="Positive", ) return ar_param, sigma_param def set_shocks(self) -> Shock | tuple[Shock, ...] | None: observed_state_names = self.observed_state_names if self.share_states: shock_names = [f"{self.name}[shared]"] else: shock_names = [f"{self.name}[{obs_name}]" for obs_name in observed_state_names] return tuple(Shock(name=name) for name in shock_names) def set_coords(self) -> Coord | tuple[Coord, ...] | None: k_endog = self.k_endog k_endog_effective = 1 if self.share_states else k_endog observed_state_names = self.observed_state_names lag_coord = Coord(dimension=f"lag_{self.name}", labels=self.ar_lags.tolist()) coord_container = [lag_coord] if k_endog_effective > 1: endog_coord = Coord(dimension=f"endog_{self.name}", labels=observed_state_names) coord_container.append(endog_coord) return tuple(coord_container) def make_symbolic_graph(self) -> None: k_endog = self.k_endog k_endog_effective = 1 if self.share_states else k_endog k_states = self.k_states // k_endog_effective k_posdef = self.k_posdef k_nonzero = int(sum(self.order)) ar_params = self.make_and_register_variable( f"params_{self.name}", shape=(k_nonzero,) if k_endog_effective == 1 else (k_endog_effective, k_nonzero), ) sigma_ar = self.make_and_register_variable( f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,) ) if k_endog_effective == 1: T = pt.eye(k_states, k=-1) ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0]) T = T[ar_idx].set(ar_params) else: transition_matrices = [] for i in range(k_endog_effective): T = pt.eye(k_states, k=-1) ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0]) T = T[ar_idx].set(ar_params[i]) transition_matrices.append(T) T = pt.linalg.block_diag(*transition_matrices) self.ssm["transition", :, :] = T R = np.eye(k_states) R_mask = np.full((k_states,), False) R_mask[0] = True R = R[:, R_mask] self.ssm["selection", :, :] = pt.linalg.block_diag(*[R for _ in range(k_endog_effective)]) Zs = [pt.zeros((1, k_states))[0, 0].set(1.0) for _ in range(k_endog)] if self.share_states: Z = pt.join(0, *Zs) else: Z = pt.linalg.block_diag(*Zs) self.ssm["design", :, :] = Z cov_idx = ("state_cov", *np.diag_indices(k_posdef)) self.ssm[cov_idx] = sigma_ar**2
def __getattr__(name: str): if name == "AutoregressiveComponent": warnings.warn( "AutoregressiveComponent is deprecated and will be removed in a future release. " "Use Autoregressive instead.", FutureWarning, stacklevel=2, ) return Autoregressive raise AttributeError(f"module {__name__!r} has no attribute {name!r}")