# pylint: disable=W0201
import numpy as np
import pandas as pd
from statsmodels.compat.python import iteritems, string_types, range
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.decorators import cache_readonly
from statsmodels.tools.tools import Bunch
from statsmodels.tsa.vector_ar import plotting
from statsmodels.tsa.vector_ar import util
from statsmodels.tsa.vector_ar import var_model as _model
import warnings
FULL_SAMPLE = 0
ROLLING = 1
EXPANDING = 2
def _window_ols(y, x, window=None, window_type=None, min_periods=None):
"""
Minimal replacement for pandas ols that provides the required features
Parameters
----------
y : pd.Series
Endogenous variable
x : pd.DataFrame
Exogenous variables, always adds a constant
window: {None, int}
window_type : {str, int}
min_periods : {None, int}
Returns
-------
results : Bunch
Bunch containing parameters (beta), R-squared (r2), nobs and
residuals (resid)
"""
# Must return beta, r2, resid, nobs
if window_type == FULL_SAMPLE:
window_type = 'full_sample'
elif window_type == ROLLING:
window_type = 'rolling'
elif window_type == EXPANDING:
window_type = 'expanding'
if window_type in ('rolling', 'expanding') and window is None:
window = y.shape[0]
min_periods = 1 if min_periods is None else min_periods
window_type = 'full_sample' if window is None else window_type
window_type = 'rolling' if window_type is None else window_type
if window_type == 'rolling':
min_periods = window
if window_type not in ('full_sample', 'rolling', 'expanding'):
raise ValueError('Unknown window_type')
x = x.copy()
x['intercept'] = 1.0
bunch = Bunch()
if window_type == 'full_sample':
missing = y.isnull() | x.isnull().any(1)
y = y.loc[~missing]
x = x.loc[~missing]
res = OLS(y, x).fit()
bunch['beta'] = res.params
bunch['r2'] = res.rsquared
bunch['nobs'] = res.nobs
bunch['resid'] = res.resid
return bunch
index = y.index
columns = x.columns
n = y.shape[0]
k = x.shape[1]
beta = pd.DataFrame(np.zeros((n, k)),
columns=columns,
index=index)
r2 = pd.Series(np.zeros(n), index=index)
nobs = r2.copy().astype(np.int)
resid = r2.copy()
valid = r2.copy().astype(np.bool)
if window_type == 'rolling':
start = window
else:
start = min_periods
for i in range(start, y.shape[0] + 1):
# i is right edge, as in y[:i] for expanding
if window_type == 'rolling':
left = max(0, i - window)
sel = slice(left, i)
else:
sel = slice(i)
_y = y[sel]
_x = x[sel]
missing = _y.isnull() | _x.isnull().any(1)
if missing.any():
if (~missing).sum() < min_periods:
continue
else:
_y = _y.loc[~missing]
_x = _x.loc[~missing]
if _y.shape[0] <= _x.shape[1]:
continue
if window_type == 'expanding' and missing.values[-1]:
continue
res = OLS(_y, _x).fit()
valid.iloc[i - 1] = True
beta.iloc[i - 1] = res.params
r2.iloc[i - 1] = res.rsquared
nobs.iloc[i - 1] = int(res.nobs)
resid.iloc[i - 1] = res.resid.iloc[-1]
bunch['beta'] = beta.loc[valid]
bunch['r2'] = r2.loc[valid]
bunch['nobs'] = nobs.loc[valid]
bunch['resid'] = resid.loc[valid]
return bunch
def _get_window_type(window_type):
if window_type in (FULL_SAMPLE, ROLLING, EXPANDING):
return window_type
elif isinstance(window_type, string_types):
window_type_up = window_type.upper()
if window_type_up in ('FULL SAMPLE', 'FULL_SAMPLE'):
return FULL_SAMPLE
elif window_type_up == 'ROLLING':
return ROLLING
elif window_type_up == 'EXPANDING':
return EXPANDING
raise Exception('Unrecognized window type: %s' % window_type)
[docs]class DynamicVAR(object):
"""
Estimates time-varying vector autoregression (VAR(p)) using
equation-by-equation least squares
Parameters
----------
data : pandas.DataFrame
lag_order : int, default 1
window : int
window_type : {'expanding', 'rolling'}
min_periods : int or None
Minimum number of observations to require in window, defaults to window
size if None specified
trend : {'c', 'nc', 'ct', 'ctt'}
TODO
Returns
-------
**Attributes**:
coefs : Panel
items : coefficient names
major_axis : dates
minor_axis : VAR equation names
"""
def __init__(self, data, lag_order=1, window=None, window_type='expanding',
trend='c', min_periods=None):
self.lag_order = lag_order
self.names = list(data.columns)
self.neqs = len(self.names)
self._y_orig = data
# TODO: deal with trend
self._x_orig = _make_lag_matrix(data, lag_order)
self._x_orig['intercept'] = 1
(self.y, self.x, self.x_filtered, self._index,
self._time_has_obs) = _filter_data(self._y_orig, self._x_orig)
self.lag_order = lag_order
self.trendorder = util.get_trendorder(trend)
self._set_window(window_type, window, min_periods)
warnings.warn('DynamicVAR is depricated and will be removed in a future version, use VAR or VARMAX.', DeprecationWarning)
def _set_window(self, window_type, window, min_periods):
self._window_type = _get_window_type(window_type)
if self._is_rolling:
if window is None:
raise Exception('Must pass window when doing rolling '
'regression')
if min_periods is None:
min_periods = window
else:
window = len(self.x)
if min_periods is None:
min_periods = 1
self._window = int(window)
self._min_periods = min_periods
[docs] @cache_readonly
def T(self):
"""
Number of time periods in results
"""
return len(self.result_index)
@property
def nobs(self):
# Stub, do I need this?
data = dict((eq, r.nobs) for eq, r in iteritems(self.equations))
return pd.DataFrame(data)
[docs] @cache_readonly
def equations(self):
eqs = {}
for col, ts in iteritems(self.y):
model = _window_ols(y=ts, x=self.x, window=self._window,
window_type=self._window_type,
min_periods=self._min_periods)
eqs[col] = model
return eqs
[docs] @cache_readonly
def coefs(self):
"""
Return dynamic regression coefficients as Panel
"""
data = {}
for eq, result in iteritems(self.equations):
data[eq] = result.beta
panel = pd.Panel.fromDict(data)
# Coefficient names become items
return panel.swapaxes('items', 'minor')
@property
def result_index(self):
return self.coefs.major_axis
@cache_readonly
def _coefs_raw(self):
"""
Reshape coefficients to be more amenable to dynamic calculations
Returns
-------
coefs : (time_periods x lag_order x neqs x neqs)
"""
coef_panel = self.coefs.copy()
del coef_panel['intercept']
coef_values = coef_panel.swapaxes('items', 'major').values
coef_values = coef_values.reshape((len(coef_values),
self.lag_order,
self.neqs, self.neqs))
return coef_values
@cache_readonly
def _intercepts_raw(self):
"""
Similar to _coefs_raw, return intercept values in easy-to-use matrix
form
Returns
-------
intercepts : (T x K)
"""
return self.coefs['intercept'].values
[docs] @cache_readonly
def resid(self):
data = {}
for eq, result in iteritems(self.equations):
data[eq] = result.resid
return pd.DataFrame(data)
[docs] def forecast(self, steps=1):
"""
Produce dynamic forecast
Parameters
----------
steps
Returns
-------
forecasts : pandas.DataFrame
"""
output = np.empty((self.T - steps, self.neqs))
y_values = self.y.values
y_index_map = dict((d, idx) for idx, d in enumerate(self.y.index))
result_index_map = dict((d, idx) for idx, d in enumerate(self.result_index))
coefs = self._coefs_raw
intercepts = self._intercepts_raw
# can only produce this many forecasts
forc_index = self.result_index[steps:]
for i, date in enumerate(forc_index):
# TODO: check that this does the right thing in weird cases...
idx = y_index_map[date] - steps
result_idx = result_index_map[date] - steps
y_slice = y_values[:idx]
forcs = _model.forecast(y_slice, coefs[result_idx],
intercepts[result_idx], steps)
output[i] = forcs[-1]
return pd.DataFrame(output, index=forc_index, columns=self.names)
[docs] def plot_forecast(self, steps=1, figsize=(10, 10)):
"""
Plot h-step ahead forecasts against actual realizations of time
series. Note that forecasts are lined up with their respective
realizations.
Parameters
----------
steps :
"""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(figsize=figsize, nrows=self.neqs,
sharex=True)
forc = self.forecast(steps=steps)
dates = forc.index
y_overlay = self.y.reindex(dates)
for i, col in enumerate(forc.columns):
ax = axes[i]
y_ts = y_overlay[col]
forc_ts = forc[col]
y_handle = ax.plot(dates, y_ts.values, 'k.', ms=2)
forc_handle = ax.plot(dates, forc_ts.values, 'k-')
lines = (y_handle[0], forc_handle[0])
labels = ('Y', 'Forecast')
fig.legend(lines, labels)
fig.autofmt_xdate()
fig.suptitle('Dynamic %d-step forecast' % steps)
# pretty things up a bit
plotting.adjust_subplots(bottom=0.15, left=0.10)
plt.draw_if_interactive()
@property
def _is_rolling(self):
return self._window_type == ROLLING
[docs] @cache_readonly
def r2(self):
"""Returns the r-squared values."""
data = dict((eq, r.r2) for eq, r in iteritems(self.equations))
return pd.DataFrame(data)
class DynamicPanelVAR(DynamicVAR):
"""
Dynamic (time-varying) panel vector autoregression using panel ordinary
least squares
Parameters
----------
"""
def __init__(self, data, lag_order=1, window=None, window_type='expanding',
trend='c', min_periods=None):
self.lag_order = lag_order
self.neqs = len(data.columns)
self._y_orig = data
# TODO: deal with trend
self._x_orig = _make_lag_matrix(data, lag_order)
self._x_orig['intercept'] = 1
(self.y, self.x, self.x_filtered, self._index,
self._time_has_obs) = _filter_data(self._y_orig, self._x_orig)
self.lag_order = lag_order
self.trendorder = util.get_trendorder(trend)
self._set_window(window_type, window, min_periods)
warnings.warn('DynamicPanelVAR is depricated and will be removed in a future version, use VAR or VARMAX.', DeprecationWarning)
def _filter_data(lhs, rhs):
"""
Data filtering routine for dynamic VAR
lhs : DataFrame
original data
rhs : DataFrame
lagged variables
Returns
-------
"""
def _has_all_columns(df):
return np.isfinite(df.values).sum(1) == len(df.columns)
rhs_valid = _has_all_columns(rhs)
if not rhs_valid.all():
pre_filtered_rhs = rhs[rhs_valid]
else:
pre_filtered_rhs = rhs
index = lhs.index.union(rhs.index)
if not index.equals(rhs.index) or not index.equals(lhs.index):
rhs = rhs.reindex(index)
lhs = lhs.reindex(index)
rhs_valid = _has_all_columns(rhs)
lhs_valid = _has_all_columns(lhs)
valid = rhs_valid & lhs_valid
if not valid.all():
filt_index = rhs.index[valid]
filtered_rhs = rhs.reindex(filt_index)
filtered_lhs = lhs.reindex(filt_index)
else:
filtered_rhs, filtered_lhs = rhs, lhs
return filtered_lhs, filtered_rhs, pre_filtered_rhs, index, valid
def _make_lag_matrix(x, lags):
data = {}
columns = []
for i in range(1, 1 + lags):
lagstr = 'L%d.' % i
lag = x.shift(i).rename(columns=lambda c: lagstr + c)
data.update(lag._series)
columns.extend(lag.columns)
return pd.DataFrame(data, columns=columns)
class Equation(object):
"""
Stub, estimate one equation
"""
def __init__(self, y, x):
pass
if __name__ == '__main__':
import pandas.util.testing as ptest
ptest.N = 500
data = ptest.makeTimeDataFrame().cumsum(0)
var = DynamicVAR(data, lag_order=2, window_type='expanding')
var2 = DynamicVAR(data, lag_order=2, window=10,
window_type='rolling')