# Source code for statsmodels.tsa.varma_process

```# -*- coding: utf-8 -*-
""" Helper and filter functions for VAR and VARMA, and basic VAR class

Created on Mon Jan 11 11:04:23 2010
Author: josef-pktd

This is a new version, I didn't look at the old version again, but similar
ideas.

not copied/cleaned yet:
* fftn based filtering, creating samples with fft
* Tests: I ran examples but did not convert them to tests
examples look good for parameter estimate and forecast, and filter functions

main TODOs:
* result statistics
* see whether Bayesian dummy observation can be included without changing
the single call to linalg.lstsq
* impulse response function does not treat correlation, see Hamilton and jplv

Extensions
* constraints, Bayesian priors/penalization
* Error Correction Form and Cointegration
* Factor Models Stock-Watson,  ???

"""
from __future__ import print_function
import numpy as np
from numpy.testing import assert_equal
from scipy import signal
#might not (yet) need the following
from scipy.signal.signaltools import _centered as trim_centered

from statsmodels.tsa.tsatools import lagmat

def varfilter(x, a):
'''apply an autoregressive filter to a series x

Warning: I just found out that convolve doesn't work as I
thought, this likely doesn't work correctly for
nvars>3

x can be 2d, a can be 1d, 2d, or 3d

Parameters
----------
x : array_like
data array, 1d or 2d, if 2d then observations in rows
a : array_like
autoregressive filter coefficients, ar lag polynomial
see Notes

Returns
-------
y : ndarray, 2d
filtered array, number of columns determined by x and a

Notes
-----

In general form this uses the linear filter ::

y = a(L)x

where
x : nobs, nvars
a : nlags, nvars, npoly

Depending on the shape and dimension of a this uses different
Lag polynomial arrays

case 1 : a is 1d or (nlags,1)
one lag polynomial is applied to all variables (columns of x)
case 2 : a is 2d, (nlags, nvars)
each series is independently filtered with its own
lag polynomial, uses loop over nvar
case 3 : a is 3d, (nlags, nvars, npoly)
the ith column of the output array is given by the linear filter
defined by the 2d array a[:,:,i], i.e. ::

y[:,i] = a(.,.,i)(L) * x
y[t,i] = sum_p sum_j a(p,j,i)*x(t-p,j)
for p = 0,...nlags-1, j = 0,...nvars-1,
for all t >= nlags

Note: maybe convert to axis=1, Not

TODO: initial conditions

'''
x = np.asarray(x)
a = np.asarray(a)
if x.ndim == 1:
x = x[:,None]
if x.ndim > 2:
raise ValueError('x array has to be 1d or 2d')
nvar = x.shape[1]
nlags = a.shape[0]
ntrim = nlags//2
# for x is 2d with ncols >1

if a.ndim == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a[:,None], mode='valid')
# alternative:
#return signal.lfilter(a,[1],x.astype(float),axis=0)
elif a.ndim == 2:
if min(a.shape) == 1:
# case: identical ar filter (lag polynomial)
return signal.convolve(x, a, mode='valid')

# case: independent ar
#(a bit like recserar in gauss, but no x yet)
#(no, reserar is inverse filter)
result = np.zeros((x.shape[0]-nlags+1, nvar))
for i in range(nvar):
# could also use np.convolve, but easier for swiching to fft
result[:,i] = signal.convolve(x[:,i], a[:,i], mode='valid')
return result

elif a.ndim == 3:
# case: vector autoregressive with lag matrices
#        #not necessary:
#        if np.any(a.shape[1:] != nvar):
#            raise ValueError('if 3d shape of a has to be (nobs,nvar,nvar)')
yf = signal.convolve(x[:,:,None], a)
yvalid = yf[ntrim:-ntrim, yf.shape[1]//2,:]
return yvalid

def varinversefilter(ar, nobs, version=1):
'''creates inverse ar filter (MA representation) recursively

The VAR lag polynomial is defined by ::

ar(L) y_t = u_t  or
y_t = -ar_{-1}(L) y_{t-1} + u_t

the returned lagpolynomial is arinv(L)=ar^{-1}(L) in ::

y_t = arinv(L) u_t

Parameters
----------
ar : array, (nlags,nvars,nvars)
matrix lagpolynomial, currently no exog
first row should be identity

Returns
-------
arinv : array, (nobs,nvars,nvars)

Notes
-----

'''
nlags, nvars, nvarsex = ar.shape
if nvars != nvarsex:
print('exogenous variables not implemented not tested')
arinv = np.zeros((nobs+1, nvarsex, nvars))
arinv[0,:,:] = ar[0]
arinv[1:nlags,:,:] = -ar[1:]
if version == 1:
for i in range(2,nobs+1):
tmp = np.zeros((nvars,nvars))
for p in range(1,nlags):
tmp += np.dot(-ar[p],arinv[i-p,:,:])
arinv[i,:,:] = tmp
if version == 0:
for i in range(nlags+1,nobs+1):
print(ar[1:].shape, arinv[i-1:i-nlags:-1,:,:].shape)
#arinv[i,:,:] = np.dot(-ar[1:],arinv[i-1:i-nlags:-1,:,:])
#print(np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1])).shape
#arinv[i,:,:] = np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1]))
raise NotImplementedError('waiting for generalized ufuncs or something')

return arinv

def vargenerate(ar, u, initvalues=None):
'''generate an VAR process with errors u

similar to gauss
uses loop

Parameters
----------
ar : array (nlags,nvars,nvars)
matrix lagpolynomial
u : array (nobs,nvars)
exogenous variable, error term for VAR

Returns
-------
sar : array (1+nobs,nvars)
sample of var process, inverse filtered u
does not trim initial condition y_0 = 0

Examples
--------
# generate random sample of VAR
nobs, nvars = 10, 2
u = numpy.random.randn(nobs,nvars)
a21 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.8,  0. ],
[ 0.,  -0.6]]])
vargenerate(a21,u)

# Impulse Response to an initial shock to the first variable
imp = np.zeros((nobs, nvars))
imp[0,0] = 1
vargenerate(a21,imp)

'''
nlags, nvars, nvarsex = ar.shape
nlagsm1 = nlags - 1
nobs = u.shape[0]
if nvars != nvarsex:
print('exogenous variables not implemented not tested')
if u.shape[1] != nvars:
raise ValueError('u needs to have nvars columns')
if initvalues is None:
sar = np.zeros((nobs+nlagsm1, nvars))
start = nlagsm1
else:
start = max(nlagsm1, initvalues.shape[0])
sar = np.zeros((nobs+start, nvars))
sar[start-initvalues.shape[0]:start] = initvalues
#sar[nlagsm1:] = u
sar[start:] = u
#if version == 1:
for i in range(start,start+nobs):
for p in range(1,nlags):
sar[i] += np.dot(sar[i-p,:],-ar[p])

return sar

def padone(x, front=0, back=0, axis=0, fillvalue=0):
'''pad with zeros along one axis, currently only axis=0

can be used sequentially to pad several axis

Examples
--------
array([[ 0.,  1.,  1.,  1.,  0.,  0.,  0.],
[ 0.,  1.,  1.,  1.,  0.,  0.,  0.]])

array([[ NaN,  NaN,  NaN],
[  1.,   1.,   1.],
[  1.,   1.,   1.],
[ NaN,  NaN,  NaN]])
'''
#primitive version
shape = np.array(x.shape)
shape[axis] += (front + back)
shapearr = np.array(x.shape)
out = np.empty(shape)
out.fill(fillvalue)
startind = np.zeros(x.ndim)
startind[axis] = front
endind = startind + shapearr
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
#print(myslice
#print(out.shape
#print(out[tuple(myslice)].shape
out[tuple(myslice)] = x
return out

def trimone(x, front=0, back=0, axis=0):
'''trim number of array elements along one axis

Examples
--------
>>> xp
array([[ 0.,  1.,  1.,  1.,  0.,  0.,  0.],
[ 0.,  1.,  1.,  1.,  0.,  0.,  0.]])
>>> trimone(xp,1,3,1)
array([[ 1.,  1.,  1.],
[ 1.,  1.,  1.]])
'''
shape = np.array(x.shape)
shape[axis] -= (front + back)
#print(shape, front, back
shapearr = np.array(x.shape)
startind = np.zeros(x.ndim)
startind[axis] = front
endind = startind + shape
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
#print(myslice
#print(shape, endind
#print(x[tuple(myslice)].shape
return x[tuple(myslice)]

def ar2full(ar):
'''make reduced lagpolynomial into a right side lagpoly array
'''
nlags, nvar,nvarex = ar.shape
return np.r_[np.eye(nvar,nvarex)[None,:,:],-ar]

def ar2lhs(ar):
'''convert full (rhs) lagpolynomial into a reduced, left side lagpoly array

this is mainly a reminder about the definition
'''
return -ar[1:]

class _Var(object):
'''obsolete VAR class, use tsa.VAR instead, for internal use only

Examples
--------

>>> v = Var(ar2s)
>>> v.fit(1)
>>> v.arhat
array([[[ 1.        ,  0.        ],
[ 0.        ,  1.        ]],

[[-0.77784898,  0.01726193],
[ 0.10733009, -0.78665335]]])

'''

def __init__(self, y):
self.y = y
self.nobs, self.nvars = y.shape

def fit(self, nlags):
'''estimate parameters using ols

Parameters
----------
nlags : integer
number of lags to include in regression, same for all variables

Returns
-------
None, but attaches

arhat : array (nlags, nvar, nvar)
full lag polynomial array
arlhs : array (nlags-1, nvar, nvar)
reduced lag polynomial for left hand side
other statistics as returned by linalg.lstsq : need to be completed

This currently assumes all parameters are estimated without restrictions.
In this case SUR is identical to OLS

estimation results are attached to the class instance

'''
self.nlags = nlags # without current period
nvars = self.nvars
#TODO: ar2s looks like a module variable, bug?
#lmat = lagmat(ar2s, nlags, trim='both', original='in')
lmat = lagmat(self.y, nlags, trim='both', original='in')
self.yred = lmat[:,:nvars]
self.xred = lmat[:,nvars:]
res = np.linalg.lstsq(self.xred, self.yred)
self.estresults = res
self.arlhs = res[0].reshape(nlags, nvars, nvars)
self.arhat = ar2full(self.arlhs)
self.xredrank = res[2]

def predict(self):
'''calculate estimated timeseries (yhat) for sample

'''

if not hasattr(self, 'yhat'):
self.yhat = varfilter(self.y, self.arhat)
return self.yhat

def covmat(self):
''' covariance matrix of estimate
# not sure it's correct, need to check orientation everywhere
# looks ok, display needs getting used to
array([[[ 0.37247445,  0.32210609],
[ 0.1002642 ,  0.08670584]],

[[ 0.1002642 ,  0.08670584],
[ 0.45903637,  0.39696255]]])
>>>
array([[ 0.37247445,  0.1002642 ],
[ 0.1002642 ,  0.45903637]])
array([[ 0.32210609,  0.08670584],
[ 0.08670584,  0.39696255]])
'''

#check if orientation is same as self.arhat
np.linalg.inv(np.dot(self.xred.T, self.xred))[:,:,None])

def forecast(self, horiz=1, u=None):
'''calculates forcast for horiz number of periods at end of sample

Parameters
----------
horiz : int (optional, default=1)
forecast horizon
u : array (horiz, nvars)
error term for forecast periods. If None, then u is zero.

Returns
-------
yforecast : array (nobs+horiz, nvars)
this includes the sample and the forecasts
'''
if u is None:
u = np.zeros((horiz, self.nvars))
return vargenerate(self.arhat, u, initvalues=self.y)

[docs]class VarmaPoly(object):
'''class to keep track of Varma polynomial format

Examples
--------

ar23 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.6,  0. ],
[ 0.2, -0.6]],

[[-0.1,  0. ],
[ 0.1, -0.1]]])

ma22 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[ 0.4,  0. ],
[ 0.2, 0.3]]])

'''
def __init__(self, ar, ma=None):
self.ar = ar
self.ma = ma
nlags, nvarall, nvars = ar.shape
self.nlags, self.nvarall, self.nvars = nlags, nvarall, nvars
self.isstructured = not (ar[0,:nvars] == np.eye(nvars)).all()
if self.ma is None:
self.ma = np.eye(nvars)[None,...]
self.isindependent = True
else:
self.isindependent = not (ma[0] == np.eye(nvars)).all()
self.malags = ar.shape[0]
self.hasexog = nvarall > nvars
self.arm1 = -ar[1:]

#@property
[docs]    def vstack(self, a=None, name='ar'):
'''stack lagpolynomial vertically in 2d array

'''
if not a is None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
return a.reshape(-1, self.nvarall)

#@property
[docs]    def hstack(self, a=None, name='ar'):
'''stack lagpolynomial horizontally in 2d array

'''
if not a is None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
return a.swapaxes(1,2).reshape(-1, self.nvarall).T

#@property
[docs]    def stacksquare(self, a=None, name='ar', orientation='vertical'):
'''stack lagpolynomial vertically in 2d square array with eye

'''
if not a is None:
a = a
elif name == 'ar':
a = self.ar
elif name == 'ma':
a = self.ma
else:
raise ValueError('no array or name given')
astacked = a.reshape(-1, self.nvarall)
lenpk, nvars = astacked.shape #[0]
amat = np.eye(lenpk, k=nvars)
amat[:,:nvars] = astacked
return amat

#@property
[docs]    def vstackarma_minus1(self):
'''stack ar and lagpolynomial vertically in 2d array

'''
a = np.concatenate((self.ar[1:], self.ma[1:]),0)
return a.reshape(-1, self.nvarall)

#@property
[docs]    def hstackarma_minus1(self):
'''stack ar and lagpolynomial vertically in 2d array

this is the Kalman Filter representation, I think
'''
a = np.concatenate((self.ar[1:], self.ma[1:]),0)
return a.swapaxes(1,2).reshape(-1, self.nvarall)

[docs]    def getisstationary(self, a=None):
'''check whether the auto-regressive lag-polynomial is stationary

Returns
-------
isstationary : boolean

*attaches*

areigenvalues : complex array
eigenvalues sorted by absolute value

References
----------
formula taken from NAG manual

'''
if not a is None:
a = a
else:
if self.isstructured:
a = -self.reduceform(self.ar)[1:]
else:
a = -self.ar[1:]
amat = self.stacksquare(a)
ev = np.sort(np.linalg.eigvals(amat))[::-1]
self.areigenvalues = ev
return (np.abs(ev) < 1).all()

[docs]    def getisinvertible(self, a=None):
'''check whether the auto-regressive lag-polynomial is stationary

Returns
-------
isinvertible : boolean

*attaches*

maeigenvalues : complex array
eigenvalues sorted by absolute value

References
----------
formula taken from NAG manual

'''
if not a is None:
a = a
else:
if self.isindependent:
a = self.reduceform(self.ma)[1:]
else:
a = self.ma[1:]
if a.shape[0] == 0:
# no ma lags
self.maeigenvalues = np.array([], np.complex)
return True

amat = self.stacksquare(a)
ev = np.sort(np.linalg.eigvals(amat))[::-1]
self.maeigenvalues = ev
return (np.abs(ev) < 1).all()

[docs]    def reduceform(self, apoly):
'''

this assumes no exog, todo

'''
if apoly.ndim != 3:
raise ValueError('apoly needs to be 3d')
nlags, nvarsex, nvars = apoly.shape

a = np.empty_like(apoly)
try:
a0inv = np.linalg.inv(a[0,:nvars, :])
except np.linalg.LinAlgError:
raise ValueError('matrix not invertible',

for lag in range(nlags):
a[lag] = np.dot(a0inv, apoly[lag])

return a

if __name__ == "__main__":
# some example lag polynomials
a21 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.8,  0. ],
[ 0.,  -0.6]]])

a22 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.8,  0. ],
[ 0.1, -0.8]]])

a23 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.8,  0.2],
[ 0.1, -0.6]]])

a24 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.6,  0. ],
[ 0.2, -0.6]],

[[-0.1,  0. ],
[ 0.1, -0.1]]])

a31 = np.r_[np.eye(3)[None,:,:], 0.8*np.eye(3)[None,:,:]]
a32 = np.array([[[ 1. ,  0. ,  0. ],
[ 0. ,  1. ,  0. ],
[ 0. ,  0. ,  1. ]],

[[ 0.8,  0. ,  0. ],
[ 0.1,  0.6,  0. ],
[ 0. ,  0. ,  0.9]]])

########
ut = np.random.randn(1000,2)
ar2s = vargenerate(a22,ut)
#res = np.linalg.lstsq(lagmat(ar2s,1)[:,1:], ar2s)
res = np.linalg.lstsq(lagmat(ar2s,1), ar2s)
bhat = res[0].reshape(1,2,2)
arhat = ar2full(bhat)
#print(maxabs(arhat - a22)

v = _Var(ar2s)
v.fit(1)
v.forecast()
v.forecast(25)[-30:]

ar23 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-0.6,  0. ],
[ 0.2, -0.6]],

[[-0.1,  0. ],
[ 0.1, -0.1]]])

ma22 = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[ 0.4,  0. ],
[ 0.2, 0.3]]])

ar23ns = np.array([[[ 1. ,  0. ],
[ 0. ,  1. ]],

[[-1.9,  0. ],
[ 0.4, -0.6]],

[[ 0.3,  0. ],
[ 0.1, -0.1]]])

vp = VarmaPoly(ar23, ma22)
print(vars(vp))
print(vp.vstack())
print(vp.vstack(a24))
print(vp.hstackarma_minus1())
print(vp.getisstationary())
print(vp.getisinvertible())

vp2 = VarmaPoly(ar23ns)
print(vp2.getisstationary())
print(vp2.getisinvertible()) # no ma lags
```