import copy
import numpy as np
import pytensor
import pytensor.tensor as pt
floatX = pytensor.config.floatX
KeyLike = tuple[str | int, ...] | str
# Number of trailing dims that define each matrix's "core" shape (1 for vectors, 2 for
# matrices). Anything to the left is a time and/or batch dim; downstream consumers
# distinguish them by consulting ``Representation.time_varying_names``.
_CORE_NDIM: dict[str, int] = {
"design": 2,
"obs_intercept": 1,
"obs_cov": 2,
"transition": 2,
"state_intercept": 1,
"selection": 2,
"state_cov": 2,
"initial_state": 1,
"initial_state_cov": 2,
}
[docs]
class PytensorRepresentation:
r"""
Container for the nine state-space matrices of a linear Gaussian model.
Each matrix is stored in its *natural* shape: ``core_shape`` if static, or
``(time, *core_shape)`` if time-varying. Optional leading batch dims are accepted and
propagate untouched -- use ``vectorize_graph`` if a downstream consumer needs to lift
over them.
Parameters
----------
k_endog : int
Number of observed states.
k_states : int
Number of hidden states.
k_posdef : int, optional
Rank of the selection matrix :math:`R`; also the number of stochastic shocks. Default
``k_states``.
design : array_like, optional
Initial value for the design matrix :math:`Z`.
obs_intercept : array_like, optional
Initial value for :math:`d`.
obs_cov : array_like, optional
Initial value for the observation noise covariance :math:`H`.
transition : array_like, optional
Initial value for the transition matrix :math:`T`.
state_intercept : array_like, optional
Initial value for :math:`c`.
selection : array_like, optional
Initial value for the selection matrix :math:`R`.
state_cov : array_like, optional
Initial value for the state innovation covariance :math:`Q`.
initial_state : array_like, optional
Initial value for :math:`\bar x_0`.
initial_state_cov : array_like, optional
Initial value for :math:`P_0`.
Notes
-----
A linear state-space system has
.. math::
\begin{align}
x_t &= T_t x_{t-1} + c_t + R_t \varepsilon_t \\
y_t &= Z_t x_t + d_t + \eta_t \\
\varepsilon_t &\sim N(0, Q_t) \\
\eta_t &\sim N(0, H_t) \\
x_0 &\sim N(\bar x_0, P_0)
\end{align}
with hidden state size :math:`m = k_{\text{states}}`, observed size :math:`p = k_{\text{endog}}`,
and shock rank :math:`r = k_{\text{posdef}}`. The natural per-matrix shapes are
+---------------------+-------------------------+
| Matrix | Natural shape |
+=====================+=========================+
| ``initial_state`` | ``(m,)`` |
+---------------------+-------------------------+
| ``initial_state_cov`` | ``(m, m)`` |
+---------------------+-------------------------+
| ``state_intercept`` | ``(m,)`` |
+---------------------+-------------------------+
| ``obs_intercept`` | ``(p,)`` |
+---------------------+-------------------------+
| ``transition`` | ``(m, m)`` |
+---------------------+-------------------------+
| ``design`` | ``(p, m)`` |
+---------------------+-------------------------+
| ``selection`` | ``(m, r)`` |
+---------------------+-------------------------+
| ``obs_cov`` | ``(p, p)`` |
+---------------------+-------------------------+
| ``state_cov`` | ``(r, r)`` |
+---------------------+-------------------------+
Any matrix may carry a leading dim of length ``n`` (the number of observations) to
make it time-varying. The model is responsible for declaring which matrices vary over
time via :meth:`declare_time_varying`; downstream consumers (filter, smoother, etc.)
read :attr:`time_varying_names` to dispatch.
.. warning::
The time dim must always be the **leftmost** axis of a time-varying matrix. The
Kalman filter scan iterates over axis 0, ``pm.Deterministic`` registration prepends
a ``TIME_DIM`` coord to the dim list, and forecast slicing reads ``matrix[n_train:]``
— all of these assume the time axis sits in front of the core dims. Optional batch
dims may sit even further left (``(*batch, time, *core)``).
Examples
--------
Store and retrieve a transition matrix:
.. code:: python
from pymc_extras.statespace.core.representation import PytensorRepresentation
ssm = PytensorRepresentation(k_endog=1, k_states=3, k_posdef=1)
ssm["transition"].type.shape
# (3, 3)
Set individual entries:
.. code:: python
ssm["design", 0, 0] = 1.0
Replace an entire matrix, optionally with a time dim:
.. code:: python
import numpy as np
ssm["obs_intercept"] = np.arange(10)[:, None] # 10 timesteps, k_endog=1
ssm["obs_intercept"].type.shape
# (10, 1)
References
----------
.. [1] Durbin, James, and Siem Jan Koopman. 2012.
Time Series Analysis by State Space Methods: Second Edition.
Oxford University Press.
"""
__slots__ = (
"_time_varying_names",
"design",
"initial_state",
"initial_state_cov",
"k_endog",
"k_posdef",
"k_states",
"obs_cov",
"obs_intercept",
"selection",
"state_cov",
"state_intercept",
"transition",
)
[docs]
def __init__(
self,
k_endog: int,
k_states: int,
k_posdef: int | None = None,
design: np.ndarray | pt.TensorVariable | None = None,
obs_intercept: np.ndarray | pt.TensorVariable | None = None,
obs_cov: np.ndarray | pt.TensorVariable | None = None,
transition: np.ndarray | pt.TensorVariable | None = None,
state_intercept: np.ndarray | pt.TensorVariable | None = None,
selection: np.ndarray | pt.TensorVariable | None = None,
state_cov: np.ndarray | pt.TensorVariable | None = None,
initial_state: np.ndarray | pt.TensorVariable | None = None,
initial_state_cov: np.ndarray | pt.TensorVariable | None = None,
) -> None:
self.k_endog = k_endog
self.k_states = k_states
self.k_posdef = k_posdef if k_posdef is not None else k_states
provided = {
"design": design,
"obs_intercept": obs_intercept,
"obs_cov": obs_cov,
"transition": transition,
"state_intercept": state_intercept,
"selection": selection,
"state_cov": state_cov,
"initial_state": initial_state,
"initial_state_cov": initial_state_cov,
}
self._time_varying_names: set[str] = set()
for name in _CORE_NDIM:
value = provided[name]
if value is None:
tensor = pt.as_tensor_variable(
np.zeros(self._core_shape(name), dtype=floatX), name=name
)
else:
tensor = self._coerce(name, value)
setattr(self, name, tensor)
def _core_shape(self, name: str) -> tuple[int, ...]:
k_endog, k_states, k_posdef = self.k_endog, self.k_states, self.k_posdef
return {
"design": (k_endog, k_states),
"obs_intercept": (k_endog,),
"obs_cov": (k_endog, k_endog),
"transition": (k_states, k_states),
"state_intercept": (k_states,),
"selection": (k_states, k_posdef),
"state_cov": (k_posdef, k_posdef),
"initial_state": (k_states,),
"initial_state_cov": (k_states, k_states),
}[name]
def _validate_name(self, name: str) -> None:
if name not in _CORE_NDIM:
raise IndexError(f"{name!r} is an invalid state space matrix name")
def _coerce(
self, name: str, value: np.ndarray | pt.TensorVariable | float | int
) -> pt.TensorVariable:
"""Wrap ``value`` as a named TensorVariable after validating the trailing core dims.
Anything to the left of the core dims is accepted unchanged -- ``core``,
``(time, *core)``, ``(*batch, *core)``, and ``(*batch, time, *core)`` are all valid.
Whether a leading dim is "time" or "batch" is the model's decision, recorded via
``declare_time_varying``; the rep itself does not guess.
"""
core_ndim = _CORE_NDIM[name]
core_shape = self._core_shape(name)
if isinstance(value, np.ndarray):
tensor = pt.as_tensor_variable(value.astype(floatX), name=name)
elif isinstance(value, int | float):
tensor = pt.as_tensor_variable(np.asarray(value, dtype=floatX), name=name)
else:
tensor = pt.as_tensor_variable(value)
tensor.name = name
if tensor.ndim < core_ndim:
raise ValueError(
f"{name} must have at least {core_ndim} dimension(s); got ndim={tensor.ndim}"
)
trailing = tensor.type.shape[-core_ndim:]
for got, want in zip(trailing, core_shape, strict=True):
# A None entry in ``tensor.type.shape`` is a runtime-only dim (pytensor doesn't
# know its size at graph time). Skip; let it fail at runtime if wrong.
if got is not None and got != want:
raise ValueError(f"Trailing dims of {name} are {trailing}, expected {core_shape}")
return tensor
@property
def time_varying_names(self) -> frozenset[str]:
"""Names of matrices the model declared as time-varying.
The Kalman filter iterates over the leading dim of these tensors at scan time;
all other matrices are passed in as scan non-sequences (i.e. used unchanged at
every step).
"""
return frozenset(self._time_varying_names)
def declare_time_varying(self, *names: str) -> None:
"""Mark matrices as time-varying. The filter will iterate over the leading dim."""
for name in names:
self._validate_name(name)
self._time_varying_names.add(name)
def __getitem__(self, key: KeyLike) -> pt.TensorVariable:
if isinstance(key, str):
self._validate_name(key)
return getattr(self, key)
if isinstance(key, tuple) and isinstance(key[0], str):
name, *idx = key
self._validate_name(name)
tensor = getattr(self, name)
if not idx:
return tensor
return tensor[tuple(idx)]
raise IndexError("First index must the name of a valid state space matrix.")
def __setitem__(
self, key: KeyLike, value: float | int | np.ndarray | pt.TensorVariable
) -> None:
if isinstance(key, str):
self._validate_name(key)
setattr(self, key, self._coerce(key, value))
return
if isinstance(key, tuple) and isinstance(key[0], str):
name, *idx = key
self._validate_name(name)
existing = getattr(self, name)
updated = pt.set_subtensor(existing[tuple(idx)], value)
updated.name = name
setattr(self, name, updated)
return
raise IndexError("First index must the name of a valid state space matrix.")
def copy(self) -> "PytensorRepresentation":
return copy.copy(self)