Source code for statsmodels.sandbox.tsa.fftarma
"""
Created on Mon Dec 14 19:53:25 2009
Author: josef-pktd
generate arma sample using fft with all the lfilter it looks slow
to get the ma representation first
apply arma filter (in ar representation) to time series to get white noise
but seems slow to be useful for fast estimation for nobs=10000
change/check: instead of using marep, use fft-transform of ar and ma
separately, use ratio check theory is correct and example works
DONE : feels much faster than lfilter
-> use for estimation of ARMA
-> use pade (scipy.interpolate) approximation to get starting polynomial
from autocorrelation (is autocorrelation of AR(p) related to marep?)
check if pade is fast, not for larger arrays ?
maybe pade does not do the right thing for this, not tried yet
scipy.pade([ 1. , 0.6, 0.25, 0.125, 0.0625, 0.1],2)
raises LinAlgError: singular matrix
also does not have roots inside unit circle ??
-> even without initialization, it might be fast for estimation
-> how do I enforce stationarity and invertibility,
need helper function
get function drop imag if close to zero from numpy/scipy source, where?
"""
import numpy as np
import numpy.fft as fft
#import scipy.fftpack as fft
from scipy import signal
#from try_var_convolve import maxabs
from statsmodels.tsa.arima_process import ArmaProcess
#trying to convert old experiments to a class
[docs]
class ArmaFft(ArmaProcess):
'''fft tools for arma processes
This class contains several methods that are providing the same or similar
returns to try out and test different implementations.
Notes
-----
TODO:
check whether we do not want to fix maxlags, and create new instance if
maxlag changes. usage for different lengths of timeseries ?
or fix frequency and length for fft
check default frequencies w, terminology norw n_or_w
some ffts are currently done without padding with zeros
returns for spectral density methods needs checking, is it always the power
spectrum hw*hw.conj()
normalization of the power spectrum, spectral density: not checked yet, for
example no variance of underlying process is used
'''
def __init__(self, ar, ma, n):
#duplicates now that are subclassing ArmaProcess
super().__init__(ar, ma)
self.ar = np.asarray(ar)
self.ma = np.asarray(ma)
self.nobs = n
#could make the polynomials into cached attributes
self.arpoly = np.polynomial.Polynomial(ar)
self.mapoly = np.polynomial.Polynomial(ma)
self.nar = len(ar) #1d only currently
self.nma = len(ma)
[docs]
def padarr(self, arr, maxlag, atend=True):
'''pad 1d array with zeros at end to have length maxlag
function that is a method, no self used
Parameters
----------
arr : array_like, 1d
array that will be padded with zeros
maxlag : int
length of array after padding
atend : bool
If True (default), then the zeros are added to the end, otherwise
to the front of the array
Returns
-------
arrp : ndarray
zero-padded array
Notes
-----
This is mainly written to extend coefficient arrays for the lag-polynomials.
It returns a copy.
'''
if atend:
return np.r_[arr, np.zeros(maxlag-len(arr))]
else:
return np.r_[np.zeros(maxlag-len(arr)), arr]
[docs]
def pad(self, maxlag):
'''construct AR and MA polynomials that are zero-padded to a common length
Parameters
----------
maxlag : int
new length of lag-polynomials
Returns
-------
ar : ndarray
extended AR polynomial coefficients
ma : ndarray
extended AR polynomial coefficients
'''
arpad = np.r_[self.ar, np.zeros(maxlag-self.nar)]
mapad = np.r_[self.ma, np.zeros(maxlag-self.nma)]
return arpad, mapad
[docs]
def fftar(self, n=None):
'''Fourier transform of AR polynomial, zero-padded at end to n
Parameters
----------
n : int
length of array after zero-padding
Returns
-------
fftar : ndarray
fft of zero-padded ar polynomial
'''
if n is None:
n = len(self.ar)
return fft.fft(self.padarr(self.ar, n))
[docs]
def fftma(self, n):
'''Fourier transform of MA polynomial, zero-padded at end to n
Parameters
----------
n : int
length of array after zero-padding
Returns
-------
fftar : ndarray
fft of zero-padded ar polynomial
'''
if n is None:
n = len(self.ar)
return fft.fft(self.padarr(self.ma, n))
[docs]
def fftarma(self, n=None):
'''Fourier transform of ARMA polynomial, zero-padded at end to n
The Fourier transform of the ARMA process is calculated as the ratio
of the fft of the MA polynomial divided by the fft of the AR polynomial.
Parameters
----------
n : int
length of array after zero-padding
Returns
-------
fftarma : ndarray
fft of zero-padded arma polynomial
'''
if n is None:
n = self.nobs
return (self.fftma(n) / self.fftar(n))
[docs]
def spd(self, npos):
'''raw spectral density, returns Fourier transform
n is number of points in positive spectrum, the actual number of points
is twice as large. different from other spd methods with fft
'''
n = npos
w = fft.fftfreq(2*n) * 2 * np.pi
hw = self.fftarma(2*n) #not sure, need to check normalization
#return (hw*hw.conj()).real[n//2-1:] * 0.5 / np.pi #does not show in plot
return (hw*hw.conj()).real * 0.5 / np.pi, w
[docs]
def spdshift(self, n):
'''power spectral density using fftshift
currently returns two-sided according to fft frequencies, use first half
'''
#size = s1+s2-1
mapadded = self.padarr(self.ma, n)
arpadded = self.padarr(self.ar, n)
hw = fft.fft(fft.fftshift(mapadded)) / fft.fft(fft.fftshift(arpadded))
#return np.abs(spd)[n//2-1:]
w = fft.fftfreq(n) * 2 * np.pi
wslice = slice(n//2-1, None, None)
#return (hw*hw.conj()).real[wslice], w[wslice]
return (hw*hw.conj()).real, w
[docs]
def spddirect(self, n):
'''power spectral density using padding to length n done by fft
currently returns two-sided according to fft frequencies, use first half
'''
#size = s1+s2-1
#abs looks wrong
hw = fft.fft(self.ma, n) / fft.fft(self.ar, n)
w = fft.fftfreq(n) * 2 * np.pi
wslice = slice(None, n//2, None)
#return (np.abs(hw)**2)[wslice], w[wslice]
return (np.abs(hw)**2) * 0.5/np.pi, w
def _spddirect2(self, n):
'''this looks bad, maybe with an fftshift
'''
#size = s1+s2-1
hw = (fft.fft(np.r_[self.ma[::-1],self.ma], n)
/ fft.fft(np.r_[self.ar[::-1],self.ar], n))
return (hw*hw.conj()) #.real[n//2-1:]
[docs]
def spdroots(self, w):
'''spectral density for frequency using polynomial roots
builds two arrays (number of roots, number of frequencies)
'''
return self._spdroots(self.arroots, self.maroots, w)
def _spdroots(self, arroots, maroots, w):
'''spectral density for frequency using polynomial roots
builds two arrays (number of roots, number of frequencies)
Parameters
----------
arroots : ndarray
roots of ar (denominator) lag-polynomial
maroots : ndarray
roots of ma (numerator) lag-polynomial
w : array_like
frequencies for which spd is calculated
Notes
-----
this should go into a function
'''
w = np.atleast_2d(w).T
cosw = np.cos(w)
#Greene 5th edt. p626, section 20.2.7.a.
maroots = 1./maroots
arroots = 1./arroots
num = 1 + maroots**2 - 2* maroots * cosw
den = 1 + arroots**2 - 2* arroots * cosw
#print 'num.shape, den.shape', num.shape, den.shape
hw = 0.5 / np.pi * num.prod(-1) / den.prod(-1) #or use expsumlog
return np.squeeze(hw), w.squeeze()
[docs]
def spdpoly(self, w, nma=50):
'''spectral density from MA polynomial representation for ARMA process
References
----------
Cochrane, section 8.3.3
'''
mpoly = np.polynomial.Polynomial(self.arma2ma(nma))
hw = mpoly(np.exp(1j * w))
spd = np.real_if_close(hw * hw.conj() * 0.5/np.pi)
return spd, w
[docs]
def filter(self, x):
'''
filter a timeseries with the ARMA filter
padding with zero is missing, in example I needed the padding to get
initial conditions identical to direct filter
Initial filtered observations differ from filter2 and signal.lfilter, but
at end they are the same.
See Also
--------
tsa.filters.fftconvolve
'''
n = x.shape[0]
if n == self.fftarma:
fftarma = self.fftarma
else:
fftarma = self.fftma(n) / self.fftar(n)
tmpfft = fftarma * fft.fft(x)
return fft.ifft(tmpfft)
[docs]
def filter2(self, x, pad=0):
'''filter a time series using fftconvolve3 with ARMA filter
padding of x currently works only if x is 1d
in example it produces same observations at beginning as lfilter even
without padding.
TODO: this returns 1 additional observation at the end
'''
from statsmodels.tsa.filters import fftconvolve3
if not pad:
pass
elif pad == 'auto':
#just guessing how much padding
x = self.padarr(x, x.shape[0] + 2*(self.nma+self.nar), atend=False)
else:
x = self.padarr(x, x.shape[0] + int(pad), atend=False)
return fftconvolve3(x, self.ma, self.ar)
[docs]
def acf2spdfreq(self, acovf, nfreq=100, w=None):
'''
not really a method
just for comparison, not efficient for large n or long acf
this is also similarly use in tsa.stattools.periodogram with window
'''
if w is None:
w = np.linspace(0, np.pi, nfreq)[:, None]
nac = len(acovf)
hw = 0.5 / np.pi * (acovf[0] +
2 * (acovf[1:] * np.cos(w*np.arange(1,nac))).sum(1))
return hw
[docs]
def invpowerspd(self, n):
'''autocovariance from spectral density
scaling is correct, but n needs to be large for numerical accuracy
maybe padding with zero in fft would be faster
without slicing it returns 2-sided autocovariance with fftshift
>>> ArmaFft([1, -0.5], [1., 0.4], 40).invpowerspd(2**8)[:10]
array([ 2.08 , 1.44 , 0.72 , 0.36 , 0.18 , 0.09 ,
0.045 , 0.0225 , 0.01125 , 0.005625])
>>> ArmaFft([1, -0.5], [1., 0.4], 40).acovf(10)
array([ 2.08 , 1.44 , 0.72 , 0.36 , 0.18 , 0.09 ,
0.045 , 0.0225 , 0.01125 , 0.005625])
'''
hw = self.fftarma(n)
return np.real_if_close(fft.ifft(hw*hw.conj()), tol=200)[:n]
[docs]
def spdmapoly(self, w, twosided=False):
'''ma only, need division for ar, use LagPolynomial
'''
if w is None:
w = np.linspace(0, np.pi, nfreq)
return 0.5 / np.pi * self.mapoly(np.exp(w*1j))
[docs]
def plot4(self, fig=None, nobs=100, nacf=20, nfreq=100):
"""Plot results"""
rvs = self.generate_sample(nsample=100, burnin=500)
acf = self.acf(nacf)[:nacf] #TODO: check return length
pacf = self.pacf(nacf)
w = np.linspace(0, np.pi, nfreq)
spdr, wr = self.spdroots(w)
if fig is None:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(2,2,1)
ax.plot(rvs)
ax.set_title(f'Random Sample \nar={self.ar}, ma={self.ma}')
ax = fig.add_subplot(2,2,2)
ax.plot(acf)
ax.set_title(f'Autocorrelation \nar={self.ar}, ma={self.ma!r}s')
ax = fig.add_subplot(2,2,3)
ax.plot(wr, spdr)
ax.set_title(f'Power Spectrum \nar={self.ar}, ma={self.ma}')
ax = fig.add_subplot(2,2,4)
ax.plot(pacf)
ax.set_title(f'Partial Autocorrelation \nar={self.ar}, ma={self.ma}')
return fig
def spdar1(ar, w):
if np.ndim(ar) == 0:
rho = ar
else:
rho = -ar[1]
return 0.5 / np.pi /(1 + rho*rho - 2 * rho * np.cos(w))
if __name__ == '__main__':
def maxabs(x,y):
return np.max(np.abs(x-y))
nobs = 200 #10000
ar = [1, 0.0]
ma = [1, 0.0]
ar2 = np.zeros(nobs)
ar2[:2] = [1, -0.9]
uni = np.zeros(nobs)
uni[0]=1.
#arrep = signal.lfilter(ma, ar, ar2)
#marep = signal.lfilter([1],arrep, uni)
# same faster:
arcomb = np.convolve(ar, ar2, mode='same')
marep = signal.lfilter(ma,arcomb, uni) #[len(ma):]
print(marep[:10])
mafr = fft.fft(marep)
rvs = np.random.normal(size=nobs)
datafr = fft.fft(rvs)
y = fft.ifft(mafr*datafr)
print(np.corrcoef(np.c_[y[2:], y[1:-1], y[:-2]],rowvar=0))
arrep = signal.lfilter([1],marep, uni)
print(arrep[:20]) # roundtrip to ar
arfr = fft.fft(arrep)
yfr = fft.fft(y)
x = fft.ifft(arfr*yfr).real #imag part is e-15
# the next two are equal, roundtrip works
print(x[:5])
print(rvs[:5])
print(np.corrcoef(np.c_[x[2:], x[1:-1], x[:-2]],rowvar=0))
# ARMA filter using fft with ratio of fft of ma/ar lag polynomial
# seems much faster than using lfilter
#padding, note arcomb is already full length
arcombp = np.zeros(nobs)
arcombp[:len(arcomb)] = arcomb
map_ = np.zeros(nobs) #rename: map was shadowing builtin
map_[:len(ma)] = ma
ar0fr = fft.fft(arcombp)
ma0fr = fft.fft(map_)
y2 = fft.ifft(ma0fr/ar0fr*datafr)
#the next two are (almost) equal in real part, almost zero but different in imag
print(y2[:10])
print(y[:10])
print(maxabs(y, y2)) # from chfdiscrete
#1.1282071239631782e-014
ar = [1, -0.4]
ma = [1, 0.2]
arma1 = ArmaFft([1, -0.5,0,0,0,00, -0.7, 0.3], [1, 0.8], nobs)
nfreq = nobs
w = np.linspace(0, np.pi, nfreq)
w2 = np.linspace(0, 2*np.pi, nfreq)
import matplotlib.pyplot as plt
plt.close('all')
plt.figure()
spd1, w1 = arma1.spd(2**10)
print(spd1.shape)
_ = plt.plot(spd1)
plt.title('spd fft complex')
plt.figure()
spd2, w2 = arma1.spdshift(2**10)
print(spd2.shape)
_ = plt.plot(w2, spd2)
plt.title('spd fft shift')
plt.figure()
spd3, w3 = arma1.spddirect(2**10)
print(spd3.shape)
_ = plt.plot(w3, spd3)
plt.title('spd fft direct')
plt.figure()
spd3b = arma1._spddirect2(2**10)
print(spd3b.shape)
_ = plt.plot(spd3b)
plt.title('spd fft direct mirrored')
plt.figure()
spdr, wr = arma1.spdroots(w)
print(spdr.shape)
plt.plot(w, spdr)
plt.title('spd from roots')
plt.figure()
spdar1_ = spdar1(arma1.ar, w)
print(spdar1_.shape)
_ = plt.plot(w, spdar1_)
plt.title('spd ar1')
plt.figure()
wper, spdper = arma1.periodogram(nfreq)
print(spdper.shape)
_ = plt.plot(w, spdper)
plt.title('periodogram')
startup = 1000
rvs = arma1.generate_sample(startup+10000)[startup:]
import matplotlib.mlab as mlb
plt.figure()
sdm, wm = mlb.psd(x)
print('sdm.shape', sdm.shape)
sdm = sdm.ravel()
plt.plot(wm, sdm)
plt.title('matplotlib')
from nitime.algorithms import LD_AR_est
#yule_AR_est(s, order, Nfreqs)
wnt, spdnt = LD_AR_est(rvs, 10, 512)
plt.figure()
print('spdnt.shape', spdnt.shape)
_ = plt.plot(spdnt.ravel())
print(spdnt[:10])
plt.title('nitime')
fig = plt.figure()
arma1.plot4(fig)
#plt.show()
Last update:
Nov 14, 2024