Source code for statsmodels.tsa.stl.mstl
"""
Author: Kishan Manani
License: BSD-3 Clause
An implementation of MSTL [1], an algorithm for time series decomposition when
there are multiple seasonal components.
This implementation has the following differences with the original algorithm:
- Missing data must be handled outside of this class.
- The algorithm proposed in the paper handles a case when there is no
seasonality. This implementation assumes that there is at least one seasonal
component.
[1] K. Bandura, R.J. Hyndman, and C. Bergmeir (2021)
MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple
Seasonal Patterns
https://arxiv.org/pdf/2107.13462.pdf
"""
from typing import Optional, Union
from collections.abc import Sequence
import warnings
import numpy as np
import pandas as pd
from scipy.stats import boxcox
from statsmodels.tools.typing import ArrayLike1D
from statsmodels.tsa.stl._stl import STL
from statsmodels.tsa.tsatools import freq_to_period
[docs]
class MSTL:
"""
MSTL(endog, periods=None, windows=None, lmbda=None, iterate=2,
stl_kwargs=None)
Season-Trend decomposition using LOESS for multiple seasonalities.
.. versionadded:: 0.14.0
Parameters
----------
endog : array_like
Data to be decomposed. Must be squeezable to 1-d.
periods : {int, array_like, None}, optional
Periodicity of the seasonal components. If None and endog is a pandas
Series or DataFrame, attempts to determine from endog. If endog is a
ndarray, periods must be provided.
windows : {int, array_like, None}, optional
Length of the seasonal smoothers for each corresponding period.
Must be an odd integer, and should normally be >= 7 (default). If None
then default values determined using 7 + 4 * np.arange(1, n + 1, 1)
where n is number of seasonal components.
lmbda : {float, str, None}, optional
The lambda parameter for the Box-Cox transform to be applied to `endog`
prior to decomposition. If None, no transform is applied. If "auto", a
value will be estimated that maximizes the log-likelihood function.
iterate : int, optional
Number of iterations to use to refine the seasonal component.
stl_kwargs: dict, optional
Arguments to pass to STL.
See Also
--------
statsmodels.tsa.seasonal.STL
References
----------
.. [1] K. Bandura, R.J. Hyndman, and C. Bergmeir (2021)
MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with
Multiple Seasonal Patterns. arXiv preprint arXiv:2107.13462.
Examples
--------
Start by creating a toy dataset with hourly frequency and multiple seasonal
components.
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> import pandas as pd
>>> pd.plotting.register_matplotlib_converters()
>>> np.random.seed(0)
>>> t = np.arange(1, 1000)
>>> trend = 0.0001 * t ** 2 + 100
>>> daily_seasonality = 5 * np.sin(2 * np.pi * t / 24)
>>> weekly_seasonality = 10 * np.sin(2 * np.pi * t / (24 * 7))
>>> noise = np.random.randn(len(t))
>>> y = trend + daily_seasonality + weekly_seasonality + noise
>>> index = pd.date_range(start='2000-01-01', periods=len(t), freq='h')
>>> data = pd.DataFrame(data=y, index=index)
Use MSTL to decompose the time series into two seasonal components
with periods 24 (daily seasonality) and 24*7 (weekly seasonality).
>>> from statsmodels.tsa.seasonal import MSTL
>>> res = MSTL(data, periods=(24, 24*7)).fit()
>>> res.plot()
>>> plt.tight_layout()
>>> plt.show()
.. plot:: plots/mstl_plot.py
"""
def __init__(
self,
endog: ArrayLike1D,
*,
periods: Optional[Union[int, Sequence[int]]] = None,
windows: Optional[Union[int, Sequence[int]]] = None,
lmbda: Optional[Union[float, str]] = None,
iterate: int = 2,
stl_kwargs: Optional[dict[str, Union[int, bool, None]]] = None,
):
self.endog = endog
self._y = self._to_1d_array(endog)
self.nobs = self._y.shape[0]
self.lmbda = lmbda
self.periods, self.windows = self._process_periods_and_windows(
periods, windows
)
self.iterate = iterate
self._stl_kwargs = self._remove_overloaded_stl_kwargs(
stl_kwargs if stl_kwargs else {}
)
[docs]
def fit(self):
"""
Estimate a trend component, multiple seasonal components, and a
residual component.
Returns
-------
DecomposeResult
Estimation results.
"""
num_seasons = len(self.periods)
iterate = 1 if num_seasons == 1 else self.iterate
# Box Cox
if self.lmbda == "auto":
y, lmbda = boxcox(self._y, lmbda=None)
self.est_lmbda = lmbda
elif self.lmbda:
y = boxcox(self._y, lmbda=self.lmbda)
else:
y = self._y
# Get STL fit params
stl_inner_iter = self._stl_kwargs.pop("inner_iter", None)
stl_outer_iter = self._stl_kwargs.pop("outer_iter", None)
# Iterate over each seasonal component to extract seasonalities
seasonal = np.zeros(shape=(num_seasons, self.nobs))
deseas = y
for _ in range(iterate):
for i in range(num_seasons):
deseas = deseas + seasonal[i]
res = STL(
endog=deseas,
period=self.periods[i],
seasonal=self.windows[i],
**self._stl_kwargs,
).fit(inner_iter=stl_inner_iter, outer_iter=stl_outer_iter)
seasonal[i] = res.seasonal
deseas = deseas - seasonal[i]
seasonal = np.squeeze(seasonal.T)
trend = res.trend
rw = res.weights
resid = deseas - trend
# Return pandas if endog is pandas
if isinstance(self.endog, (pd.Series, pd.DataFrame)):
index = self.endog.index
y = pd.Series(y, index=index, name="observed")
trend = pd.Series(trend, index=index, name="trend")
resid = pd.Series(resid, index=index, name="resid")
rw = pd.Series(rw, index=index, name="robust_weight")
cols = [f"seasonal_{period}" for period in self.periods]
if seasonal.ndim == 1:
seasonal = pd.Series(seasonal, index=index, name="seasonal")
else:
seasonal = pd.DataFrame(seasonal, index=index, columns=cols)
# Avoid circular imports
from statsmodels.tsa.seasonal import DecomposeResult
return DecomposeResult(y, seasonal, trend, resid, rw)
def __str__(self):
return (
"MSTL(endog,"
f" periods={self.periods},"
f" windows={self.windows},"
f" lmbda={self.lmbda},"
f" iterate={self.iterate})"
)
def _process_periods_and_windows(
self,
periods: Union[int, Sequence[int], None],
windows: Union[int, Sequence[int], None],
) -> tuple[Sequence[int], Sequence[int]]:
periods = self._process_periods(periods)
if windows:
windows = self._process_windows(windows, num_seasons=len(periods))
periods, windows = self._sort_periods_and_windows(periods, windows)
else:
windows = self._process_windows(windows, num_seasons=len(periods))
periods = sorted(periods)
if len(periods) != len(windows):
raise ValueError("Periods and windows must have same length")
# Remove long periods from decomposition
if any(period >= self.nobs / 2 for period in periods):
warnings.warn(
"A period(s) is larger than half the length of time series."
" Removing these period(s).", UserWarning
)
periods = tuple(
period for period in periods if period < self.nobs / 2
)
windows = windows[: len(periods)]
return periods, windows
def _process_periods(
self, periods: Union[int, Sequence[int], None]
) -> Sequence[int]:
if periods is None:
periods = (self._infer_period(),)
elif isinstance(periods, int):
periods = (periods,)
else:
pass
return periods
def _process_windows(
self,
windows: Union[int, Sequence[int], None],
num_seasons: int,
) -> Sequence[int]:
if windows is None:
windows = self._default_seasonal_windows(num_seasons)
elif isinstance(windows, int):
windows = (windows,)
else:
pass
return windows
def _infer_period(self) -> int:
freq = None
if isinstance(self.endog, (pd.Series, pd.DataFrame)):
freq = getattr(self.endog.index, "inferred_freq", None)
if freq is None:
raise ValueError("Unable to determine period from endog")
period = freq_to_period(freq)
return period
@staticmethod
def _sort_periods_and_windows(
periods, windows
) -> tuple[Sequence[int], Sequence[int]]:
if len(periods) != len(windows):
raise ValueError("Periods and windows must have same length")
periods, windows = zip(*sorted(zip(periods, windows)))
return periods, windows
@staticmethod
def _remove_overloaded_stl_kwargs(stl_kwargs: dict) -> dict:
args = ["endog", "period", "seasonal"]
for arg in args:
stl_kwargs.pop(arg, None)
return stl_kwargs
@staticmethod
def _default_seasonal_windows(n: int) -> Sequence[int]:
return tuple(7 + 4 * i for i in range(1, n + 1)) # See [1]
@staticmethod
def _to_1d_array(x):
y = np.ascontiguousarray(np.squeeze(np.asarray(x)), dtype=np.double)
if y.ndim != 1:
raise ValueError("y must be a 1d array")
return y
Last update:
Dec 16, 2024