Source code for statsmodels.tsa.stl.mstl

"""
Author: Kishan Manani
License: BSD-3 Clause

An implementation of MSTL [1], an algorithm for time series decomposition when
there are multiple seasonal components.

This implementation has the following differences with the original algorithm:
- Missing data must be handled outside of this class.
- The algorithm proposed in the paper handles a case when there is no
seasonality. This implementation assumes that there is at least one seasonal
component.

[1] K. Bandura, R.J. Hyndman, and C. Bergmeir (2021)
MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple
Seasonal Patterns
https://arxiv.org/pdf/2107.13462.pdf
"""
from typing import Optional, Union
from collections.abc import Sequence
import warnings

import numpy as np
import pandas as pd
from scipy.stats import boxcox

from statsmodels.tools.typing import ArrayLike1D
from statsmodels.tsa.stl._stl import STL
from statsmodels.tsa.tsatools import freq_to_period


[docs] class MSTL: """ MSTL(endog, periods=None, windows=None, lmbda=None, iterate=2, stl_kwargs=None) Season-Trend decomposition using LOESS for multiple seasonalities. .. versionadded:: 0.14.0 Parameters ---------- endog : array_like Data to be decomposed. Must be squeezable to 1-d. periods : {int, array_like, None}, optional Periodicity of the seasonal components. If None and endog is a pandas Series or DataFrame, attempts to determine from endog. If endog is a ndarray, periods must be provided. windows : {int, array_like, None}, optional Length of the seasonal smoothers for each corresponding period. Must be an odd integer, and should normally be >= 7 (default). If None then default values determined using 7 + 4 * np.arange(1, n + 1, 1) where n is number of seasonal components. lmbda : {float, str, None}, optional The lambda parameter for the Box-Cox transform to be applied to `endog` prior to decomposition. If None, no transform is applied. If "auto", a value will be estimated that maximizes the log-likelihood function. iterate : int, optional Number of iterations to use to refine the seasonal component. stl_kwargs: dict, optional Arguments to pass to STL. See Also -------- statsmodels.tsa.seasonal.STL References ---------- .. [1] K. Bandura, R.J. Hyndman, and C. Bergmeir (2021) MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple Seasonal Patterns. arXiv preprint arXiv:2107.13462. Examples -------- Start by creating a toy dataset with hourly frequency and multiple seasonal components. >>> import numpy as np >>> import matplotlib.pyplot as plt >>> import pandas as pd >>> pd.plotting.register_matplotlib_converters() >>> np.random.seed(0) >>> t = np.arange(1, 1000) >>> trend = 0.0001 * t ** 2 + 100 >>> daily_seasonality = 5 * np.sin(2 * np.pi * t / 24) >>> weekly_seasonality = 10 * np.sin(2 * np.pi * t / (24 * 7)) >>> noise = np.random.randn(len(t)) >>> y = trend + daily_seasonality + weekly_seasonality + noise >>> index = pd.date_range(start='2000-01-01', periods=len(t), freq='h') >>> data = pd.DataFrame(data=y, index=index) Use MSTL to decompose the time series into two seasonal components with periods 24 (daily seasonality) and 24*7 (weekly seasonality). >>> from statsmodels.tsa.seasonal import MSTL >>> res = MSTL(data, periods=(24, 24*7)).fit() >>> res.plot() >>> plt.tight_layout() >>> plt.show() .. plot:: plots/mstl_plot.py """ def __init__( self, endog: ArrayLike1D, *, periods: Optional[Union[int, Sequence[int]]] = None, windows: Optional[Union[int, Sequence[int]]] = None, lmbda: Optional[Union[float, str]] = None, iterate: int = 2, stl_kwargs: Optional[dict[str, Union[int, bool, None]]] = None, ): self.endog = endog self._y = self._to_1d_array(endog) self.nobs = self._y.shape[0] self.lmbda = lmbda self.periods, self.windows = self._process_periods_and_windows( periods, windows ) self.iterate = iterate self._stl_kwargs = self._remove_overloaded_stl_kwargs( stl_kwargs if stl_kwargs else {} )
[docs] def fit(self): """ Estimate a trend component, multiple seasonal components, and a residual component. Returns ------- DecomposeResult Estimation results. """ num_seasons = len(self.periods) iterate = 1 if num_seasons == 1 else self.iterate # Box Cox if self.lmbda == "auto": y, lmbda = boxcox(self._y, lmbda=None) self.est_lmbda = lmbda elif self.lmbda: y = boxcox(self._y, lmbda=self.lmbda) else: y = self._y # Get STL fit params stl_inner_iter = self._stl_kwargs.pop("inner_iter", None) stl_outer_iter = self._stl_kwargs.pop("outer_iter", None) # Iterate over each seasonal component to extract seasonalities seasonal = np.zeros(shape=(num_seasons, self.nobs)) deseas = y for _ in range(iterate): for i in range(num_seasons): deseas = deseas + seasonal[i] res = STL( endog=deseas, period=self.periods[i], seasonal=self.windows[i], **self._stl_kwargs, ).fit(inner_iter=stl_inner_iter, outer_iter=stl_outer_iter) seasonal[i] = res.seasonal deseas = deseas - seasonal[i] seasonal = np.squeeze(seasonal.T) trend = res.trend rw = res.weights resid = deseas - trend # Return pandas if endog is pandas if isinstance(self.endog, (pd.Series, pd.DataFrame)): index = self.endog.index y = pd.Series(y, index=index, name="observed") trend = pd.Series(trend, index=index, name="trend") resid = pd.Series(resid, index=index, name="resid") rw = pd.Series(rw, index=index, name="robust_weight") cols = [f"seasonal_{period}" for period in self.periods] if seasonal.ndim == 1: seasonal = pd.Series(seasonal, index=index, name="seasonal") else: seasonal = pd.DataFrame(seasonal, index=index, columns=cols) # Avoid circular imports from statsmodels.tsa.seasonal import DecomposeResult return DecomposeResult(y, seasonal, trend, resid, rw)
def __str__(self): return ( "MSTL(endog," f" periods={self.periods}," f" windows={self.windows}," f" lmbda={self.lmbda}," f" iterate={self.iterate})" ) def _process_periods_and_windows( self, periods: Union[int, Sequence[int], None], windows: Union[int, Sequence[int], None], ) -> tuple[Sequence[int], Sequence[int]]: periods = self._process_periods(periods) if windows: windows = self._process_windows(windows, num_seasons=len(periods)) periods, windows = self._sort_periods_and_windows(periods, windows) else: windows = self._process_windows(windows, num_seasons=len(periods)) periods = sorted(periods) if len(periods) != len(windows): raise ValueError("Periods and windows must have same length") # Remove long periods from decomposition if any(period >= self.nobs / 2 for period in periods): warnings.warn( "A period(s) is larger than half the length of time series." " Removing these period(s).", UserWarning ) periods = tuple( period for period in periods if period < self.nobs / 2 ) windows = windows[: len(periods)] return periods, windows def _process_periods( self, periods: Union[int, Sequence[int], None] ) -> Sequence[int]: if periods is None: periods = (self._infer_period(),) elif isinstance(periods, int): periods = (periods,) else: pass return periods def _process_windows( self, windows: Union[int, Sequence[int], None], num_seasons: int, ) -> Sequence[int]: if windows is None: windows = self._default_seasonal_windows(num_seasons) elif isinstance(windows, int): windows = (windows,) else: pass return windows def _infer_period(self) -> int: freq = None if isinstance(self.endog, (pd.Series, pd.DataFrame)): freq = getattr(self.endog.index, "inferred_freq", None) if freq is None: raise ValueError("Unable to determine period from endog") period = freq_to_period(freq) return period @staticmethod def _sort_periods_and_windows( periods, windows ) -> tuple[Sequence[int], Sequence[int]]: if len(periods) != len(windows): raise ValueError("Periods and windows must have same length") periods, windows = zip(*sorted(zip(periods, windows))) return periods, windows @staticmethod def _remove_overloaded_stl_kwargs(stl_kwargs: dict) -> dict: args = ["endog", "period", "seasonal"] for arg in args: stl_kwargs.pop(arg, None) return stl_kwargs @staticmethod def _default_seasonal_windows(n: int) -> Sequence[int]: return tuple(7 + 4 * i for i in range(1, n + 1)) # See [1] @staticmethod def _to_1d_array(x): y = np.ascontiguousarray(np.squeeze(np.asarray(x)), dtype=np.double) if y.ndim != 1: raise ValueError("y must be a 1d array") return y

Last update: Oct 03, 2024