r"""
Implementation of the Theta forecasting method of
Assimakopoulos, V., & Nikolopoulos, K. (2000). The theta model: a decomposition
approach to forecasting. International journal of forecasting, 16(4), 521-530.
and updates in
Hyndman, R. J., & Billah, B. (2003). Unmasking the Theta method. International
Journal of Forecasting, 19(2), 287-290.
Fioruci, J. A., Pellegrini, T. R., Louzada, F., & Petropoulos, F. (2015).
The optimized theta method. arXiv preprint arXiv:1503.03529.
"""
from typing import TYPE_CHECKING, Optional, Tuple
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.iolib.summary import Summary
from statsmodels.iolib.table import SimpleTable
from statsmodels.tools.validation import (
array_like,
bool_like,
float_like,
int_like,
string_like,
)
from statsmodels.tsa.deterministic import DeterministicTerm
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.statespace.exponential_smoothing import (
ExponentialSmoothing,
)
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.tsatools import add_trend, freq_to_period
if TYPE_CHECKING:
import matplotlib.figure
def extend_index(steps: int, index: pd.Index) -> pd.Index:
return DeterministicTerm._extend_index(index, steps)
[docs]class ThetaModel:
r"""
The Theta forecasting model of Assimakopoulos and Nikolopoulos (2000)
Parameters
----------
endog : array_like, 1d
The data to forecast.
period : int, default None
The period of the data that is used in the seasonality test and
adjustment. If None then the period is determined from y's index,
if available.
deseasonalize : bool, default True
A flag indicating whether the deseasonalize the data. If True and
use_test is True, the data is only deseasonalized if the null of no
seasonal component is rejected.
use_test : bool, default True
A flag indicating whether test the period-th autocorrelation. If this
test rejects using a size of 10%, then decomposition is used. Set to
False to skip the test.
method : {"auto", "additive", "multiplicative"}, default "auto"
The model used for the seasonal decomposition. "auto" uses a
multiplicative if y is non-negative and all estimated seasonal
components are positive. If either of these conditions is False,
then it uses an additive decomposition.
difference : bool, default False
A flag indicating to difference the data before testing for
seasonality.
See Also
--------
statsmodels.tsa.statespace.exponential_smoothing.ExponentialSmoothing
Exponential smoothing parameter estimation and forecasting
statsmodels.tsa.statespace.sarimax.SARIMAX
Seasonal ARIMA parameter estimation and forecasting
Notes
-----
The Theta model forecasts the future as a weighted combination of two
Theta lines. This class supports combinations of models with two
thetas: 0 and a user-specified choice (default 2). The forecasts are
then
.. math::
\hat{X}_{T+h|T} = \frac{\theta-1}{\theta} b_0
\left[h - 1 + \frac{1}{\alpha}
- \frac{(1-\alpha)^T}{\alpha} \right]
+ \tilde{X}_{T+h|T}
where :math:`\tilde{X}_{T+h|T}` is the SES forecast of the endogenous
variable using the parameter :math:`\alpha`. :math:`b_0` is the
slope of a time trend line fitted to X using the terms 0, 1, ..., T-1.
The model is estimated in steps:
1. Test for seasonality
2. Deseasonalize if seasonality detected
3. Estimate :math:`\alpha` by fitting a SES model to the data and
:math:`b_0` by OLS.
4. Forecast the series
5. Reseasonalize if the data was deseasonalized.
The seasonality test examines where the autocorrelation at the
seasonal period is different from zero. The seasonality is then
removed using a seasonal decomposition with a multiplicative trend.
If the seasonality estimate is non-positive then an additive trend
is used instead. The default deseasonalizing method can be changed
using the options.
References
----------
.. [1] Assimakopoulos, V., & Nikolopoulos, K. (2000). The theta model: a
decomposition approach to forecasting. International Journal of
Forecasting, 16(4), 521-530.
.. [2] Hyndman, R. J., & Billah, B. (2003). Unmasking the Theta method.
International Journal of Forecasting, 19(2), 287-290.
.. [3] Fioruci, J. A., Pellegrini, T. R., Louzada, F., & Petropoulos, F.
(2015). The optimized theta method. arXiv preprint arXiv:1503.03529.
"""
def __init__(
self,
endog,
*,
period: Optional[int] = None,
deseasonalize: bool = True,
use_test: bool = True,
method: str = "auto",
difference: bool = False
) -> None:
self._y = array_like(endog, "endog", ndim=1)
if isinstance(endog, pd.DataFrame):
self.endog_orig = endog.iloc[:, 0]
else:
self.endog_orig = endog
self._period = int_like(period, "period", optional=True)
self._deseasonalize = bool_like(deseasonalize, "deseasonalize")
self._use_test = (
bool_like(use_test, "use_test") and self._deseasonalize
)
self._diff = bool_like(difference, "difference")
self._method = string_like(
method,
"model",
options=("auto", "additive", "multiplicative", "mul", "add"),
)
if self._period is None and self._deseasonalize:
idx = getattr(endog, "index", None)
pfreq = None
if idx is not None:
pfreq = getattr(idx, "freq", None)
if pfreq is None:
pfreq = getattr(idx, "inferred_freq", None)
if pfreq is not None:
self._period = freq_to_period(pfreq)
else:
raise ValueError(
"You must specify a period or endog must be a "
"pandas object with a DatetimeIndex with "
"a freq not set to None"
)
self._has_seasonality = self._deseasonalize
def _test_seasonality(self) -> None:
y = self._y
if self._diff:
y = np.diff(y)
rho = acf(y, nlags=self._period, fft=True)
nobs = y.shape[0]
stat = nobs * rho[-1] ** 2 / np.sum(rho[:-1] ** 2)
# CV is 10% from a chi2(1), 1.645**2
self._has_seasonality = stat > 2.705543454095404
def _deseasonalize_data(self) -> Tuple[np.ndarray, np.ndarray]:
y = self._y
if not self._has_seasonality:
return self._y, np.empty(0)
self._method = (
"mul" if self._method == "auto" and self._y.min() > 0 else "add"
)
res = seasonal_decompose(y, model=self._method, period=self._period)
if res.seasonal.min() <= 0:
self._method = "add"
res = seasonal_decompose(y, model="add", period=self._period)
return y - res.seasonal, res.seasonal[: self._period]
else:
return y / res.seasonal, res.seasonal[: self._period]
[docs] def fit(
self, use_mle: bool = False, disp: bool = False
) -> "ThetaModelResults":
r"""
Estimate model parameters.
Parameters
----------
use_mle : bool, default False
Estimate the parameters using MLE by fitting an ARIMA(0,1,1) with
a drift. If False (the default), estimates parameters using OLS
of a constant and a time-trend and by fitting a SES to the model
data.
disp : bool, default True
Display iterative output from fitting the model.
Notes
-----
When using MLE, the parameters are estimated from the ARIMA(0,1,1)
.. math::
X_t = X_{t-1} + b_0 + (\alpha-1)\epsilon_{t-1} + \epsilon_t
When estimating the model using 2-step estimation, the model
parameters are estimated using the OLS regression
.. math::
X_t = a_0 + b_0 (t-1) + \eta_t
and the SES
.. math::
\tilde{X}_{t+1} = \alpha X_{t} + (1-\alpha)\tilde{X}_{t}
Returns
-------
ThetaModelResult
Model results and forecasting
"""
if self._deseasonalize and self._use_test:
self._test_seasonality()
y, seasonal = self._deseasonalize_data()
if use_mle:
mod = SARIMAX(y, order=(0, 1, 1), trend="c")
res = mod.fit(disp=disp)
params = np.asarray(res.params)
alpha = params[1] + 1
if alpha > 1:
alpha = 0.9998
res = mod.fit_constrained({"ma.L1": alpha - 1})
params = np.asarray(res.params)
b0 = params[0]
sigma2 = params[-1]
one_step = res.forecast(1) - b0
else:
ct = add_trend(y, "ct", prepend=True)[:, :2]
ct[:, 1] -= 1
_, b0 = np.linalg.lstsq(ct, y, rcond=None)[0]
res = ExponentialSmoothing(
y, initial_level=y[0], initialization_method="known"
).fit(disp=disp)
alpha = res.params[0]
sigma2 = None
one_step = res.forecast(1)
return ThetaModelResults(
b0, alpha, sigma2, one_step, seasonal, use_mle, self
)
@property
def deseasonalize(self) -> bool:
"""Whether to deseasonalize the data"""
return self._deseasonalize
@property
def period(self) -> int:
"""The period of the seasonality"""
return self._period
@property
def use_test(self) -> bool:
"""Whether to test the data for seasonality"""
return self._use_test
@property
def difference(self) -> bool:
"""Whether the data is differenced in the seasonality test"""
return self._diff
@property
def method(self) -> str:
"""The method used to deseasonalize the data"""
return self._method
[docs]class ThetaModelResults:
"""
Results class from estimated Theta Models.
Parameters
----------
b0 : float
The estimated trend slope.
alpha : float
The estimated SES parameter.
sigma2 : float
The estimated residual variance from the SES/IMA model.
one_step : float
The one-step forecast from the SES.
seasonal : ndarray
An array of estimated seasonal terms.
use_mle : bool
A flag indicating that the parameters were estimated using MLE.
model : ThetaModel
The model used to produce the results.
"""
def __init__(
self,
b0: float,
alpha: float,
sigma2: Optional[float],
one_step: float,
seasonal: np.ndarray,
use_mle: bool,
model: ThetaModel,
) -> None:
self._b0 = b0
self._alpha = alpha
self._sigma2 = sigma2
self._one_step = one_step
self._nobs = model.endog_orig.shape[0]
self._model = model
self._seasonal = seasonal
self._use_mle = use_mle
@property
def params(self) -> pd.Series:
"""The forecasting model parameters"""
return pd.Series([self._b0, self._alpha], index=["b0", "alpha"])
@property
def sigma2(self) -> float:
"""The estimated residual variance"""
if self._sigma2 is None:
mod = SARIMAX(self.model._y, order=(0, 1, 1), trend="c")
res = mod.fit(disp=False)
self._sigma2 = np.asarray(res.params)[-1]
assert self._sigma2 is not None
return self._sigma2
@property
def model(self) -> ThetaModel:
"""The model used to produce the results"""
return self._model
[docs] def forecast(self, steps: int = 1, theta: float = 2) -> pd.Series:
r"""
Forecast the model for a given theta
Parameters
----------
steps : int
The number of steps ahead to compute the forecast components.
theta : float
The theta value to use when computing the weight to combine
the trend and the SES forecasts.
Returns
-------
Series
A Series containing the forecasts
Notes
-----
The forecast is computed as
.. math::
\hat{X}_{T+h|T} = \frac{\theta-1}{\theta} b_0
\left[h - 1 + \frac{1}{\alpha}
- \frac{(1-\alpha)^T}{\alpha} \right]
+ \tilde{X}_{T+h|T}
where :math:`\tilde{X}_{T+h|T}` is the SES forecast of the endogenous
variable using the parameter :math:`\alpha`. :math:`b_0` is the
slope of a time trend line fitted to X using the terms 0, 1, ..., T-1.
This expression follows from [1]_ and [2]_ when the combination
weights are restricted to be (theta-1)/theta and 1/theta. This nests
the original implementation when theta=2 and the two weights are both
1/2.
References
----------
.. [1] Hyndman, R. J., & Billah, B. (2003). Unmasking the Theta method.
International Journal of Forecasting, 19(2), 287-290.
.. [2] Fioruci, J. A., Pellegrini, T. R., Louzada, F., & Petropoulos,
F. (2015). The optimized theta method. arXiv preprint
arXiv:1503.03529.
"""
steps = int_like(steps, "steps")
if steps < 1:
raise ValueError("steps must be a positive integer")
theta = float_like(theta, "theta")
if theta < 1:
raise ValueError("theta must be a float >= 1")
thresh = 4.0 / np.finfo(np.double).eps
trend_weight = (theta - 1) / theta if theta < thresh else 1.0
comp = self.forecast_components(steps=steps)
fcast = trend_weight * comp.trend + np.asarray(comp.ses)
# Re-seasonalize if needed
if self.model.deseasonalize:
seasonal = np.asarray(comp.seasonal)
if self.model.method.startswith("mul"):
fcast *= seasonal
else:
fcast += seasonal
fcast.name = "forecast"
return fcast
[docs] def forecast_components(self, steps: int = 1) -> pd.DataFrame:
r"""
Compute the three components of the Theta model forecast
Parameters
----------
steps : int
The number of steps ahead to compute the forecast components.
Returns
-------
DataFrame
A DataFrame with three columns: trend, ses and seasonal containing
the forecast values of each of the three components.
Notes
-----
For a given value of :math:`\theta`, the deseasonalized forecast is
`fcast = w * trend + ses` where :math:`w = \frac{theta - 1}{theta}`.
The reseasonalized forecasts are then `seasonal * fcast` if the
seasonality is multiplicative or `seasonal + fcast` if the seasonality
is additive.
"""
steps = int_like(steps, "steps")
if steps < 1:
raise ValueError("steps must be a positive integer")
alpha = self._alpha
b0 = self._b0
nobs = self._nobs
h = np.arange(1, steps + 1, dtype=np.float64) - 1
if alpha > 0:
h += 1 / alpha - ((1 - alpha) ** nobs / alpha)
trend = b0 * h
ses = self._one_step * np.ones(steps)
if self.model.method.startswith("add"):
season = np.zeros(steps)
else:
season = np.ones(steps)
# Re-seasonalize
if self.model.deseasonalize:
seasonal = self._seasonal
period = self.model.period
oos_idx = nobs + np.arange(1, steps + 1)
seasonal_locs = oos_idx % period
if seasonal.shape[0]:
season[:] = seasonal[seasonal_locs]
index = getattr(self.model.endog_orig, "index", None)
if index is None:
index = pd.RangeIndex(0, self.model.endog_orig.shape[0])
index = extend_index(steps, index)
df = pd.DataFrame(
{"trend": trend, "ses": ses, "seasonal": season}, index=index
)
return df
[docs] def summary(self) -> Summary:
"""
Summarize the model
Returns
-------
Summary
This holds the summary table and text, which can be printed or
converted to various output formats.
See Also
--------
statsmodels.iolib.summary.Summary
"""
model = self.model
smry = Summary()
model_name = type(model).__name__
title = model_name + " Results"
method = "MLE" if self._use_mle else "OLS/SES"
is_series = isinstance(model.endog_orig, pd.Series)
index = getattr(model.endog_orig, "index", None)
if is_series and isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex)):
sample = [index[0].strftime("%m-%d-%Y")]
sample += ["- " + index[-1].strftime("%m-%d-%Y")]
else:
sample = [str(0), str(model.endog_orig.shape[0])]
dep_name = getattr(model.endog_orig, "name", "endog") or "endog"
top_left = [
("Dep. Variable:", [dep_name]),
("Method:", [method]),
("Date:", None),
("Time:", None),
("Sample:", [sample[0]]),
("", [sample[1]]),
]
method = (
"Multiplicative" if model.method.startswith("mul") else "Additive"
)
top_right = [
("No. Observations:", [str(self._nobs)]),
("Deseasonalized:", [str(model.deseasonalize)]),
]
if model.deseasonalize:
top_right.extend(
[
("Deseas. Method:", [method]),
("Period:", [str(model.period)]),
("", [""]),
("", [""]),
]
)
else:
top_right.extend([("", [""])] * 4)
smry.add_table_2cols(
self, gleft=top_left, gright=top_right, title=title
)
table_fmt = {"data_fmts": ["%s", "%#0.4g"], "data_aligns": "r"}
data = np.asarray(self.params)[:, None]
st = SimpleTable(
data,
["Parameters", "Estimate"],
list(self.params.index),
title="Parameter Estimates",
txt_fmt=table_fmt,
)
smry.tables.append(st)
return smry
[docs] def prediction_intervals(
self, steps: int = 1, theta: float = 2, alpha: float = 0.05
) -> pd.DataFrame:
r"""
Parameters
----------
steps : int, default 1
The number of steps ahead to compute the forecast components.
theta : float, default 2
The theta value to use when computing the weight to combine
the trend and the SES forecasts.
alpha : float, default 0.05
Significance level for the confidence intervals.
Returns
-------
DataFrame
DataFrame with columns lower and upper
Notes
-----
The variance of the h-step forecast is assumed to follow from the
integrated Moving Average structure of the Theta model, and so is
:math:`\sigma^2(1 + (h-1)(1 + (\alpha-1)^2)`. The prediction interval
assumes that innovations are normally distributed.
"""
model_alpha = self.params[1]
sigma2_h = (
1 + np.arange(steps) * (1 + (model_alpha - 1) ** 2)
) * self.sigma2
sigma_h = np.sqrt(sigma2_h)
quantile = stats.norm.ppf(alpha / 2)
predictions = self.forecast(steps, theta)
return pd.DataFrame(
{
"lower": predictions + sigma_h * quantile,
"upper": predictions + sigma_h * -quantile,
}
)
[docs] def plot_predict(
self,
steps: int = 1,
theta: float = 2,
alpha: Optional[float] = 0.05,
in_sample: bool = False,
fig: Optional["matplotlib.figure.Figure"] = None,
figsize: Tuple[float, float] = None,
) -> "matplotlib.figure.Figure":
r"""
Plot forecasts, prediction intervals and in-sample values
Parameters
----------
steps : int, default 1
The number of steps ahead to compute the forecast components.
theta : float, default 2
The theta value to use when computing the weight to combine
the trend and the SES forecasts.
alpha : {float, None}, default 0.05
The tail probability not covered by the confidence interval. Must
be in (0, 1). Confidence interval is constructed assuming normally
distributed shocks. If None, figure will not show the confidence
interval.
in_sample : bool, default False
Flag indicating whether to include the in-sample period in the
plot.
fig : Figure, default None
An existing figure handle. If not provided, a new figure is
created.
figsize: tuple[float, float], default None
Tuple containing the figure size.
Returns
-------
Figure
Figure handle containing the plot.
Notes
-----
The variance of the h-step forecast is assumed to follow from the
integrated Moving Average structure of the Theta model, and so is
:math:`\sigma^2(\alpha^2 + (h-1))`. The prediction interval assumes
that innovations are normally distributed.
"""
from statsmodels.graphics.utils import _import_mpl, create_mpl_fig
_import_mpl()
fig = create_mpl_fig(fig, figsize)
assert fig is not None
predictions = self.forecast(steps, theta)
pred_index = predictions.index
ax = fig.add_subplot(111)
nobs = self.model.endog_orig.shape[0]
index = pd.Int64Index(np.arange(nobs))
if in_sample:
if isinstance(self.model.endog_orig, pd.Series):
index = self.model.endog_orig.index
ax.plot(index, self.model.endog_orig)
ax.plot(pred_index, predictions)
if alpha is not None:
pi = self.prediction_intervals(steps, theta, alpha)
label = "{0:.0%} confidence interval".format(1 - alpha)
ax.fill_between(
pred_index,
pi["lower"],
pi["upper"],
color="gray",
alpha=0.5,
label=label,
)
ax.legend(loc="best", frameon=False)
fig.tight_layout(pad=1.0)
return fig