import numpy as np
import pytensor
import pytensor.tensor as pt
from pytensor.compile.mode import Mode
from pytensor.tensor.linalg import solve_discrete_lyapunov
from pymc_extras.statespace.core.properties import (
Coord,
Data,
Parameter,
Shock,
State,
)
from pymc_extras.statespace.core.statespace import PyMCStateSpace
from pymc_extras.statespace.models.utilities import validate_names
from pymc_extras.statespace.utils.constants import (
ALL_STATE_AUX_DIM,
ALL_STATE_DIM,
AR_PARAM_DIM,
EXOG_STATE_DIM,
MA_PARAM_DIM,
OBS_STATE_AUX_DIM,
OBS_STATE_DIM,
SHOCK_AUX_DIM,
SHOCK_DIM,
TIME_DIM,
)
floatX = pytensor.config.floatX
[docs]
class BayesianVARMAX(PyMCStateSpace):
r"""
Vector AutoRegressive Moving Average with eXogenous Regressors
The VARMA model is a multivariate extension of the SARIMAX model. Given a set of timeseries :math:`\{x_t\}_{t=0}^T`,
with :math:`x_t = \begin{bmatrix} x_{1,t} & x_{2,t} & \cdots & x_{k,t} \end{bmatrix}^T`, a VARMA models each series
as a function of the histories of all series. Specifically, denoting the AR-MA order as (p, q), a VARMA can be
written:
.. math::
x_t = A_1 x_{t-1} + A_2 x_{t-2} + \cdots + A_p x_{t-p} + B_1 \varepsilon_{t-1} + \cdots
+ B_q \varepsilon_{t-q} + \varepsilon_t
Where :math:`\varepsilon_t = \begin{bmatrix} \varepsilon_{1,t} & \varepsilon_{2,t} & \cdots &
\varepsilon_{k,t}\end{bmatrix}^T \sim N(0, \Sigma)` is a vector of i.i.d stochastic innovations or shocks that drive
intertemporal variation in the data. Matrices :math:`A_i, B_i` are :math:`k \times k` coefficient matrices:
.. math::
A_i = \begin{bmatrix} \rho_{1,i,1} & \rho_{1,i,2} & \cdots & \rho_{1,i,k} \\
\rho_{2,i,1} & \rho_{2,i,2} & \cdots & \rho_{2,i,k} \\
\vdots & \vdots & \cdots & \vdots \\
\rho{k,i,1} & \rho_{k,i,2} & \cdots & rho_{k,i,k} \end{bmatrix}
Internally, this representation is not used. Instead, the vectors :math:`x_t, x_{t-1}, \cdots, x_{t-p},
\varepsilon_{t-1}, \cdots, \varepsilon_{t-q}` are concatenated into a single column vector of length ``k * (p+q)``,
while the coefficients matrices are likewise concatenated into a single coefficient matrix, :math:`T`.
As the dimensionality of the VARMA system increases -- either because there are a large number of timeseries
included in the analysis, or because the order is large -- the probability of sampling a stationary matrix :math:`T`
goes to zero. This has two implications for applied work. First, a non-stationary system will exhibit explosive
behavior, potentially rending impulse response functions and long-term forecasts useless. Secondly, it is not
possible to do stationary initialization. Stationary initialization significantly speeds up sampling, and should be
preferred when possible.
Examples
--------
The following code snippet estimates a VARMA(1, 1):
.. code:: python
import pymc_extras.statespace as pmss
import pymc as pm
# Create VAR Statespace Model
bvar_mod = pmss.BayesianVARMAX(endog_names=data.columns, order=(2, 0),
stationary_initialization=False, measurement_error=False,
filter_type="standard", verbose=True)
# Unpack dims and coords
x0_dims, P0_dims, state_cov_dims, ar_dims = bvar_mod.param_dims.values()
coords = bvar_mod.coords
# Estimate PyMC model
with pm.Model(coords=coords) as var_mod:
x0 = pm.Normal("x0", dims=x0_dims)
P0_diag = pm.Gamma("P0_diag", alpha=2, beta=1, size=data.shape[1] * 2, dims=P0_dims[0])
P0 = pm.Deterministic("P0", pt.diag(P0_diag), dims=P0_dims)
state_chol, _, _ = pm.LKJCholeskyCov(
"state_chol", eta=1, n=bvar_mod.k_posdef, sd_dist=pm.Exponential.dist(lam=1)
)
ar_params = pm.Normal("ar_params", mu=0, sigma=1, dims=ar_dims)
state_cov = pm.Deterministic("state_cov", state_chol @ state_chol.T, dims=state_cov_dims)
bvar_mod.build_statespace_graph(data)
idata = pm.sample(nuts_sampler="numpyro")
"""
[docs]
def __init__(
self,
order: tuple[int, int],
endog_names: list[str] | None = None,
exog_state_names: list[str] | dict[str, list[str]] | None = None,
stationary_initialization: bool = False,
filter_type: str = "standard",
measurement_error: bool = False,
verbose: bool = True,
mode: str | Mode | None = None,
):
"""
Create a Bayesian VARMAX model.
Parameters
----------
order: tuple of (int, int)
Number of autoregressive (AR) and moving average (MA) terms to include in the model. All terms up to the
specified order are included. For restricted models, set zeros directly on the priors.
endog_names: list of str, optional
Names of the endogenous variables being modeled. Used to generate names for the state and shock coords.
exog_state_names : list[str] or dict[str, list[str]], optional
Names of the exogenous state variables. If a list, all endogenous variables will share the same exogenous
variables. If a dict, keys should be the names of the endogenous variables, and values should be lists of the
exogenous variable names for that endogenous variable. Endogenous variables not included in the dict will
be assumed to have no exogenous variables. If None, no exogenous variables will be included.
stationary_initialization: bool, default False
If true, the initial state and initial state covariance will not be assigned priors. Instead, their steady
state values will be used. If False, the user is responsible for setting priors on the initial state and
initial covariance.
..warning :: This option is very sensitive to the priors placed on the AR and MA parameters. If the model dynamics
for a given sample are not stationary, sampling will fail with a "covariance is not positive semi-definite"
error.
filter_type: str, default "standard"
The type of Kalman Filter to use. Options are "standard", "single", "univariate", "steady_state",
and "cholesky". See the docs for kalman filters for more details.
measurement_error: bool, default True
If true, a measurement error term is added to the model.
verbose: bool, default True
If true, a message will be logged to the terminal explaining the variable names, dimensions, and supports.
mode: str or Mode, optional
Pytensor compile mode, used in auxiliary sampling methods such as ``sample_conditional_posterior`` and
``forecast``. The mode does **not** effect calls to ``pm.sample``.
Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument
to all sampling methods.
"""
validate_names(endog_names, var_name="endog_names", optional=False)
k_endog = len(endog_names)
needs_exog_data = False
if exog_state_names is not None and not isinstance(exog_state_names, list | dict):
raise ValueError("If not None, exog_state_names must be either a list or a dict")
if exog_state_names is not None:
if isinstance(exog_state_names, list):
k_exog = len(exog_state_names)
elif isinstance(exog_state_names, dict):
k_exog = {name: len(names) for name, names in exog_state_names.items()}
needs_exog_data = True
else:
k_exog = None
# If exog_state_names is a dict but 1) all endog variables are among the keys, and 2) all values are the same
# then we can drop back to the list case.
if (
isinstance(exog_state_names, dict)
and set(exog_state_names.keys()) == set(endog_names)
and len({frozenset(val) for val in exog_state_names.values()}) == 1
):
exog_state_names = exog_state_names[endog_names[0]]
k_exog = len(exog_state_names)
self.endog_names = list(endog_names)
self.exog_state_names = exog_state_names
self.k_exog = k_exog
self.p, self.q = order
self.stationary_initialization = stationary_initialization
k_order = max(self.p, 1) + self.q
k_states = int(k_endog * k_order)
k_posdef = k_endog
super().__init__(
k_endog,
k_states,
k_posdef,
filter_type,
verbose=verbose,
measurement_error=measurement_error,
mode=mode,
)
self._needs_exog_data = needs_exog_data
# Save counts of the number of parameters in each category
self.param_counts = {
"x0": k_states * (1 - self.stationary_initialization),
"P0": k_states**2 * (1 - self.stationary_initialization),
"AR": k_endog**2 * self.p,
"MA": k_endog**2 * self.q,
"state_cov": k_posdef**2,
"sigma_obs": k_endog * self.measurement_error,
}
def set_parameters(self) -> Parameter | tuple[Parameter, ...] | None:
k_endog = self.k_endog
k_states = self.k_states
k_posdef = self.k_posdef
parameters = []
if not self.stationary_initialization:
parameters.append(
Parameter(
name="x0",
shape=(k_states,),
dims=(ALL_STATE_DIM,),
constraints=None,
)
)
parameters.append(
Parameter(
name="P0",
shape=(k_states, k_states),
dims=(ALL_STATE_DIM, ALL_STATE_AUX_DIM),
constraints="Positive Semi-definite",
)
)
if self.p > 0:
parameters.append(
Parameter(
name="ar_params",
shape=(k_endog, self.p, k_endog),
dims=(OBS_STATE_DIM, AR_PARAM_DIM, OBS_STATE_AUX_DIM),
constraints=None,
)
)
if self.q > 0:
parameters.append(
Parameter(
name="ma_params",
shape=(k_endog, self.q, k_endog),
dims=(OBS_STATE_DIM, MA_PARAM_DIM, OBS_STATE_AUX_DIM),
constraints=None,
)
)
parameters.append(
Parameter(
name="state_cov",
shape=(k_posdef, k_posdef),
dims=(SHOCK_DIM, SHOCK_AUX_DIM),
constraints="Positive Semi-definite",
)
)
if self.measurement_error:
parameters.append(
Parameter(
name="sigma_obs",
shape=(k_endog,),
dims=(OBS_STATE_DIM,),
constraints="Positive",
)
)
# Handle exogenous parameters
if isinstance(self.exog_state_names, list):
k_exog = len(self.exog_state_names)
parameters.append(
Parameter(
name="beta_exog",
shape=(k_endog, k_exog),
dims=(OBS_STATE_DIM, EXOG_STATE_DIM),
constraints=None,
)
)
elif isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
k_exog = len(exog_names)
parameters.append(
Parameter(
name=f"beta_{name}",
shape=(k_exog,),
dims=(f"{EXOG_STATE_DIM}_{name}",),
constraints=None,
)
)
return tuple(parameters)
def set_states(self) -> State | tuple[State, ...] | None:
state_names = self.endog_names.copy()
state_names += [
f"L{i + 1}_{state}" for i in range(self.p - 1) for state in self.endog_names
]
state_names += [
f"L{i + 1}_{state}_innov" for i in range(self.q) for state in self.endog_names
]
hidden_states = [State(name=name, observed=False) for name in state_names]
# The first k_endog states are observed
observed_states = [State(name=name, observed=True) for name in self.endog_names]
return *hidden_states, *observed_states
def set_shocks(self) -> Shock | tuple[Shock, ...] | None:
return tuple(Shock(name=name) for name in self.endog_names)
def set_data_info(self) -> tuple[Data, ...] | None:
data = []
if isinstance(self.exog_state_names, list):
k_exog = len(self.exog_state_names)
data.append(
Data(
name="exogenous_data",
shape=(None, k_exog),
dims=(TIME_DIM, EXOG_STATE_DIM),
is_exogenous=True,
)
)
elif isinstance(self.exog_state_names, dict):
for endog_state, exog_names in self.exog_state_names.items():
k_exog = len(exog_names)
data.append(
Data(
name=f"{endog_state}_exogenous_data",
shape=(None, k_exog),
dims=(TIME_DIM, f"{EXOG_STATE_DIM}_{endog_state}"),
is_exogenous=True,
)
)
return tuple(data)
def set_coords(self) -> Coord | tuple[Coord, ...] | None:
coords = list(self.default_coords())
# AR/MA param coords
if self.p > 0:
coords.append(Coord(dimension=AR_PARAM_DIM, labels=tuple(range(1, self.p + 1))))
if self.q > 0:
coords.append(Coord(dimension=MA_PARAM_DIM, labels=tuple(range(1, self.q + 1))))
# Exogenous coords
if isinstance(self.exog_state_names, list):
coords.append(Coord(dimension=EXOG_STATE_DIM, labels=tuple(self.exog_state_names)))
elif isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
coords.append(Coord(dimension=f"{EXOG_STATE_DIM}_{name}", labels=tuple(exog_names)))
return tuple(coords)
@property
def default_priors(self):
raise NotImplementedError
def add_default_priors(self):
raise NotImplementedError
def make_symbolic_graph(self) -> None:
# Initialize the matrices
if not self.stationary_initialization:
# initial states
x0 = self.make_and_register_variable("x0", shape=(self.k_states,), dtype=floatX)
self.ssm["initial_state", :] = x0
# initial covariance
P0 = self.make_and_register_variable(
"P0", shape=(self.k_states, self.k_states), dtype=floatX
)
self.ssm["initial_state_cov", :, :] = P0
# Design matrix is a truncated identity (first k_obs states observed)
self.ssm[("design", *np.diag_indices(self.k_endog))] = 1
# Transition matrix has 4 blocks:
# Upper left: AR coefs (k_obs, k_obs * min(p, 1))
# Upper right: MA coefs (k_obs, k_obs * q)
# Lower left: Truncated identity (k_obs * min(p, 1), k_obs * min(p, 1))
# Lower right: Shifted identity (k_obs * p, k_obs * q)
self.ssm["transition"] = np.zeros((self.k_states, self.k_states))
if self.p > 1:
idx = (
slice(self.k_endog, self.k_endog * self.p),
slice(0, self.k_endog * (self.p - 1)),
)
self.ssm[("transition", *idx)] = np.eye(self.k_endog * (self.p - 1))
if self.q > 1:
idx = (
slice(-self.k_endog * (self.q - 1), None),
slice(-self.k_endog * self.q, -self.k_endog),
)
self.ssm[("transition", *idx)] = np.eye(self.k_endog * (self.q - 1))
if self.p > 0:
ar_param_idx = ("transition", slice(0, self.k_endog), slice(0, self.k_endog * self.p))
# Register the AR parameter matrix as a (k, p, k), then reshape it and allocate it in the transition matrix
# This way the user can use 3 dimensions in the prior (clearer?)
ar_params = self.make_and_register_variable(
"ar_params", shape=(self.k_endog, self.p, self.k_endog), dtype=floatX
)
ar_params = ar_params.reshape((self.k_endog, self.k_endog * self.p))
self.ssm[ar_param_idx] = ar_params
# The selection matrix is (k_states, k_obs), with two (k_obs, k_obs) identity
# matrix blocks inside. One is always on top, the other starts after (k_obs * p) rows
self.ssm["selection"] = np.zeros((self.k_states, self.k_endog))
self.ssm["selection", slice(0, self.k_endog), :] = np.eye(self.k_endog)
if self.q > 0:
ma_param_idx = (
"transition",
slice(0, self.k_endog),
slice(self.k_endog * max(1, self.p), None),
)
# Same as above, register with 3 dimensions then reshape
ma_params = self.make_and_register_variable(
"ma_params", shape=(self.k_endog, self.q, self.k_endog), dtype=floatX
)
ma_params = ma_params.reshape((self.k_endog, self.k_endog * self.q))
self.ssm[ma_param_idx] = ma_params
end = -self.k_endog * (self.q - 1) if self.q > 1 else None
self.ssm["selection", slice(self.k_endog * -self.q, end), :] = np.eye(self.k_endog)
if self.measurement_error:
obs_cov_idx = ("obs_cov", *np.diag_indices(self.k_endog))
sigma_obs = self.make_and_register_variable(
"sigma_obs", shape=(self.k_endog,), dtype=floatX
)
self.ssm[obs_cov_idx] = sigma_obs
state_cov = self.make_and_register_variable(
"state_cov", shape=(self.k_posdef, self.k_posdef), dtype=floatX
)
self.ssm["state_cov", :, :] = state_cov
if self.exog_state_names is not None:
if isinstance(self.exog_state_names, list):
beta_exog = self.make_and_register_variable(
"beta_exog", shape=(self.k_posdef, self.k_exog), dtype=floatX
)
exog_data = self.make_and_register_data(
"exogenous_data", shape=(None, self.k_exog), dtype=floatX
)
obs_intercept = exog_data @ beta_exog.T
elif isinstance(self.exog_state_names, dict):
obs_components = []
for i, name in enumerate(self.endog_names):
if name in self.exog_state_names:
k_exog = len(self.exog_state_names[name])
beta_exog = self.make_and_register_variable(
f"beta_{name}", shape=(k_exog,), dtype=floatX
)
exog_data = self.make_and_register_data(
f"{name}_exogenous_data", shape=(None, k_exog), dtype=floatX
)
obs_components.append(pt.expand_dims(exog_data @ beta_exog, axis=-1))
else:
obs_components.append(pt.zeros((1, 1), dtype=floatX))
# TODO: Replace all of this with pt.concat_with_broadcast once PyMC works with pytensor >= 2.32
# If there were any zeros, they need to be broadcast against the non-zeros.
# Core shape is the last dim, the time dim is always broadcast
non_concat_shape = [1, None]
# Look for the first non-zero component to get the shape from
for tensor_inp in obs_components:
for i, (bcast, sh) in enumerate(
zip(tensor_inp.type.broadcastable, tensor_inp.shape)
):
if bcast or i == 1:
continue
non_concat_shape[i] = sh
assert non_concat_shape.count(None) == 1
bcast_tensor_inputs = []
for tensor_inp in obs_components:
non_concat_shape[1] = tensor_inp.shape[1]
bcast_tensor_inputs.append(pt.broadcast_to(tensor_inp, non_concat_shape))
obs_intercept = pt.join(1, *bcast_tensor_inputs)
else:
raise NotImplementedError()
self.ssm["obs_intercept"] = obs_intercept
self.ssm.declare_time_varying("obs_intercept")
if self.stationary_initialization:
# Solve for matrix quadratic for P0
T = self.ssm["transition"]
R = self.ssm["selection"]
Q = self.ssm["state_cov"]
c = self.ssm["state_intercept"]
x0 = pt.linalg.solve(pt.eye(T.shape[0]) - T, c, assume_a="gen", check_finite=False)
P0 = solve_discrete_lyapunov(
T,
pt.linalg.matrix_dot(R, Q, R.T),
method="direct" if self.k_states < 10 else "bilinear",
)
self.ssm["initial_state", :] = x0
self.ssm["initial_state_cov", :, :] = P0