from collections.abc import Sequence
import numpy as np
import pytensor.tensor as pt
from pytensor.compile.mode import Mode
from pytensor.graph.replace import graph_replace
from pytensor.tensor.linalg import solve_discrete_lyapunov
from pymc_extras.statespace.core.properties import (
Coord,
Parameter,
Shock,
State,
)
from pymc_extras.statespace.core.statespace import PyMCStateSpace, floatX
from pymc_extras.statespace.models.utilities import validate_names
from pymc_extras.statespace.utils.constants import (
ALL_STATE_AUX_DIM,
ALL_STATE_DIM,
ETS_SEASONAL_DIM,
OBS_STATE_AUX_DIM,
OBS_STATE_DIM,
)
[docs]
class BayesianETS(PyMCStateSpace):
r"""
Exponential Smoothing State Space Model
This class can represent a subset of exponential smoothing state space models, specifically those with additive
errors. Following .. [1], The general form of the model is:
.. math::
\begin{align}
y_t &= l_{t-1} + b_{t-1} + s_{t-m} + \epsilon_t \\
\epsilon_t &\sim N(0, \sigma)
\end{align}
where :math:`l_t` is the level component, :math:`b_t` is the trend component, and :math:`s_t` is the seasonal
component. These components can be included or excluded, leading to different model specifications. The following
models are possible:
* `ETS(A,N,N)`: Simple exponential smoothing
.. math::
\begin{align}
y_t &= l_{t-1} + \epsilon_t \\
l_t &= l_{t-1} + \alpha \epsilon_t
\end{align}
Where :math:`\alpha \in [0, 1]` is a mixing parameter between past observations and current innovations.
These equations arise by starting from the "component form":
.. math::
\begin{align}
\hat{y}_{t+1 | t} &= l_t \\
l_t &= \alpha y_t + (1 - \alpha) l_{t-1} \\
&= l_{t-1} + \alpha (y_t - l_{t-1})
&= l_{t-1} + \alpha \epsilon_t
\end{align}
Where $\epsilon_t$ are the forecast errors, assumed to be IID mean zero and normally distributed. The role of
:math:`\alpha` is clearest in the second line. The level of the time series at each time is a mixture of
:math:`\alpha` percent of the incoming data, and :math:`1 - \alpha` percent of the previous level. Recursive
substitution reveals that the level is a weighted composite of all previous observations; thus the name
"Exponential Smoothing".
Additional supposed specifications include:
* `ETS(A,A,N)`: Holt's linear trend method
.. math::
\begin{align}
y_t &= l_{t-1} + b_{t-1} + \epsilon_t \\
l_t &= l_{t-1} + b_{t-1} + \alpha \epsilon_t \\
b_t &= b_{t-1} + \alpha \beta^\star \epsilon_t
\end{align}
[1]_ also consider an alternative parameterization with :math:`\beta = \alpha \beta^\star`.
* `ETS(A,N,A)`: Additive seasonal method
.. math::
\begin{align}
y_t &= l_{t-1} + s_{t-m} + \epsilon_t \\
l_t &= l_{t-1} + \alpha \epsilon_t \\
s_t &= s_{t-m} + (1 - \alpha)\gamma^\star \epsilon_t
\end{align}
[1]_ also consider an alternative parameterization with :math:`\gamma = (1 - \alpha) \gamma^\star`.
* `ETS(A,A,A)`: Additive Holt-Winters method
.. math::
\begin{align}
y_t &= l_{t-1} + b_{t-1} + s_{t-m} + \epsilon_t \\
l_t &= l_{t-1} + \alpha \epsilon_t \\
b_t &= b_{t-1} + \alpha \beta^\star \epsilon_t \\
s_t &= s_{t-m} + (1 - \alpha) \gamma^\star \epsilon_t
\end{align}
[1]_ also consider an alternative parameterization with :math:`\beta = \alpha \beta^star` and
:math:`\gamma = (1 - \alpha) \gamma^\star`.
* `ETS(A, Ad, N)`: Dampened trend method
.. math::
\begin{align}
y_t &= l_{t-1} + b_{t-1} + \epsilon_t \\
l_t &= l_{t-1} + \alpha \epsilon_t \\
b_t &= \phi b_{t-1} + \alpha \beta^\star \epsilon_t
\end{align}
[1]_ also consider an alternative parameterization with :math:`\beta = \alpha \beta^\star`.
* `ETS(A, Ad, A)`: Dampened trend with seasonal method
.. math::
\begin{align}
y_t &= l_{t-1} + b_{t-1} + s_{t-m} + \epsilon_t \\
l_t &= l_{t-1} + \alpha \epsilon_t \\
b_t &= \phi b_{t-1} + \alpha \beta^\star \epsilon_t \\
s_t &= s_{t-m} + (1 - \alpha) \gamma^\star \epsilon_t
\end{align}
[1]_ also consider an alternative parameterization with :math:`\beta = \alpha \beta^star` and
:math:`\gamma = (1 - \alpha) \gamma^\star`.
Parameters
----------
order: tuple of string, Optional
The exponential smoothing "order". This is a tuple of three strings, each of which should be one of 'A', 'Ad',
or 'N'.
If provided, the model will be initialized from the given order, and the `trend`, `damped_trend`, and `seasonal`
arguments will be ignored.
endog_names: str or Sequence of str
Names associated with observed states. If a list, the length should be equal to the number of time series
to be estimated.
trend: bool
Whether to include a trend component. Setting ``trend=True`` is equivalent to ``order[1] == 'A'``.
damped_trend: bool
Whether to include a damping parameter on the trend component. Ignored if `trend` is `False`. Setting
``trend=True`` and ``damped_trend=True`` is equivalent to order[1] == 'Ad'.
seasonal: bool
Whether to include a seasonal component. Setting ``seasonal=True`` is equivalent to ``order[2] = 'A'``.
seasonal_periods: int
The number of periods in a complete seasonal cycle. Ignored if `seasonal` is `False`
(or if ``order[2] == "N"``)
measurement_error: bool
Whether to include a measurement error term in the model. Default is `False`.
use_transformed_parameterization: bool, default False
If true, use the :math:`\alpha, \beta, \gamma` parameterization, otherwise use the :math:`\alpha, \beta^\star,
\gamma^\star` parameterization. This will change the admissible region for the priors.
- Under the **non-transformed** parameterization, all of :math:`\alpha, \beta^\star, \gamma^\star` should be
between 0 and 1.
- Under the **transformed** parameterization, :math:`\alpha \in (0, 1)`, :math:`\beta \in (0, \alpha)`, and
:math:`\gamma \in (0, 1 - \alpha)`
The :meth:`param_info` method will change to reflect the suggested intervals based on the value of this
argument.
dense_innovation_covariance: bool, default False
Whether to estimate a dense covariance for statespace innovations. In an ETS models, each observed variable
has a single source of stochastic variation. If True, these innovations are allowed to be correlated.
Ignored if ``k_endog == 1``
stationary_initialization: bool, default False
If True, the Kalman Filter's initial covariance matrix will be set to an approximate steady-state value.
The approximation is formed by adding a small dampening factor to each state. Specifically, the level state
for a ('A', 'N', 'N') model is written:
.. math::
\ell_t = \ell_{t-1} + \alpha * e_t
That this system is not stationary can be understood in ARIMA terms: the level is a random walk; that is,
:math:`rho = 1`. This can be remedied by pretending that we instead have a dampened system:
.. math::
\ell_t = \rho \ell_{t-1} + \alpha * e_t
With :math:`\rho \approx 1`, the system is stationary, and we can solve for the steady-state covariance
matrix. This is then used as the initial covariance matrix for the Kalman Filter. This is a heuristic
method that helps avoid setting a prior on the initial covariance matrix.
initialization_dampening: float, default 0.8
Dampening factor to add to non-stationary model components. This is only used for initialization, it does
*not* add dampening to the model. Ignored if `stationary_initialization` is `False`.
filter_type: str, default "standard"
The type of Kalman Filter to use. Options are "standard", "single", "univariate", "steady_state",
and "cholesky". See the docs for kalman filters for more details.
verbose: bool, default True
If true, a message will be logged to the terminal explaining the variable names, dimensions, and supports.
mode: str or Mode, optional
Pytensor compile mode, used in auxiliary sampling methods such as ``sample_conditional_posterior`` and
``forecast``. The mode does **not** effect calls to ``pm.sample``.
Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument
to all sampling methods.
References
----------
.. [1] Hyndman, Rob J., and George Athanasopoulos. Forecasting: principles and practice. OTexts, 2018.
"""
[docs]
def __init__(
self,
order: tuple[str, str, str] | None = None,
endog_names: str | Sequence[str] | None = None,
trend: bool = True,
damped_trend: bool = False,
seasonal: bool = False,
seasonal_periods: int | None = None,
measurement_error: bool = False,
use_transformed_parameterization: bool = False,
dense_innovation_covariance: bool = False,
stationary_initialization: bool = False,
initialization_dampening: float = 0.8,
filter_type: str = "standard",
verbose: bool = True,
mode: str | Mode | None = None,
):
if order is not None:
if len(order) != 3 or any(not isinstance(o, str) for o in order):
raise ValueError("Order must be a tuple of three strings.")
if order[0] != "A":
raise ValueError("Only additive errors are supported.")
if order[1] not in {"A", "Ad", "N"}:
raise ValueError(
f"Invalid trend specification. Only 'A' (additive), 'Ad' (additive with dampening), "
f"or 'N' (no trend) are allowed. Found {order[1]}"
)
if order[2] not in {"A", "N"}:
raise ValueError(
f"Invalid seasonal specification. Only 'A' (additive) or 'N' (no seasonal component) "
f"are allowed. Found {order[2]}"
)
trend = order[1] != "N"
damped_trend = order[1] == "Ad"
seasonal = order[2] == "A"
self.trend = trend
self.damped_trend = damped_trend
self.seasonal = seasonal
self.seasonal_periods = seasonal_periods
self.use_transformed_parameterization = use_transformed_parameterization
self.stationary_initialization = stationary_initialization
if not (0.0 < initialization_dampening < 1.0):
raise ValueError(
"Dampening term used for initialization must be between 0 and 1 (preferably close to"
"1.0)"
)
self.initialization_dampening = initialization_dampening
if self.seasonal and self.seasonal_periods is None:
raise ValueError("If seasonal is True, seasonal_periods must be provided.")
validate_names(endog_names, var_name="endog_names", optional=False)
k_endog = len(endog_names)
self.endog_names = (
tuple(endog_names) if not isinstance(endog_names, str) else (endog_names,)
)
if dense_innovation_covariance and k_endog == 1:
dense_innovation_covariance = False
self.dense_innovation_covariance = dense_innovation_covariance
k_states = (
2
+ int(trend)
+ int(seasonal) * (seasonal_periods if seasonal_periods is not None else 0)
) * k_endog
k_posdef = k_endog
super().__init__(
k_endog,
k_states,
k_posdef,
filter_type,
verbose=verbose,
measurement_error=measurement_error,
mode=mode,
)
def set_parameters(self) -> Parameter | tuple[Parameter, ...] | None:
k_endog = self.k_endog
k_states = self.k_states
k_posdef = self.k_posdef
parameters = []
# Initial level - always present
parameters.append(
Parameter(
name="initial_level",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints=None,
)
)
# Initial trend - only if trend is enabled
if self.trend:
parameters.append(
Parameter(
name="initial_trend",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints=None,
)
)
# Initial seasonal - only if seasonal is enabled
if self.seasonal:
parameters.append(
Parameter(
name="initial_seasonal",
shape=(self.seasonal_periods,)
if k_endog == 1
else (k_endog, self.seasonal_periods),
dims=(ETS_SEASONAL_DIM,) if k_endog == 1 else (OBS_STATE_DIM, ETS_SEASONAL_DIM),
constraints=None,
)
)
# P0 - only if not stationary initialization
if not self.stationary_initialization:
parameters.append(
Parameter(
name="P0",
shape=(k_states, k_states),
dims=(ALL_STATE_DIM, ALL_STATE_AUX_DIM),
constraints="Positive Semi-definite",
)
)
# Alpha - always present
parameters.append(
Parameter(
name="alpha",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints="0 < alpha < 1",
)
)
# Beta - only if trend is enabled
if self.trend:
beta_constraint = (
"0 < beta < alpha" if self.use_transformed_parameterization else "0 < beta < 1"
)
parameters.append(
Parameter(
name="beta",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints=beta_constraint,
)
)
# Gamma - only if seasonal is enabled
if self.seasonal:
gamma_constraint = (
"0 < gamma < (1 - alpha)"
if self.use_transformed_parameterization
else "0 < gamma < 1"
)
parameters.append(
Parameter(
name="gamma",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints=gamma_constraint,
)
)
# Phi - only if damped trend is enabled
if self.damped_trend:
parameters.append(
Parameter(
name="phi",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints="0 < phi < 1",
)
)
# State covariance
if self.dense_innovation_covariance:
parameters.append(
Parameter(
name="state_cov",
shape=(k_posdef, k_posdef),
dims=(OBS_STATE_DIM, OBS_STATE_AUX_DIM),
constraints="Positive Semi-definite",
)
)
else:
parameters.append(
Parameter(
name="sigma_state",
shape=() if k_posdef == 1 else (k_posdef,),
dims=None if k_posdef == 1 else (OBS_STATE_DIM,),
constraints="Positive",
)
)
# Observation covariance - only if measurement error is enabled
if self.measurement_error:
parameters.append(
Parameter(
name="sigma_obs",
shape=() if k_endog == 1 else (k_endog,),
dims=None if k_endog == 1 else (OBS_STATE_DIM,),
constraints="Positive",
)
)
return tuple(parameters)
def set_states(self) -> State | tuple[State, ...] | None:
k_endog = self.k_endog
base_states = ["innovation", "level"]
if self.trend:
base_states.append("trend")
if self.seasonal:
base_states.append("seasonality")
base_states += [f"L{i}.season" for i in range(1, self.seasonal_periods)]
if k_endog > 1:
state_names = [f"{name}_{state}" for name in self.endog_names for state in base_states]
else:
state_names = base_states
hidden_states = [State(name=name, observed=False, shared=False) for name in state_names]
observed_states = [
State(name=name, observed=True, shared=False) for name in self.endog_names
]
return *hidden_states, *observed_states
def set_shocks(self) -> Shock | tuple[Shock, ...] | None:
k_endog = self.k_endog
if k_endog == 1:
shock_names = ["innovation"]
else:
shock_names = [f"{name}_innovation" for name in self.endog_names]
return tuple(Shock(name=name) for name in shock_names)
def set_coords(self) -> Coord | tuple[Coord, ...] | None:
coords = list(self.default_coords())
# Seasonal coords
if self.seasonal:
coords.append(
Coord(
dimension=ETS_SEASONAL_DIM,
labels=tuple(range(1, self.seasonal_periods + 1)),
)
)
return tuple(coords)
def _stationary_initialization(self, T_stationary):
# Solve for matrix quadratic for P0
R = self.ssm["selection"]
Q = self.ssm["state_cov"]
# ETS models are not stationary, but we can proceed *as if* the model were stationary by introducing large
# dampening factors on all components. We then set the initial covariance to the steady-state of that system,
# which we hope is similar enough to give a good initialization for the non-stationary system.
P0 = solve_discrete_lyapunov(T_stationary, pt.linalg.matrix_dot(R, Q, R.T))
return P0
def make_symbolic_graph(self) -> None:
k_states_each = self.k_states // self.k_endog
initial_level = self.make_and_register_variable(
"initial_level", shape=(self.k_endog,) if self.k_endog > 1 else (), dtype=floatX
)
initial_states = [pt.zeros(k_states_each) for _ in range(self.k_endog)]
if self.k_endog == 1:
initial_states = [pt.set_subtensor(initial_states[0][1], initial_level)]
else:
initial_states = [
pt.set_subtensor(initial_state[1], initial_level[i])
for i, initial_state in enumerate(initial_states)
]
# The shape of R can be pre-allocated, then filled with the required parameters
R = pt.zeros((self.k_states // self.k_endog, 1))
alpha = self.make_and_register_variable(
"alpha", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX
)
# This is a dummy value for initialization. When we do a stationary initialization, it will be set to a value
# close to 1. Otherwise, it will be 1. We do not want this value to exist outside of this method.
stationary_dampening = pt.scalar("dampen_dummy")
if self.k_endog == 1:
# The R[0, 0] entry needs to be adjusted for a shift in the time indices. Consider the (A, N, N) model:
# y_t = l_{t-1} + e_t
# l_t = l_{t-1} + alpha * e_t
R_list = [pt.set_subtensor(R[1, 0], alpha)] # and l_t = ... + alpha * e_t
# We want the first equation to be in terms of time t on the RHS, because our observation equation is always
# y_t = Z @ x_t. Re-arranging equation 2, we get l_{t-1} = l_t - alpha * e_t --> y_t = l_t + e_t - alpha * e_t
# --> y_t = l_t + (1 - alpha) * e_t
R_list = [pt.set_subtensor(R[0, :], (1 - alpha)) for R in R_list]
else:
# If there are multiple endog, clone the basic R matrix and modify the appropriate entries
R_list = [pt.set_subtensor(R[1, 0], alpha[i]) for i in range(self.k_endog)]
R_list = [pt.set_subtensor(R[0, :], (1 - alpha[i])) for i, R in enumerate(R_list)]
# Shock and level component always exists, the base case is e_t = e_t and l_t = l_{t-1}
T_base = pt.set_subtensor(pt.zeros((2, 2))[1, 1], stationary_dampening)
if self.trend:
initial_trend = self.make_and_register_variable(
"initial_trend", shape=(self.k_endog,) if self.k_endog > 1 else (), dtype=floatX
)
if self.k_endog == 1:
initial_states = [pt.set_subtensor(initial_states[0][2], initial_trend)]
else:
initial_states = [
pt.set_subtensor(initial_state[2], initial_trend[i])
for i, initial_state in enumerate(initial_states)
]
beta = self.make_and_register_variable(
"beta", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX
)
if self.use_transformed_parameterization:
param = beta
else:
param = alpha * beta
if self.k_endog == 1:
R_list = [pt.set_subtensor(R[2, 0], param) for R in R_list]
else:
R_list = [pt.set_subtensor(R[2, 0], param[i]) for i, R in enumerate(R_list)]
# If a trend is requested, we have the following transition equations (omitting the shocks):
# l_t = l_{t-1} + b_{t-1}
# b_t = b_{t-1}
T_base = pt.as_tensor_variable(([0.0, 0.0, 0.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]))
T_base = pt.set_subtensor(T_base[[1, 2], [1, 2]], stationary_dampening)
if self.damped_trend:
phi = self.make_and_register_variable(
"phi", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX
)
# We are always in the case where we have a trend, so we can add the dampening parameter to T_base defined
# in that branch. Transition equations become:
# l_t = l_{t-1} + phi * b_{t-1}
# b_t = phi * b_{t-1}
if self.k_endog > 1:
T_base = [pt.set_subtensor(T_base[1:, 2], phi[i]) for i in range(self.k_endog)]
else:
T_base = pt.set_subtensor(T_base[1:, 2], phi)
T_components = (
[T_base for _ in range(self.k_endog)] if not isinstance(T_base, list) else T_base
)
if self.seasonal:
initial_seasonal = self.make_and_register_variable(
"initial_seasonal",
shape=(self.seasonal_periods,)
if self.k_endog == 1
else (self.k_endog, self.seasonal_periods),
dtype=floatX,
)
if self.k_endog == 1:
initial_states = [
pt.set_subtensor(initial_states[0][2 + int(self.trend) :], initial_seasonal)
]
else:
initial_states = [
pt.set_subtensor(initial_state[2 + int(self.trend) :], initial_seasonal[i])
for i, initial_state in enumerate(initial_states)
]
gamma = self.make_and_register_variable(
"gamma", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX
)
param = gamma if self.use_transformed_parameterization else (1 - alpha) * gamma
# Additional adjustment to the R[0, 0] position is required. Start from:
# y_t = l_{t-1} + s_{t-m} + e_t
# l_t = l_{t-1} + alpha * e_t
# s_t = s_{t-m} + gamma * e_t
# Solve for l_{t-1} and s_{t-m} in terms of l_t and s_t, then substitute into the observation equation:
# y_t = l_t + s_t - alpha * e_t - gamma * e_t + e_t --> y_t = l_t + s_t + (1 - alpha - gamma) * e_t
if self.k_endog == 1:
R_list = [pt.set_subtensor(R[2 + int(self.trend), 0], param) for R in R_list]
R_list = [pt.set_subtensor(R[0, 0], R[0, 0] - param) for R in R_list]
else:
R_list = [
pt.set_subtensor(R[2 + int(self.trend), 0], param[i])
for i, R in enumerate(R_list)
]
R_list = [
pt.set_subtensor(R[0, 0], R[0, 0] - param[i]) for i, R in enumerate(R_list)
]
# The seasonal component is always going to look like a TimeFrequency structural component, see that
# docstring for more details
T_seasonals = [pt.eye(self.seasonal_periods, k=-1) for _ in range(self.k_endog)]
T_seasonals = [
pt.set_subtensor(T_seasonal[0, -1], stationary_dampening)
for T_seasonal in T_seasonals
]
# Organize the components so it goes T1, T_seasonal_1, T2, T_seasonal_2, etc.
T_components = [
matrix[i] for i in range(self.k_endog) for matrix in [T_components, T_seasonals]
]
x0 = pt.concatenate(initial_states, axis=0)
R = pt.linalg.block_diag(*R_list)
self.ssm["initial_state"] = x0
self.ssm["selection"] = R
T = pt.linalg.block_diag(*T_components)
# Remove the stationary_dampening dummies before saving the transition matrix
self.ssm["transition"] = graph_replace(T, {stationary_dampening: 1.0})
Zs = [np.zeros((self.k_endog, self.k_states // self.k_endog)) for _ in range(self.k_endog)]
for i, Z in enumerate(Zs):
Z[i, 0] = 1.0 # innovation
Z[i, 1] = 1.0 # level
if self.seasonal:
Z[i, 2 + int(self.trend)] = 1.0
Z = pt.concatenate(Zs, axis=1)
self.ssm["design"] = Z
# Set up the state covariance matrix
if self.dense_innovation_covariance:
state_cov = self.make_and_register_variable(
"state_cov", shape=(self.k_posdef, self.k_posdef), dtype=floatX
)
self.ssm["state_cov"] = state_cov
else:
state_cov_idx = ("state_cov", *np.diag_indices(self.k_posdef))
state_cov = self.make_and_register_variable(
"sigma_state", shape=() if self.k_posdef == 1 else (self.k_posdef,), dtype=floatX
)
self.ssm[state_cov_idx] = state_cov**2
if self.measurement_error:
obs_cov_idx = ("obs_cov", *np.diag_indices(self.k_endog))
obs_cov = self.make_and_register_variable(
"sigma_obs", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX
)
self.ssm[obs_cov_idx] = obs_cov**2
if self.stationary_initialization:
T_stationary = graph_replace(T, {stationary_dampening: self.initialization_dampening})
P0 = self._stationary_initialization(T_stationary)
else:
P0 = self.make_and_register_variable(
"P0", shape=(self.k_states, self.k_states), dtype=floatX
)
self.ssm["initial_state_cov"] = P0