Source code for statsmodels.tsa.varma_process
""" Helper and filter functions for VAR and VARMA, and basic VAR class
Created on Mon Jan 11 11:04:23 2010
Author: josef-pktd
License: BSD
This is a new version, I did not look at the old version again, but similar
ideas.
not copied/cleaned yet:
* fftn based filtering, creating samples with fft
* Tests: I ran examples but did not convert them to tests
examples look good for parameter estimate and forecast, and filter functions
main TODOs:
* result statistics
* see whether Bayesian dummy observation can be included without changing
the single call to linalg.lstsq
* impulse response function does not treat correlation, see Hamilton and jplv
Extensions
* constraints, Bayesian priors/penalization
* Error Correction Form and Cointegration
* Factor Models Stock-Watson, ???
see also VAR section in Notes.txt
"""
import numpy as np
from scipy import signal
from statsmodels.tsa.tsatools import lagmat
def varfilter(x, a):
'''apply an autoregressive filter to a series x
Warning: I just found out that convolve does not work as I
thought, this likely does not work correctly for
nvars>3
x can be 2d, a can be 1d, 2d, or 3d
Parameters
----------
x : array_like
data array, 1d or 2d, if 2d then observations in rows
a : array_like
autoregressive filter coefficients, ar lag polynomial
see Notes
Returns
-------
y : ndarray, 2d
filtered array, number of columns determined by x and a
Notes
-----
In general form this uses the linear filter ::
y = a(L)x
where
x : nobs, nvars
a : nlags, nvars, npoly
Depending on the shape and dimension of a this uses different
Lag polynomial arrays
case 1 : a is 1d or (nlags,1)
one lag polynomial is applied to all variables (columns of x)
case 2 : a is 2d, (nlags, nvars)
each series is independently filtered with its own
lag polynomial, uses loop over nvar
case 3 : a is 3d, (nlags, nvars, npoly)
the ith column of the output array is given by the linear filter
defined by the 2d array a[:,:,i], i.e. ::
y[:,i] = a(.,.,i)(L) * x
y[t,i] = sum_p sum_j a(p,j,i)*x(t-p,j)
for p = 0,...nlags-1, j = 0,...nvars-1,
for all t >= nlags
Note: maybe convert to axis=1, Not
TODO: initial conditions
'''
x = np.asarray(x)
a = np.asarray(a)
if x.ndim == 1:
x = x[:,None]
if x.ndim > 2:
raise ValueError('x array has to be 1d or 2d')
nvar = x.shape[1]
nlags = a.shape[0]
ntrim = nlags//2
# for x is 2d with ncols >1
if a.ndim == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a[:,None], mode='valid')
# alternative:
#return signal.lfilter(a,[1],x.astype(float),axis=0)
elif a.ndim == 2:
if min(a.shape) == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a, mode='valid')
# case: independent ar
#(a bit like recserar in gauss, but no x yet)
#(no, reserar is inverse filter)
result = np.zeros((x.shape[0]-nlags+1, nvar))
for i in range(nvar):
# could also use np.convolve, but easier for swiching to fft
result[:,i] = signal.convolve(x[:,i], a[:,i], mode='valid')
return result
elif a.ndim == 3:
# case: vector autoregressive with lag matrices
# Note: we must have shape[1] == shape[2] == nvar
yf = signal.convolve(x[:,:,None], a)
yvalid = yf[ntrim:-ntrim, yf.shape[1]//2,:]
return yvalid
def varinversefilter(ar, nobs, version=1):
'''creates inverse ar filter (MA representation) recursively
The VAR lag polynomial is defined by ::
ar(L) y_t = u_t or
y_t = -ar_{-1}(L) y_{t-1} + u_t
the returned lagpolynomial is arinv(L)=ar^{-1}(L) in ::
y_t = arinv(L) u_t
Parameters
----------
ar : ndarray, (nlags,nvars,nvars)
matrix lagpolynomial, currently no exog
first row should be identity
Returns
-------
arinv : ndarray, (nobs,nvars,nvars)
Notes
-----
'''
nlags, nvars, nvarsex = ar.shape
if nvars != nvarsex:
print('exogenous variables not implemented not tested')
arinv = np.zeros((nobs+1, nvarsex, nvars))
arinv[0,:,:] = ar[0]
arinv[1:nlags,:,:] = -ar[1:]
if version == 1:
for i in range(2,nobs+1):
tmp = np.zeros((nvars,nvars))
for p in range(1,nlags):
tmp += np.dot(-ar[p],arinv[i-p,:,:])
arinv[i,:,:] = tmp
if version == 0:
for i in range(nlags+1,nobs+1):
print(ar[1:].shape, arinv[i-1:i-nlags:-1,:,:].shape)
#arinv[i,:,:] = np.dot(-ar[1:],arinv[i-1:i-nlags:-1,:,:])
#print(np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1])).shape
#arinv[i,:,:] = np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1]))
raise NotImplementedError('waiting for generalized ufuncs or something')
return arinv
def vargenerate(ar, u, initvalues=None):
'''generate an VAR process with errors u
similar to gauss
uses loop
Parameters
----------
ar : array (nlags,nvars,nvars)
matrix lagpolynomial
u : array (nobs,nvars)
exogenous variable, error term for VAR
Returns
-------
sar : array (1+nobs,nvars)
sample of var process, inverse filtered u
does not trim initial condition y_0 = 0
Examples
--------
# generate random sample of VAR
nobs, nvars = 10, 2
u = numpy.random.randn(nobs,nvars)
a21 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.8, 0. ],
[ 0., -0.6]]])
vargenerate(a21,u)
# Impulse Response to an initial shock to the first variable
imp = np.zeros((nobs, nvars))
imp[0,0] = 1
vargenerate(a21,imp)
'''
nlags, nvars, nvarsex = ar.shape
nlagsm1 = nlags - 1
nobs = u.shape[0]
if nvars != nvarsex:
print('exogenous variables not implemented not tested')
if u.shape[1] != nvars:
raise ValueError('u needs to have nvars columns')
if initvalues is None:
sar = np.zeros((nobs+nlagsm1, nvars))
start = nlagsm1
else:
start = max(nlagsm1, initvalues.shape[0])
sar = np.zeros((nobs+start, nvars))
sar[start-initvalues.shape[0]:start] = initvalues
#sar[nlagsm1:] = u
sar[start:] = u
#if version == 1:
for i in range(start,start+nobs):
for p in range(1,nlags):
sar[i] += np.dot(sar[i-p,:],-ar[p])
return sar
def padone(x, front=0, back=0, axis=0, fillvalue=0):
'''pad with zeros along one axis, currently only axis=0
can be used sequentially to pad several axis
Examples
--------
>>> padone(np.ones((2,3)),1,3,axis=1)
array([[ 0., 1., 1., 1., 0., 0., 0.],
[ 0., 1., 1., 1., 0., 0., 0.]])
>>> padone(np.ones((2,3)),1,1, fillvalue=np.nan)
array([[ NaN, NaN, NaN],
[ 1., 1., 1.],
[ 1., 1., 1.],
[ NaN, NaN, NaN]])
'''
#primitive version
shape = np.array(x.shape)
shape[axis] += (front + back)
shapearr = np.array(x.shape)
out = np.empty(shape)
out.fill(fillvalue)
startind = np.zeros(x.ndim)
startind[axis] = front
endind = startind + shapearr
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
#print(myslice
#print(out.shape
#print(out[tuple(myslice)].shape
out[tuple(myslice)] = x
return out
def trimone(x, front=0, back=0, axis=0):
'''trim number of array elements along one axis
Examples
--------
>>> xp = padone(np.ones((2,3)),1,3,axis=1)
>>> xp
array([[ 0., 1., 1., 1., 0., 0., 0.],
[ 0., 1., 1., 1., 0., 0., 0.]])
>>> trimone(xp,1,3,1)
array([[ 1., 1., 1.],
[ 1., 1., 1.]])
'''
shape = np.array(x.shape)
shape[axis] -= (front + back)
#print(shape, front, back
startind = np.zeros(x.ndim)
startind[axis] = front
endind = startind + shape
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
#print(myslice
#print(shape, endind
#print(x[tuple(myslice)].shape
return x[tuple(myslice)]
def ar2full(ar):
'''make reduced lagpolynomial into a right side lagpoly array
'''
nlags, nvar,nvarex = ar.shape
return np.r_[np.eye(nvar,nvarex)[None,:,:],-ar]
def ar2lhs(ar):
'''convert full (rhs) lagpolynomial into a reduced, left side lagpoly array
this is mainly a reminder about the definition
'''
return -ar[1:]
class _Var:
'''obsolete VAR class, use tsa.VAR instead, for internal use only
Examples
--------
>>> v = Var(ar2s)
>>> v.fit(1)
>>> v.arhat
array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.77784898, 0.01726193],
[ 0.10733009, -0.78665335]]])
'''
def __init__(self, y):
self.y = y
self.nobs, self.nvars = y.shape
def fit(self, nlags):
'''estimate parameters using ols
Parameters
----------
nlags : int
number of lags to include in regression, same for all variables
Returns
-------
None, but attaches
arhat : array (nlags, nvar, nvar)
full lag polynomial array
arlhs : array (nlags-1, nvar, nvar)
reduced lag polynomial for left hand side
other statistics as returned by linalg.lstsq : need to be completed
This currently assumes all parameters are estimated without restrictions.
In this case SUR is identical to OLS
estimation results are attached to the class instance
'''
self.nlags = nlags # without current period
nvars = self.nvars
#TODO: ar2s looks like a module variable, bug?
#lmat = lagmat(ar2s, nlags, trim='both', original='in')
lmat = lagmat(self.y, nlags, trim='both', original='in')
self.yred = lmat[:,:nvars]
self.xred = lmat[:,nvars:]
res = np.linalg.lstsq(self.xred, self.yred, rcond=-1)
self.estresults = res
self.arlhs = res[0].reshape(nlags, nvars, nvars)
self.arhat = ar2full(self.arlhs)
self.rss = res[1]
self.xredrank = res[2]
def predict(self):
'''calculate estimated timeseries (yhat) for sample
'''
if not hasattr(self, 'yhat'):
self.yhat = varfilter(self.y, self.arhat)
return self.yhat
def covmat(self):
''' covariance matrix of estimate
# not sure it's correct, need to check orientation everywhere
# looks ok, display needs getting used to
>>> v.rss[None,None,:]*np.linalg.inv(np.dot(v.xred.T,v.xred))[:,:,None]
array([[[ 0.37247445, 0.32210609],
[ 0.1002642 , 0.08670584]],
[[ 0.1002642 , 0.08670584],
[ 0.45903637, 0.39696255]]])
>>>
>>> v.rss[0]*np.linalg.inv(np.dot(v.xred.T,v.xred))
array([[ 0.37247445, 0.1002642 ],
[ 0.1002642 , 0.45903637]])
>>> v.rss[1]*np.linalg.inv(np.dot(v.xred.T,v.xred))
array([[ 0.32210609, 0.08670584],
[ 0.08670584, 0.39696255]])
'''
#check if orientation is same as self.arhat
self.paramcov = (self.rss[None,None,:] *
np.linalg.inv(np.dot(self.xred.T, self.xred))[:,:,None])
def forecast(self, horiz=1, u=None):
'''calculates forcast for horiz number of periods at end of sample
Parameters
----------
horiz : int (optional, default=1)
forecast horizon
u : array (horiz, nvars)
error term for forecast periods. If None, then u is zero.
Returns
-------
yforecast : array (nobs+horiz, nvars)
this includes the sample and the forecasts
'''
if u is None:
u = np.zeros((horiz, self.nvars))
return vargenerate(self.arhat, u, initvalues=self.y)
[docs]
class VarmaPoly:
'''class to keep track of Varma polynomial format
Examples
--------
ar23 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.6, 0. ],
[ 0.2, -0.6]],
[[-0.1, 0. ],
[ 0.1, -0.1]]])
ma22 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[ 0.4, 0. ],
[ 0.2, 0.3]]])
'''
def __init__(self, ar, ma=None):
self.ar = ar
self.ma = ma
nlags, nvarall, nvars = ar.shape
self.nlags, self.nvarall, self.nvars = nlags, nvarall, nvars
self.isstructured = not (ar[0,:nvars] == np.eye(nvars)).all()
if self.ma is None:
self.ma = np.eye(nvars)[None,...]
self.isindependent = True
else:
self.isindependent = not (ma[0] == np.eye(nvars)).all()
self.malags = ar.shape[0]
self.hasexog = nvarall > nvars
self.arm1 = -ar[1:]
#@property
[docs]
def vstack(self, a=None, name='ar'):
'''stack lagpolynomial vertically in 2d array
'''
if a is not None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
return a.reshape(-1, self.nvarall)
#@property
[docs]
def hstack(self, a=None, name='ar'):
'''stack lagpolynomial horizontally in 2d array
'''
if a is not None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
return a.swapaxes(1,2).reshape(-1, self.nvarall).T
#@property
[docs]
def stacksquare(self, a=None, name='ar', orientation='vertical'):
'''stack lagpolynomial vertically in 2d square array with eye
'''
if a is not None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
astacked = a.reshape(-1, self.nvarall)
lenpk, nvars = astacked.shape #[0]
amat = np.eye(lenpk, k=nvars)
amat[:,:nvars] = astacked
return amat
#@property
[docs]
def vstackarma_minus1(self):
'''stack ar and lagpolynomial vertically in 2d array
'''
a = np.concatenate((self.ar[1:], self.ma[1:]),0)
return a.reshape(-1, self.nvarall)
#@property
[docs]
def hstackarma_minus1(self):
'''stack ar and lagpolynomial vertically in 2d array
this is the Kalman Filter representation, I think
'''
a = np.concatenate((self.ar[1:], self.ma[1:]),0)
return a.swapaxes(1,2).reshape(-1, self.nvarall)
[docs]
def getisstationary(self, a=None):
'''check whether the auto-regressive lag-polynomial is stationary
Returns
-------
isstationary : bool
*attaches*
areigenvalues : complex array
eigenvalues sorted by absolute value
References
----------
formula taken from NAG manual
'''
if a is not None:
a = a
else:
if self.isstructured:
a = -self.reduceform(self.ar)[1:]
else:
a = -self.ar[1:]
amat = self.stacksquare(a)
ev = np.sort(np.linalg.eigvals(amat))[::-1]
self.areigenvalues = ev
return (np.abs(ev) < 1).all()
[docs]
def getisinvertible(self, a=None):
'''check whether the auto-regressive lag-polynomial is stationary
Returns
-------
isinvertible : bool
*attaches*
maeigenvalues : complex array
eigenvalues sorted by absolute value
References
----------
formula taken from NAG manual
'''
if a is not None:
a = a
else:
if self.isindependent:
a = self.reduceform(self.ma)[1:]
else:
a = self.ma[1:]
if a.shape[0] == 0:
# no ma lags
self.maeigenvalues = np.array([], np.complex)
return True
amat = self.stacksquare(a)
ev = np.sort(np.linalg.eigvals(amat))[::-1]
self.maeigenvalues = ev
return (np.abs(ev) < 1).all()
[docs]
def reduceform(self, apoly):
'''
this assumes no exog, todo
'''
if apoly.ndim != 3:
raise ValueError('apoly needs to be 3d')
nlags, nvarsex, nvars = apoly.shape
a = np.empty_like(apoly)
try:
a0inv = np.linalg.inv(a[0,:nvars, :])
except np.linalg.LinAlgError:
raise ValueError('matrix not invertible',
'ask for implementation of pinv')
for lag in range(nlags):
a[lag] = np.dot(a0inv, apoly[lag])
return a
if __name__ == "__main__":
# some example lag polynomials
a21 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.8, 0. ],
[ 0., -0.6]]])
a22 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.8, 0. ],
[ 0.1, -0.8]]])
a23 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.8, 0.2],
[ 0.1, -0.6]]])
a24 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.6, 0. ],
[ 0.2, -0.6]],
[[-0.1, 0. ],
[ 0.1, -0.1]]])
a31 = np.r_[np.eye(3)[None,:,:], 0.8*np.eye(3)[None,:,:]]
a32 = np.array([[[ 1. , 0. , 0. ],
[ 0. , 1. , 0. ],
[ 0. , 0. , 1. ]],
[[ 0.8, 0. , 0. ],
[ 0.1, 0.6, 0. ],
[ 0. , 0. , 0.9]]])
########
ut = np.random.randn(1000,2)
ar2s = vargenerate(a22,ut)
#res = np.linalg.lstsq(lagmat(ar2s,1)[:,1:], ar2s)
res = np.linalg.lstsq(lagmat(ar2s,1), ar2s, rcond=-1)
bhat = res[0].reshape(1,2,2)
arhat = ar2full(bhat)
#print(maxabs(arhat - a22)
v = _Var(ar2s)
v.fit(1)
v.forecast()
v.forecast(25)[-30:]
ar23 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-0.6, 0. ],
[ 0.2, -0.6]],
[[-0.1, 0. ],
[ 0.1, -0.1]]])
ma22 = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[ 0.4, 0. ],
[ 0.2, 0.3]]])
ar23ns = np.array([[[ 1. , 0. ],
[ 0. , 1. ]],
[[-1.9, 0. ],
[ 0.4, -0.6]],
[[ 0.3, 0. ],
[ 0.1, -0.1]]])
vp = VarmaPoly(ar23, ma22)
print(vars(vp))
print(vp.vstack())
print(vp.vstack(a24))
print(vp.hstackarma_minus1())
print(vp.getisstationary())
print(vp.getisinvertible())
vp2 = VarmaPoly(ar23ns)
print(vp2.getisstationary())
print(vp2.getisinvertible()) # no ma lags
Last update:
Jan 07, 2025