"""
Linear exponential smoothing models
Author: Chad Fulton
License: BSD-3
"""
import numpy as np
import pandas as pd
from statsmodels.base.data import PandasData
from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.tools.validation import (array_like, bool_like, float_like,
string_like, int_like)
from statsmodels.tsa.exponential_smoothing import initialization as es_init
from statsmodels.tsa.statespace import initialization as ss_init
from statsmodels.tsa.statespace.kalman_filter import (
MEMORY_CONSERVE, MEMORY_NO_FORECAST)
from statsmodels.compat.pandas import Appender
import statsmodels.base.wrapper as wrap
from statsmodels.iolib.summary import forg
from statsmodels.iolib.table import SimpleTable
from statsmodels.iolib.tableformatting import fmt_params
from .mlemodel import MLEModel, MLEResults, MLEResultsWrapper
class ExponentialSmoothing(MLEModel):
"""
Linear exponential smoothing models
Parameters
----------
endog : array_like
The observed time-series process :math:`y`
trend : bool, optional
Whether or not to include a trend component. Default is False.
damped_trend : bool, optional
Whether or not an included trend component is damped. Default is False.
seasonal : int, optional
The number of periods in a complete seasonal cycle for seasonal
(Holt-Winters) models. For example, 4 for quarterly data with an
annual cycle or 7 for daily data with a weekly cycle. Default is
no seasonal effects.
initialization_method : str, optional
Method for initialize the recursions. One of:
* 'estimated'
* 'concentrated'
* 'heuristic'
* 'known'
If 'known' initialization is used, then `initial_level` must be
passed, as well as `initial_slope` and `initial_seasonal` if
applicable. Default is 'estimated'.
initial_level : float, optional
The initial level component. Only used if initialization is 'known'.
initial_trend : float, optional
The initial trend component. Only used if initialization is 'known'.
initial_seasonal : array_like, optional
The initial seasonal component. An array of length `seasonal`
or length `seasonal - 1` (in which case the last initial value
is computed to make the average effect zero). Only used if
initialization is 'known'.
bounds : iterable[tuple], optional
An iterable containing bounds for the parameters. Must contain four
elements, where each element is a tuple of the form (lower, upper).
Default is (0.0001, 0.9999) for the level, trend, and seasonal
smoothing parameters and (0.8, 0.98) for the trend damping parameter.
concentrate_scale : bool, optional
Whether or not to concentrate the scale (variance of the error term)
out of the likelihood.
Notes
-----
The parameters and states of this model are estimated by setting up the
exponential smoothing equations as a special case of a linear Gaussian
state space model and applying the Kalman filter. As such, it has slightly
worse performance than the dedicated exponential smoothing model,
:class:`statsmodels.tsa.holtwinters.ExponentialSmoothing`, and it does not
support multiplicative (nonlinear) exponential smoothing models.
However, as a subclass of the state space models, this model class shares
a consistent set of functionality with those models, which can make it
easier to work with. In addition, it supports computing confidence
intervals for forecasts and it supports concentrating the initial
state out of the likelihood function.
References
----------
[1] Hyndman, Rob, Anne B. Koehler, J. Keith Ord, and Ralph D. Snyder.
Forecasting with exponential smoothing: the state space approach.
Springer Science & Business Media, 2008.
"""
def __init__(self, endog, trend=False, damped_trend=False, seasonal=None,
initialization_method='estimated', initial_level=None,
initial_trend=None, initial_seasonal=None, bounds=None,
concentrate_scale=True, dates=None, freq=None,
missing='none'):
# Model definition
self.trend = bool_like(trend, 'trend')
self.damped_trend = bool_like(damped_trend, 'damped_trend')
self.seasonal_periods = int_like(seasonal, 'seasonal', optional=True)
self.seasonal = self.seasonal_periods is not None
self.initialization_method = string_like(
initialization_method, 'initialization_method').lower()
self.concentrate_scale = bool_like(concentrate_scale,
'concentrate_scale')
# TODO: add validation for bounds (e.g. have all bounds, upper > lower)
# TODO: add `bounds_method` argument to choose between "usual" and
# "admissible" as in Hyndman et al. (2008)
self.bounds = bounds
if self.bounds is None:
self.bounds = [(1e-4, 1-1e-4)] * 3 + [(0.8, 0.98)]
# Validation
if self.seasonal_periods == 1:
raise ValueError('Cannot have a seasonal period of 1.')
if self.seasonal and self.seasonal_periods is None:
raise NotImplementedError('Unable to detect season automatically;'
' please specify `seasonal_periods`.')
if self.initialization_method not in ['concentrated', 'estimated',
'simple', 'heuristic', 'known']:
raise ValueError('Invalid initialization method "%s".'
% initialization_method)
if self.initialization_method == 'known':
if initial_level is None:
raise ValueError('`initial_level` argument must be provided'
' when initialization method is set to'
' "known".')
if initial_trend is None and self.trend:
raise ValueError('`initial_trend` argument must be provided'
' for models with a trend component when'
' initialization method is set to "known".')
if initial_seasonal is None and self.seasonal:
raise ValueError('`initial_seasonal` argument must be provided'
' for models with a seasonal component when'
' initialization method is set to "known".')
# Initialize the state space model
if not self.seasonal or self.seasonal_periods is None:
self._seasonal_periods = 0
else:
self._seasonal_periods = self.seasonal_periods
k_states = 2 + int(self.trend) + self._seasonal_periods
k_posdef = 1
init = ss_init.Initialization(k_states, 'known',
constant=[0] * k_states)
super(ExponentialSmoothing, self).__init__(
endog, k_states=k_states, k_posdef=k_posdef,
initialization=init, dates=dates, freq=freq, missing=missing)
# Concentrate the scale out of the likelihood function
if self.concentrate_scale:
self.ssm.filter_concentrated = True
# Setup fixed elements of the system matrices
# Observation error
self.ssm['design', 0, 0] = 1.
self.ssm['selection', 0, 0] = 1.
self.ssm['state_cov', 0, 0] = 1.
# Level
self.ssm['design', 0, 1] = 1.
self.ssm['transition', 1, 1] = 1.
# Trend
if self.trend:
self.ssm['transition', 1:3, 2] = 1.
# Seasonal
if self.seasonal:
k = 2 + int(self.trend)
self.ssm['design', 0, k] = 1.
self.ssm['transition', k, -1] = 1.
self.ssm['transition', k + 1:k_states, k:k_states - 1] = (
np.eye(self.seasonal_periods - 1))
# Initialization of the states
if self.initialization_method != 'known':
msg = ('Cannot give `%%s` argument when initialization is "%s"'
% initialization_method)
if initial_level is not None:
raise ValueError(msg % 'initial_level')
if initial_trend is not None:
raise ValueError(msg % 'initial_trend')
if initial_seasonal is not None:
raise ValueError(msg % 'initial_seasonal')
if self.initialization_method == 'simple':
initial_level, initial_trend, initial_seasonal = (
es_init._initialization_simple(
self.endog[:, 0], trend='add' if self.trend else None,
seasonal='add' if self.seasonal else None,
seasonal_periods=self.seasonal_periods))
elif self.initialization_method == 'heuristic':
initial_level, initial_trend, initial_seasonal = (
es_init._initialization_heuristic(
self.endog[:, 0], trend='add' if self.trend else None,
seasonal='add' if self.seasonal else None,
seasonal_periods=self.seasonal_periods))
elif self.initialization_method == 'known':
initial_level = float_like(initial_level, 'initial_level')
if self.trend:
initial_trend = float_like(initial_trend, 'initial_trend')
if self.seasonal:
initial_seasonal = array_like(initial_seasonal,
'initial_seasonal')
if len(initial_seasonal) == self.seasonal_periods - 1:
initial_seasonal = np.r_[initial_seasonal,
0 - np.sum(initial_seasonal)]
if len(initial_seasonal) != self.seasonal_periods:
raise ValueError(
'Invalid length of initial seasonal values. Must be'
' one of s or s-1, where s is the number of seasonal'
' periods.')
self._initial_level = initial_level
self._initial_trend = initial_trend
self._initial_seasonal = initial_seasonal
self._initial_state = None
# Initialize now if possible (if we have a damped trend, then
# initialization will depend on the phi parameter, and so has to be
# done at each `update`)
methods = ['simple', 'heuristic', 'known']
if not self.damped_trend and self.initialization_method in methods:
self._initialize_constant_statespace(initial_level, initial_trend,
initial_seasonal)
# Save keys for kwarg initialization
self._init_keys += ['trend', 'damped_trend', 'seasonal',
'initialization_method', 'initial_level',
'initial_trend', 'initial_seasonal', 'bounds',
'concentrate_scale', 'dates', 'freq', 'missing']
def _get_init_kwds(self):
kwds = super()._get_init_kwds()
kwds['seasonal'] = self.seasonal_periods
return kwds
@property
def _res_classes(self):
return {'fit': (ExponentialSmoothingResults,
ExponentialSmoothingResultsWrapper)}
def clone(self, endog, exog=None, **kwargs):
if exog is not None:
raise NotImplementedError(
'ExponentialSmoothing does not support `exog`.')
return self._clone_from_init_kwds(endog, **kwargs)
@property
def state_names(self):
state_names = ['error', 'level']
if self.trend:
state_names += ['trend']
if self.seasonal:
state_names += ['seasonal.%d' % i
for i in range(self.seasonal_periods)]
return state_names
@property
def param_names(self):
param_names = ['smoothing_level']
if self.trend:
param_names += ['smoothing_trend']
if self.seasonal:
param_names += ['smoothing_seasonal']
if self.damped_trend:
param_names += ['damping_trend']
if not self.concentrate_scale:
param_names += ['sigma2']
# Initialization
if self.initialization_method == 'estimated':
param_names += ['initial_level']
if self.trend:
param_names += ['initial_trend']
if self.seasonal:
param_names += ['initial_seasonal.%d' % i
for i in range(self.seasonal_periods - 1)]
return param_names
@property
def start_params(self):
# Make sure starting parameters aren't beyond or right on the bounds
bounds = [(x[0] + 1e-3, x[1] - 1e-3) for x in self.bounds]
# See Hyndman p.24
start_params = [np.clip(0.1, *bounds[0])]
if self.trend:
start_params += [np.clip(0.01, *bounds[1])]
if self.seasonal:
start_params += [np.clip(0.01, *bounds[2])]
if self.damped_trend:
start_params += [np.clip(0.98, *bounds[3])]
if not self.concentrate_scale:
start_params += [np.var(self.endog)]
# Initialization
if self.initialization_method == 'estimated':
initial_level, initial_trend, initial_seasonal = (
es_init._initialization_simple(
self.endog[:, 0],
trend='add' if self.trend else None,
seasonal='add' if self.seasonal else None,
seasonal_periods=self.seasonal_periods))
start_params += [initial_level]
if self.trend:
start_params += [initial_trend]
if self.seasonal:
start_params += initial_seasonal.tolist()[:-1]
return np.array(start_params)
@property
def k_params(self):
k_params = (
1 + int(self.trend) + int(self.seasonal) +
int(not self.concentrate_scale) + int(self.damped_trend))
if self.initialization_method == 'estimated':
k_params += (
1 + int(self.trend) +
int(self.seasonal) * (self._seasonal_periods - 1))
return k_params
def transform_params(self, unconstrained):
unconstrained = np.array(unconstrained, ndmin=1)
constrained = np.zeros_like(unconstrained)
# Alpha in (0, 1)
low, high = self.bounds[0]
constrained[0] = (
1 / (1 + np.exp(-unconstrained[0])) * (high - low) + low)
i = 1
# Beta in (0, alpha)
if self.trend:
low, high = self.bounds[1]
high = min(high, constrained[0])
constrained[i] = (
1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low)
i += 1
# Gamma in (0, 1 - alpha)
if self.seasonal:
low, high = self.bounds[2]
high = min(high, 1 - constrained[0])
constrained[i] = (
1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low)
i += 1
# Phi in bounds (e.g. default is [0.8, 0.98])
if self.damped_trend:
low, high = self.bounds[3]
constrained[i] = (
1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low)
i += 1
# sigma^2 positive
if not self.concentrate_scale:
constrained[i] = unconstrained[i]**2
i += 1
# Initial parameters are as-is
if self.initialization_method == 'estimated':
constrained[i:] = unconstrained[i:]
return constrained
def untransform_params(self, constrained):
constrained = np.array(constrained, ndmin=1)
unconstrained = np.zeros_like(constrained)
# Alpha in (0, 1)
low, high = self.bounds[0]
tmp = (constrained[0] - low) / (high - low)
unconstrained[0] = np.log(tmp / (1 - tmp))
i = 1
# Beta in (0, alpha)
if self.trend:
low, high = self.bounds[1]
high = min(high, constrained[0])
tmp = (constrained[i] - low) / (high - low)
unconstrained[i] = np.log(tmp / (1 - tmp))
i += 1
# Gamma in (0, 1 - alpha)
if self.seasonal:
low, high = self.bounds[2]
high = min(high, 1 - constrained[0])
tmp = (constrained[i] - low) / (high - low)
unconstrained[i] = np.log(tmp / (1 - tmp))
i += 1
# Phi in bounds (e.g. default is [0.8, 0.98])
if self.damped_trend:
low, high = self.bounds[3]
tmp = (constrained[i] - low) / (high - low)
unconstrained[i] = np.log(tmp / (1 - tmp))
i += 1
# sigma^2 positive
if not self.concentrate_scale:
unconstrained[i] = constrained[i]**0.5
i += 1
# Initial parameters are as-is
if self.initialization_method == 'estimated':
unconstrained[i:] = constrained[i:]
return unconstrained
def _initialize_constant_statespace(self, initial_level,
initial_trend=None,
initial_seasonal=None):
# Note: this should be run after `update` has already put any new
# parameters into the transition matrix, since it uses the transition
# matrix explicitly.
# Due to timing differences, the state space representation integrates
# the trend into the level in the "predicted_state" (only the
# "filtered_state" corresponds to the timing of the exponential
# smoothing models)
# Initial values are interpreted as "filtered" values
constant = np.array([0., initial_level])
if self.trend and initial_trend is not None:
constant = np.r_[constant, initial_trend]
if self.seasonal and initial_seasonal is not None:
constant = np.r_[constant, initial_seasonal]
self._initial_state = constant[1:]
# Apply the prediction step to get to what we need for our Kalman
# filter implementation
constant = np.dot(self.ssm['transition'], constant)
self.initialization.constant = constant
def _initialize_stationary_cov_statespace(self):
R = self.ssm['selection']
Q = self.ssm['state_cov']
self.initialization.stationary_cov = R.dot(Q).dot(R.T)
def update(self, params, transformed=True, includes_fixed=False,
complex_step=False):
params = self.handle_params(params, transformed=transformed,
includes_fixed=includes_fixed)
# State space system matrices
self.ssm['selection', 0, 0] = 1 - params[0]
self.ssm['selection', 1, 0] = params[0]
i = 1
if self.trend:
self.ssm['selection', 2, 0] = params[i]
i += 1
if self.seasonal:
self.ssm['selection', 0, 0] -= params[i]
self.ssm['selection', i + 1, 0] = params[i]
i += 1
if self.damped_trend:
self.ssm['transition', 1:3, 2] = params[i]
i += 1
if not self.concentrate_scale:
self.ssm['state_cov', 0, 0] = params[i]
i += 1
# State initialization
if self.initialization_method == 'estimated':
initial_level = params[i]
i += 1
initial_trend = None
initial_seasonal = None
if self.trend:
initial_trend = params[i]
i += 1
if self.seasonal:
initial_seasonal = params[i: i + self.seasonal_periods - 1]
initial_seasonal = np.r_[initial_seasonal,
0 - np.sum(initial_seasonal)]
self._initialize_constant_statespace(initial_level, initial_trend,
initial_seasonal)
methods = ['simple', 'heuristic', 'known']
if self.damped_trend and self.initialization_method in methods:
self._initialize_constant_statespace(
self._initial_level, self._initial_trend,
self._initial_seasonal)
self._initialize_stationary_cov_statespace()
def _compute_concentrated_states(self, params, *args, **kwargs):
# Apply the usual filter, but keep forecasts
kwargs['conserve_memory'] = MEMORY_CONSERVE & ~MEMORY_NO_FORECAST
super().loglike(params, *args, **kwargs)
# Compute the initial state vector
y_tilde = np.array(self.ssm._kalman_filter.forecast_error[0],
copy=True)
# Need to modify our state space system matrices slightly to get them
# back into the form of the innovations framework of
# De Livera et al. (2011)
T = self['transition', 1:, 1:]
R = self['selection', 1:]
Z = self['design', :, 1:].copy()
i = 1
if self.trend:
Z[0, i] = 1.
i += 1
if self.seasonal:
Z[0, i] = 0.
Z[0, -1] = 1.
# Now compute the regression components as described in
# De Livera et al. (2011), equation (10).
D = T - R.dot(Z)
w = np.zeros((self.nobs, self.k_states - 1), dtype=D.dtype)
w[0] = Z
for i in range(self.nobs - 1):
w[i + 1] = w[i].dot(D)
mod_ols = GLM(y_tilde, w)
# If we have seasonal parameters, constrain them to sum to zero
# (otherwise the initial level gets confounded with the sum of the
# seasonals).
if self.seasonal:
R = np.zeros_like(Z)
R[0, -self.seasonal_periods:] = 1.
q = np.zeros((1, 1))
res_ols = mod_ols.fit_constrained((R, q))
else:
res_ols = mod_ols.fit()
# Separate into individual components
initial_level = res_ols.params[0]
initial_trend = res_ols.params[1] if self.trend else None
initial_seasonal = (
res_ols.params[-self.seasonal_periods:] if self.seasonal else None)
return initial_level, initial_trend, initial_seasonal
@Appender(MLEModel.loglike.__doc__)
def loglike(self, params, *args, **kwargs):
if self.initialization_method == 'concentrated':
self._initialize_constant_statespace(
*self._compute_concentrated_states(params, *args, **kwargs))
llf = self.ssm.loglike()
self.ssm.initialization.constant = np.zeros(self.k_states)
else:
llf = super().loglike(params, *args, **kwargs)
return llf
@Appender(MLEModel.filter.__doc__)
def filter(self, params, cov_type=None, cov_kwds=None,
return_ssm=False, results_class=None,
results_wrapper_class=None, *args, **kwargs):
if self.initialization_method == 'concentrated':
self._initialize_constant_statespace(
*self._compute_concentrated_states(params, *args, **kwargs))
results = super().filter(
params, cov_type=cov_type, cov_kwds=cov_kwds,
return_ssm=return_ssm, results_class=results_class,
results_wrapper_class=results_wrapper_class, *args, **kwargs)
if self.initialization_method == 'concentrated':
self.ssm.initialization.constant = np.zeros(self.k_states)
return results
@Appender(MLEModel.smooth.__doc__)
def smooth(self, params, cov_type=None, cov_kwds=None,
return_ssm=False, results_class=None,
results_wrapper_class=None, *args, **kwargs):
if self.initialization_method == 'concentrated':
self._initialize_constant_statespace(
*self._compute_concentrated_states(params, *args, **kwargs))
results = super().smooth(
params, cov_type=cov_type, cov_kwds=cov_kwds,
return_ssm=return_ssm, results_class=results_class,
results_wrapper_class=results_wrapper_class, *args, **kwargs)
if self.initialization_method == 'concentrated':
self.ssm.initialization.constant = np.zeros(self.k_states)
return results
class ExponentialSmoothingResults(MLEResults):
"""
Results from fitting a linear exponential smoothing model
"""
def __init__(self, model, params, filter_results, cov_type=None,
**kwargs):
super().__init__(model, params, filter_results, cov_type, **kwargs)
# Save the states
self.initial_state = model._initial_state
if isinstance(self.data, PandasData):
index = self.data.row_labels
self.initial_state = pd.DataFrame(
[model._initial_state], columns=model.state_names[1:])
if model._index_dates and model._index_freq is not None:
self.initial_state.index = index.shift(-1)[:1]
[docs] @Appender(MLEResults.summary.__doc__)
def summary(self, alpha=.05, start=None):
specification = ['A']
if self.model.trend and self.model.damped_trend:
specification.append('Ad')
elif self.model.trend:
specification.append('A')
else:
specification.append('N')
if self.model.seasonal:
specification.append('A')
else:
specification.append('N')
model_name = 'ETS(' + ', '.join(specification) + ')'
summary = super(ExponentialSmoothingResults, self).summary(
alpha=alpha, start=start, title='Exponential Smoothing Results',
model_name=model_name)
if self.model.initialization_method != 'estimated':
params = np.array(self.initial_state)
if params.ndim > 1:
params = params[0]
names = self.model.state_names
param_header = ['initialization method: %s'
% self.model.initialization_method]
params_stubs = names
params_data = [[forg(params[i], prec=4)]
for i in range(len(params))]
initial_state_table = SimpleTable(params_data,
param_header,
params_stubs,
txt_fmt=fmt_params)
summary.tables.insert(-1, initial_state_table)
return summary
class ExponentialSmoothingResultsWrapper(MLEResultsWrapper):
_attrs = {}
_wrap_attrs = wrap.union_dicts(MLEResultsWrapper._wrap_attrs,
_attrs)
_methods = {}
_wrap_methods = wrap.union_dicts(MLEResultsWrapper._wrap_methods,
_methods)
wrap.populate_wrapper(ExponentialSmoothingResultsWrapper, # noqa:E305
ExponentialSmoothingResults)