Source code for statsmodels.regression.rolling
"""
Rolling OLS and WLS
Implements an efficient rolling estimator that avoids repeated matrix
multiplication.
Copyright (c) 2019 Kevin Sheppard
License: 3-clause BSD
"""
from statsmodels.compat.numpy import lstsq
from statsmodels.compat.pandas import (
Appender,
Substitution,
cache_readonly,
call_cached_func,
get_cached_doc,
)
from collections import namedtuple
import numpy as np
from pandas import DataFrame, MultiIndex, Series
from scipy import stats
from statsmodels.base import model
from statsmodels.base.model import LikelihoodModelResults, Model
from statsmodels.formula._manager import FormulaManager
from statsmodels.regression.linear_model import (
RegressionModel,
RegressionResults,
)
from statsmodels.tools.validation import array_like, int_like, string_like
def strip4(line):
if line.startswith(" "):
return line[4:]
return line
RollingStore = namedtuple(
"RollingStore",
[
"params",
"ssr",
"llf",
"nobs",
"s2",
"xpxi",
"xeex",
"centered_tss",
"uncentered_tss",
],
)
common_params = "\n".join(map(strip4, model._model_params_doc.split("\n")))
window_parameters = """\
window : int
Length of the rolling window. Must be strictly larger than the number
of variables in the model.
"""
weight_parameters = """
weights : array_like, optional
A 1d array of weights. If you supply 1/W then the variables are
pre- multiplied by 1/sqrt(W). If no weights are supplied the
default value is 1 and WLS results are the same as OLS.
"""
_missing_param_doc = """\
min_nobs : {int, None}
Minimum number of observations required to estimate a model when
data are missing. If None, the minimum depends on the number of
regressors in the model. Must be smaller than window.
missing : str, default "drop"
Available options are "drop", "skip" and "raise". If "drop", any
observations with nans are dropped and the estimates are computed using
only the non-missing values in each window. If 'skip' blocks containing
missing values are skipped and the corresponding results contains NaN.
If 'raise', an error is raised. Default is 'drop'.
expanding : bool, default False
If True, then the initial observations after min_nobs are filled using
an expanding scheme until ``window`` observations are available, after
which rolling is used.
"""
extra_base = _missing_param_doc
extra_parameters = window_parameters + weight_parameters + extra_base
_doc = """
Rolling %(model_type)s Least Squares
%(parameters)s
%(extra_parameters)s
See Also
--------
statsmodels.regression.linear_model.%(model)s
%(model)s estimation and parameter testing.
Notes
-----
Tested against %(model)s for accuracy.
Results may differ from %(model)s applied to windows of data if this
model contains an implicit constant (i.e., includes dummies for all
categories) rather than an explicit constant (e.g., a column of 1s).
Examples
--------
>>> from statsmodels.regression.rolling import Rolling%(model)s
>>> from statsmodels.datasets import longley
>>> data = longley.load()
>>> exog = add_constant(data.exog, prepend=False)
>>> mod = Rolling%(model)s(data.endog, exog)
>>> rolling_res = mod.fit(reset=50)
Use params_only to skip all calculations except parameter estimation
>>> rolling_params = mod.fit(params_only=True)
Use expanding and min_nobs to fill the initial results using an
expanding scheme until window observation, and the roll.
>>> mod = Rolling%(model)s(data.endog, exog, window=60, min_nobs=12,
... expanding=True)
>>> rolling_res = mod.fit()
"""
[docs]
@Substitution(
model_type="Weighted",
model="WLS",
parameters=common_params,
extra_parameters=extra_parameters,
)
@Appender(_doc)
class RollingWLS:
def __init__(
self,
endog,
exog,
window=None,
*,
weights=None,
min_nobs=None,
missing="drop",
expanding=False
):
# Call Model.__init__ twice to use const detection in first pass
# But to not drop in the second pass
missing = string_like(
missing, "missing", options=("drop", "raise", "skip")
)
temp_msng = "drop" if missing != "raise" else "raise"
Model.__init__(self, endog, exog, missing=temp_msng, hasconst=None)
k_const = self.k_constant
const_idx = self.data.const_idx
Model.__init__(self, endog, exog, missing="none", hasconst=False)
self.k_constant = k_const
self.data.const_idx = const_idx
self._y = array_like(endog, "endog")
nobs = self._y.shape[0]
self._x = array_like(exog, "endog", ndim=2, shape=(nobs, None))
window = int_like(window, "window", optional=True)
weights = array_like(weights, "weights", optional=True, shape=(nobs,))
self._window = window if window is not None else self._y.shape[0]
self._weighted = weights is not None
self._weights = np.ones(nobs) if weights is None else weights
w12 = np.sqrt(self._weights)
self._wy = w12 * self._y
self._wx = w12[:, None] * self._x
min_nobs = int_like(min_nobs, "min_nobs", optional=True)
self._min_nobs = min_nobs if min_nobs is not None else self._x.shape[1]
if self._min_nobs < self._x.shape[1] or self._min_nobs > self._window:
raise ValueError(
"min_nobs must be larger than the number of "
"regressors in the model and less than window"
)
self._expanding = expanding
self._is_nan = np.zeros_like(self._y, dtype=bool)
self._has_nan = self._find_nans()
self.const_idx = self.data.const_idx
self._skip_missing = missing == "skip"
def _handle_data(self, endog, exog, missing, hasconst, **kwargs):
return Model._handle_data(
self, endog, exog, missing, hasconst, **kwargs
)
def _find_nans(self):
nans = np.isnan(self._y)
nans |= np.any(np.isnan(self._x), axis=1)
nans |= np.isnan(self._weights)
self._is_nan[:] = nans
has_nan = np.cumsum(nans)
w = self._window
has_nan[w - 1 :] = has_nan[w - 1 :] - has_nan[: -(w - 1)]
if self._expanding:
has_nan[: self._min_nobs] = False
else:
has_nan[: w - 1] = False
return has_nan.astype(bool)
def _get_data(self, idx):
window = self._window
if idx >= window:
loc = slice(idx - window, idx)
else:
loc = slice(idx)
y = self._y[loc]
wy = self._wy[loc]
wx = self._wx[loc]
weights = self._weights[loc]
missing = self._is_nan[loc]
not_missing = ~missing
if np.any(missing):
y = y[not_missing]
wy = wy[not_missing]
wx = wx[not_missing]
weights = weights[not_missing]
return y, wy, wx, weights, not_missing
def _fit_single(self, idx, wxpwx, wxpwy, nobs, store, params_only, method):
if nobs < self._min_nobs:
return
try:
if (method == "inv") or not params_only:
wxpwxi = np.linalg.inv(wxpwx)
if method == "inv":
params = wxpwxi @ wxpwy
else:
_, wy, wx, _, _ = self._get_data(idx)
if method == "lstsq":
params = lstsq(wx, wy)[0]
else: # 'pinv'
wxpwxiwxp = np.linalg.pinv(wx)
params = wxpwxiwxp @ wy
except np.linalg.LinAlgError:
return
store.params[idx - 1] = params
if params_only:
return
y, wy, wx, weights, _ = self._get_data(idx)
wresid, ssr, llf = self._loglike(params, wy, wx, weights, nobs)
wxwresid = wx * wresid[:, None]
wxepwxe = wxwresid.T @ wxwresid
tot_params = wx.shape[1]
s2 = ssr / (nobs - tot_params)
centered_tss, uncentered_tss = self._sum_of_squares(y, wy, weights)
store.ssr[idx - 1] = ssr
store.llf[idx - 1] = llf
store.nobs[idx - 1] = nobs
store.s2[idx - 1] = s2
store.xpxi[idx - 1] = wxpwxi
store.xeex[idx - 1] = wxepwxe
store.centered_tss[idx - 1] = centered_tss
store.uncentered_tss[idx - 1] = uncentered_tss
def _loglike(self, params, wy, wx, weights, nobs):
nobs2 = nobs / 2.0
wresid = wy - wx @ params
ssr = np.sum(wresid ** 2, axis=0)
llf = -np.log(ssr) * nobs2 # concentrated likelihood
llf -= (1 + np.log(np.pi / nobs2)) * nobs2 # with constant
llf += 0.5 * np.sum(np.log(weights))
return wresid, ssr, llf
def _sum_of_squares(self, y, wy, weights):
mean = np.average(y, weights=weights)
centered_tss = np.sum(weights * (y - mean) ** 2)
uncentered_tss = np.dot(wy, wy)
return centered_tss, uncentered_tss
def _reset(self, idx):
"""Compute xpx and xpy using a single dot product"""
_, wy, wx, _, not_missing = self._get_data(idx)
nobs = not_missing.sum()
xpx = wx.T @ wx
xpy = wx.T @ wy
return xpx, xpy, nobs
[docs]
def fit(
self,
method="inv",
cov_type="nonrobust",
cov_kwds=None,
reset=None,
use_t=False,
params_only=False,
):
"""
Estimate model parameters.
Parameters
----------
method : {'inv', 'lstsq', 'pinv'}
Method to use when computing the the model parameters.
* 'inv' - use moving windows inner-products and matrix inversion.
This method is the fastest, but may be less accurate than the
other methods.
* 'lstsq' - Use numpy.linalg.lstsq
* 'pinv' - Use numpy.linalg.pinv. This method matches the default
estimator in non-moving regression estimators.
cov_type : {'nonrobust', 'HCCM', 'HC0'}
Covariance estimator:
* nonrobust - The classic OLS covariance estimator
* HCCM, HC0 - White heteroskedasticity robust covariance
cov_kwds : dict
Unused
reset : int, optional
Interval to recompute the moving window inner products used to
estimate the model parameters. Smaller values improve accuracy,
although in practice this setting is not required to be set.
use_t : bool, optional
Flag indicating to use the Student's t distribution when computing
p-values.
params_only : bool, optional
Flag indicating that only parameters should be computed. Avoids
calculating all other statistics or performing inference.
Returns
-------
RollingRegressionResults
Estimation results where all pre-sample values are nan-filled.
"""
method = string_like(
method, "method", options=("inv", "lstsq", "pinv")
)
reset = int_like(reset, "reset", optional=True)
reset = self._y.shape[0] if reset is None else reset
if reset < 1:
raise ValueError("reset must be a positive integer")
nobs, k = self._x.shape
store = RollingStore(
params=np.full((nobs, k), np.nan),
ssr=np.full(nobs, np.nan),
llf=np.full(nobs, np.nan),
nobs=np.zeros(nobs, dtype=int),
s2=np.full(nobs, np.nan),
xpxi=np.full((nobs, k, k), np.nan),
xeex=np.full((nobs, k, k), np.nan),
centered_tss=np.full(nobs, np.nan),
uncentered_tss=np.full(nobs, np.nan),
)
w = self._window
first = self._min_nobs if self._expanding else w
xpx, xpy, nobs = self._reset(first)
if not (self._has_nan[first - 1] and self._skip_missing):
self._fit_single(first, xpx, xpy, nobs, store, params_only, method)
wx, wy = self._wx, self._wy
for i in range(first + 1, self._x.shape[0] + 1):
if self._has_nan[i - 1] and self._skip_missing:
continue
if i % reset == 0:
xpx, xpy, nobs = self._reset(i)
else:
if not self._is_nan[i - w - 1] and i > w:
remove_x = wx[i - w - 1 : i - w]
xpx -= remove_x.T @ remove_x
xpy -= remove_x.T @ wy[i - w - 1 : i - w]
nobs -= 1
if not self._is_nan[i - 1]:
add_x = wx[i - 1 : i]
xpx += add_x.T @ add_x
xpy += add_x.T @ wy[i - 1 : i]
nobs += 1
self._fit_single(i, xpx, xpy, nobs, store, params_only, method)
return RollingRegressionResults(
self, store, self.k_constant, use_t, cov_type
)
[docs]
@classmethod
@Appender(Model.from_formula.__doc__)
def from_formula(
cls, formula, data, window, weights=None, subset=None, *args, **kwargs
):
if subset is not None:
data = data.loc[subset]
eval_env = kwargs.pop("eval_env", None)
if eval_env is None:
eval_env = 2
elif eval_env == -1:
mgr = FormulaManager()
eval_env = mgr.get_empty_eval_env()
else:
eval_env += 1 # we're going down the stack again
missing = kwargs.get("missing", "skip")
na_action = FormulaManager().get_na_action(action="raise", types=[])
mgr = FormulaManager()
result = mgr.get_matrices(
formula,
data,
eval_env=eval_env,
pandas=True,
na_action=na_action,
)
endog, exog = result
if (endog.ndim > 1 and endog.shape[1] > 1) or endog.ndim > 2:
raise ValueError(
"endog has evaluated to an array with multiple "
"columns that has shape {}. This occurs when "
"the variable converted to endog is non-numeric"
" (e.g., bool or str).".format(endog.shape)
)
kwargs.update({"missing": missing, "window": window})
if weights is not None:
kwargs["weights"] = weights
mod = cls(endog, exog, *args, **kwargs)
mod.formula = formula
# since we got a dataframe, attach the original
mod.data.frame = data
return mod
extra_parameters = window_parameters + extra_base
[docs]
@Substitution(
model_type="Ordinary",
model="OLS",
parameters=common_params,
extra_parameters=extra_parameters,
)
@Appender(_doc)
class RollingOLS(RollingWLS):
def __init__(
self,
endog,
exog,
window=None,
*,
min_nobs=None,
missing="drop",
expanding=False
):
super().__init__(
endog,
exog,
window,
weights=None,
min_nobs=min_nobs,
missing=missing,
expanding=expanding,
)
[docs]
class RollingRegressionResults:
"""
Results from rolling regressions
Parameters
----------
model : RollingWLS
Model instance
store : RollingStore
Container for raw moving window results
k_constant : bool
Flag indicating that the model contains a constant
use_t : bool
Flag indicating to use the Student's t distribution when computing
p-values.
cov_type : str
Name of covariance estimator
"""
_data_in_cache = tuple()
def __init__(
self, model, store: RollingStore, k_constant, use_t, cov_type
):
self.model = model
self._params = store.params
self._ssr = store.ssr
self._llf = store.llf
self._nobs = store.nobs
self._s2 = store.s2
self._xpxi = store.xpxi
self._xepxe = store.xeex
self._centered_tss = store.centered_tss
self._uncentered_tss = store.uncentered_tss
self._k_constant = k_constant
self._nvar = self._xpxi.shape[-1]
if use_t is None:
use_t = cov_type == "nonrobust"
self._use_t = use_t
self._cov_type = cov_type
self._use_pandas = self.model.data.row_labels is not None
self._data_attr = []
self._cache = {}
def _wrap(self, val):
"""Wrap output as pandas Series or DataFrames as needed"""
if not self._use_pandas:
return val
col_names = self.model.data.param_names
row_names = self.model.data.row_labels
if val.ndim == 1:
return Series(val, index=row_names)
if val.ndim == 2:
return DataFrame(val, columns=col_names, index=row_names)
else: # ndim == 3
mi = MultiIndex.from_product((row_names, col_names))
val = np.reshape(val, (-1, val.shape[-1]))
return DataFrame(val, columns=col_names, index=mi)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.aic))
def aic(self):
return self._wrap(call_cached_func(RegressionResults.aic, self))
@cache_readonly
@Appender(get_cached_doc(RegressionResults.bic))
def bic(self):
with np.errstate(divide="ignore"):
return self._wrap(call_cached_func(RegressionResults.bic, self))
[docs]
def info_criteria(self, crit, dk_params=0):
return self._wrap(
RegressionResults.info_criteria(self, crit, dk_params=dk_params)
)
@cache_readonly
def params(self):
"""Estimated model parameters"""
return self._wrap(self._params)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.ssr))
def ssr(self):
return self._wrap(self._ssr)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.llf))
def llf(self):
return self._wrap(self._llf)
@cache_readonly
@Appender(RegressionModel.df_model.__doc__)
def df_model(self):
return self._nvar - self._k_constant
@cache_readonly
def k_constant(self):
"""Flag indicating whether the model contains a constant"""
return self._k_constant
@cache_readonly
@Appender(get_cached_doc(RegressionResults.centered_tss))
def centered_tss(self):
return self._centered_tss
@cache_readonly
@Appender(get_cached_doc(RegressionResults.uncentered_tss))
def uncentered_tss(self):
return self._uncentered_tss
@cache_readonly
@Appender(get_cached_doc(RegressionResults.rsquared))
def rsquared(self):
return self._wrap(call_cached_func(RegressionResults.rsquared, self))
@cache_readonly
@Appender(get_cached_doc(RegressionResults.rsquared_adj))
def rsquared_adj(self):
return self._wrap(
call_cached_func(RegressionResults.rsquared_adj, self)
)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.nobs))
def nobs(self):
return self._wrap(self._nobs)
@cache_readonly
@Appender(RegressionModel.df_resid.__doc__)
def df_resid(self):
return self._wrap(self._nobs - self.df_model - self._k_constant)
@cache_readonly
@Appender(RegressionResults.use_t.__doc__)
def use_t(self):
return self._use_t
@cache_readonly
@Appender(get_cached_doc(RegressionResults.ess))
def ess(self):
return self._wrap(call_cached_func(RegressionResults.ess, self))
@cache_readonly
@Appender(get_cached_doc(RegressionResults.mse_model))
def mse_model(self):
return self._wrap(call_cached_func(RegressionResults.mse_model, self))
@cache_readonly
@Appender(get_cached_doc(RegressionResults.mse_resid))
def mse_resid(self):
return self._wrap(call_cached_func(RegressionResults.mse_resid, self))
@cache_readonly
@Appender(get_cached_doc(RegressionResults.mse_total))
def mse_total(self):
return self._wrap(call_cached_func(RegressionResults.mse_total, self))
@cache_readonly
def _cov_params(self):
if self._cov_type == "nonrobust":
return self._s2[:, None, None] * self._xpxi
else:
return self._xpxi @ self._xepxe @ self._xpxi
[docs]
def cov_params(self):
"""
Estimated parameter covariance
Returns
-------
array_like
The estimated model covariances. If the original input is a numpy
array, the returned covariance is a 3-d array with shape
(nobs, nvar, nvar). If the original inputs are pandas types, then
the returned covariance is a DataFrame with a MultiIndex with
key (observation, variable), so that the covariance for
observation with index i is cov.loc[i].
"""
return self._wrap(self._cov_params)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.f_pvalue))
def f_pvalue(self):
with np.errstate(invalid="ignore"):
return self._wrap(
call_cached_func(RegressionResults.f_pvalue, self)
)
@cache_readonly
@Appender(get_cached_doc(RegressionResults.fvalue))
def fvalue(self):
if self._cov_type == "nonrobust":
return self.mse_model / self.mse_resid
else:
nobs = self._params.shape[0]
stat = np.full(nobs, np.nan)
k = self._params.shape[1]
r = np.eye(k)
locs = list(range(k))
if self.k_constant:
locs.pop(self.model.const_idx)
if not locs:
return stat
r = r[locs]
vcv = self._cov_params
rvcvr = r @ vcv @ r.T
p = self._params
for i in range(nobs):
rp = p[i : i + 1] @ r.T
denom = rp.shape[1]
inv_cov = np.linalg.inv(rvcvr[i])
stat[i] = np.squeeze(rp @ inv_cov @ rp.T) / denom
return stat
@cache_readonly
@Appender(get_cached_doc(RegressionResults.bse))
def bse(self):
with np.errstate(invalid="ignore"):
return self._wrap(np.sqrt(np.diagonal(self._cov_params, 0, 2)))
@cache_readonly
@Appender(get_cached_doc(LikelihoodModelResults.tvalues))
def tvalues(self):
with np.errstate(invalid="ignore"):
return self._wrap(
call_cached_func(LikelihoodModelResults.tvalues, self)
)
@cache_readonly
@Appender(get_cached_doc(LikelihoodModelResults.pvalues))
def pvalues(self):
if self.use_t:
df_resid = getattr(self, "df_resid_inference", self.df_resid)
df_resid = np.asarray(df_resid)[:, None]
with np.errstate(invalid="ignore"):
return stats.t.sf(np.abs(self.tvalues), df_resid) * 2
else:
with np.errstate(invalid="ignore"):
return stats.norm.sf(np.abs(self.tvalues)) * 2
def _conf_int(self, alpha, cols):
bse = np.asarray(self.bse)
if self.use_t:
dist = stats.t
df_resid = getattr(self, "df_resid_inference", self.df_resid)
df_resid = np.asarray(df_resid)[:, None]
q = dist.ppf(1 - alpha / 2, df_resid)
else:
dist = stats.norm
q = dist.ppf(1 - alpha / 2)
params = np.asarray(self.params)
lower = params - q * bse
upper = params + q * bse
if cols is not None:
cols = np.asarray(cols)
lower = lower[:, cols]
upper = upper[:, cols]
return np.asarray(list(zip(lower, upper)))
[docs]
@Appender(LikelihoodModelResults.conf_int.__doc__)
def conf_int(self, alpha=0.05, cols=None):
ci = self._conf_int(alpha, cols)
if not self._use_pandas:
return ci
ci_names = ("lower", "upper")
row_names = self.model.data.row_labels
col_names = self.model.data.param_names
if cols is not None:
col_names = [col_names[i] for i in cols]
mi = MultiIndex.from_product((col_names, ci_names))
ci = np.reshape(np.swapaxes(ci, 1, 2), (ci.shape[0], -1))
return DataFrame(ci, columns=mi, index=row_names)
@property
def cov_type(self):
"""Name of covariance estimator"""
return self._cov_type
[docs]
@classmethod
@Appender(LikelihoodModelResults.load.__doc__)
def load(cls, fname):
return LikelihoodModelResults.load(fname)
remove_data = LikelihoodModelResults.remove_data
[docs]
@Appender(LikelihoodModelResults.save.__doc__)
def save(self, fname, remove_data=False):
return LikelihoodModelResults.save(self, fname, remove_data)
[docs]
def plot_recursive_coefficient(
self,
variables=None,
alpha=0.05,
legend_loc="upper left",
fig=None,
figsize=None,
):
r"""
Plot the recursively estimated coefficients on a given variable
Parameters
----------
variables : {int, str, Iterable[int], Iterable[str], None}, optional
Integer index or string name of the variables whose coefficients
to plot. Can also be an iterable of integers or strings. Default
plots all coefficients.
alpha : float, optional
The confidence intervals for the coefficient are (1 - alpha)%. Set
to None to exclude confidence intervals.
legend_loc : str, optional
The location of the legend in the plot. Default is upper left.
fig : Figure, optional
If given, subplots are created in this figure instead of in a new
figure. Note that the grid will be created in the provided
figure using `fig.add_subplot()`.
figsize : tuple, optional
If a figure is created, this argument allows specifying a size.
The tuple is (width, height).
Returns
-------
Figure
The matplotlib Figure object.
"""
from statsmodels.graphics.utils import _import_mpl, create_mpl_fig
if alpha is not None:
ci = self._conf_int(alpha, None)
row_labels = self.model.data.row_labels
if row_labels is None:
row_labels = np.arange(self._params.shape[0])
k_variables = self._params.shape[1]
param_names = self.model.data.param_names
if variables is None:
variable_idx = list(range(k_variables))
else:
if isinstance(variables, (int, str)):
variables = [variables]
variable_idx = []
for i in range(len(variables)):
variable = variables[i]
if variable in param_names:
variable_idx.append(param_names.index(variable))
elif isinstance(variable, int):
variable_idx.append(variable)
else:
msg = (
"variable {} is not an integer and was not found "
"in the list of variable "
"names: {}".format(
variables[i], ", ".join(param_names)
)
)
raise ValueError(msg)
_import_mpl()
fig = create_mpl_fig(fig, figsize)
loc = 0
import pandas as pd
if isinstance(row_labels, pd.PeriodIndex):
row_labels = row_labels.to_timestamp()
row_labels = np.asarray(row_labels)
for i in variable_idx:
ax = fig.add_subplot(len(variable_idx), 1, loc + 1)
params = self._params[:, i]
valid = ~np.isnan(self._params[:, i])
row_lbl = row_labels[valid]
ax.plot(row_lbl, params[valid])
if alpha is not None:
this_ci = np.reshape(ci[:, :, i], (-1, 2))
if not np.all(np.isnan(this_ci)):
ax.plot(
row_lbl, this_ci[:, 0][valid], "k:", label="Lower CI"
)
ax.plot(
row_lbl, this_ci[:, 1][valid], "k:", label="Upper CI"
)
if loc == 0:
ax.legend(loc=legend_loc)
ax.set_xlim(row_lbl[0], row_lbl[-1])
ax.set_title(param_names[i])
loc += 1
fig.tight_layout()
return fig
Last update:
Jan 27, 2025