Source code for statsmodels.tsa.vector_ar.svar_model

"""
Vector Autoregression (VAR) processes

References
----------
Lütkepohl (2005) New Introduction to Multiple Time Series Analysis
"""
import numpy as np
import numpy.linalg as npl
from numpy.linalg import slogdet

from statsmodels.tools.decorators import deprecated_alias
from statsmodels.tools.numdiff import approx_fprime, approx_hess
import statsmodels.tsa.base.tsa_model as tsbase
from statsmodels.tsa.vector_ar import util
from statsmodels.tsa.vector_ar.irf import IRAnalysis
from statsmodels.tsa.vector_ar.var_model import VARProcess, VARResults


def svar_ckerr(svar_type, A, B):
    if A is None and (svar_type == "A" or svar_type == "AB"):
        raise ValueError("SVAR of type A or AB but A array not given.")
    if B is None and (svar_type == "B" or svar_type == "AB"):

        raise ValueError("SVAR of type B or AB but B array not given.")


[docs] class SVAR(tsbase.TimeSeriesModel): r""" Fit VAR and then estimate structural components of A and B, defined: .. math:: Ay_t = A_1 y_{t-1} + \ldots + A_p y_{t-p} + B \varepsilon_t Parameters ---------- endog : array_like 1-d endogenous response variable. The independent variable. dates : array_like must match number of rows of endog svar_type : str "A" - estimate structural parameters of A matrix, B assumed = I "B" - estimate structural parameters of B matrix, A assumed = I "AB" - estimate structural parameters indicated in both A and B matrix A : array_like neqs x neqs with unknown parameters marked with 'E' for estimate B : array_like neqs x neqs with unknown parameters marked with 'E' for estimate References ---------- Hamilton (1994) Time Series Analysis """ y = deprecated_alias("y", "endog", remove_version="0.11.0") def __init__(self, endog, svar_type, dates=None, freq=None, A=None, B=None, missing="none"): super().__init__(endog, None, dates, freq, missing=missing) # (self.endog, self.names, # self.dates) = data_util.interpret_data(endog, names, dates) self.neqs = self.endog.shape[1] types = ["A", "B", "AB"] if svar_type not in types: raise ValueError("SVAR type not recognized, must be in " + str(types)) self.svar_type = svar_type svar_ckerr(svar_type, A, B) self.A_original = A self.B_original = B # initialize A, B as I if not given # Initialize SVAR masks if A is None: A = np.identity(self.neqs) self.A_mask = A_mask = np.zeros(A.shape, dtype=bool) else: A = A.astype("U") A_mask = np.logical_or(A == "E", A == "e") self.A_mask = A_mask if B is None: B = np.identity(self.neqs) self.B_mask = B_mask = np.zeros(B.shape, dtype=bool) else: B = B.astype("U") B_mask = np.logical_or(B == "E", B == "e") self.B_mask = B_mask # convert A and B to numeric # TODO: change this when masked support is better or with formula # integration Anum = np.zeros(A.shape, dtype=float) Anum[~A_mask] = A[~A_mask] Anum[A_mask] = np.nan self.A = Anum Bnum = np.zeros(B.shape, dtype=float) Bnum[~B_mask] = B[~B_mask] Bnum[B_mask] = np.nan self.B = Bnum # LikelihoodModel.__init__(self, endog) # super().__init__(endog)
[docs] def fit(self, A_guess=None, B_guess=None, maxlags=None, method="ols", ic=None, trend="c", verbose=False, s_method="mle", solver="bfgs", override=False, maxiter=500, maxfun=500): """ Fit the SVAR model and solve for structural parameters Parameters ---------- A_guess : array_like, optional A vector of starting values for all parameters to be estimated in A. B_guess : array_like, optional A vector of starting values for all parameters to be estimated in B. maxlags : int Maximum number of lags to check for order selection, defaults to 12 * (nobs/100.)**(1./4), see select_order function method : {'ols'} Estimation method to use ic : {'aic', 'fpe', 'hqic', 'bic', None} Information criterion to use for VAR order selection. aic : Akaike fpe : Final prediction error hqic : Hannan-Quinn bic : Bayesian a.k.a. Schwarz verbose : bool, default False Print order selection output to the screen trend, str {"c", "ct", "ctt", "n"} "c" - add constant "ct" - constant and trend "ctt" - constant, linear and quadratic trend "n" - co constant, no trend Note that these are prepended to the columns of the dataset. s_method : {'mle'} Estimation method for structural parameters solver : {'nm', 'newton', 'bfgs', 'cg', 'ncg', 'powell'} Solution method See statsmodels.base for details override : bool, default False If True, returns estimates of A and B without checking order or rank condition maxiter : int, default 500 Number of iterations to perform in solution method maxfun : int Number of function evaluations to perform Notes ----- Lütkepohl pp. 146-153 Hamilton pp. 324-336 Returns ------- est : SVARResults """ lags = maxlags if ic is not None: selections = self.select_order(maxlags=maxlags, verbose=verbose) if ic not in selections: raise ValueError("%s not recognized, must be among %s" % (ic, sorted(selections))) lags = selections[ic] if verbose: print("Using %d based on %s criterion" % (lags, ic)) elif lags is None: lags = 1 self.nobs = len(self.endog) - lags # initialize starting parameters start_params = self._get_init_params(A_guess, B_guess) return self._estimate_svar(start_params, lags, trend=trend, solver=solver, override=override, maxiter=maxiter, maxfun=maxfun)
def _get_init_params(self, A_guess, B_guess): """ Returns either the given starting or .1 if none are given. """ var_type = self.svar_type.lower() n_masked_a = self.A_mask.sum() if var_type in ["ab", "a"]: if A_guess is None: A_guess = np.array([.1]*n_masked_a) elif len(A_guess) != n_masked_a: msg = "len(A_guess) = %s, there are %s parameters in A" raise ValueError(msg % (len(A_guess), n_masked_a)) else: A_guess = [] n_masked_b = self.B_mask.sum() if var_type in ["ab", "b"]: if B_guess is None: B_guess = np.array([.1]*n_masked_b) elif len(B_guess) != n_masked_b: msg = "len(B_guess) = %s, there are %s parameters in B" raise ValueError(msg % (len(B_guess), n_masked_b)) else: B_guess = [] return np.r_[A_guess, B_guess] def _estimate_svar(self, start_params, lags, maxiter, maxfun, trend="c", solver="nm", override=False): """ lags : int trend : {str, None} As per above """ k_trend = util.get_trendorder(trend) y = self.endog z = util.get_var_endog(y, lags, trend=trend, has_constant="raise") y_sample = y[lags:] # Lutkepohl p75, about 5x faster than stated formula var_params = np.linalg.lstsq(z, y_sample, rcond=-1)[0] resid = y_sample - np.dot(z, var_params) # Unbiased estimate of covariance matrix $\Sigma_u$ of the white noise # process $u$ # equivalent definition # .. math:: \frac{1}{T - Kp - 1} Y^\prime (I_T - Z (Z^\prime Z)^{-1} # Z^\prime) Y # Ref: Lutkepohl p.75 # df_resid right now is T - Kp - 1, which is a suggested correction avobs = len(y_sample) df_resid = avobs - (self.neqs * lags + k_trend) sse = np.dot(resid.T, resid) # TODO: should give users the option to use a dof correction or not omega = sse / df_resid self.sigma_u = omega A, B = self._solve_AB(start_params, override=override, solver=solver, maxiter=maxiter) A_mask = self.A_mask B_mask = self.B_mask return SVARResults(y, z, var_params, omega, lags, names=self.endog_names, trend=trend, dates=self.data.dates, model=self, A=A, B=B, A_mask=A_mask, B_mask=B_mask)
[docs] def loglike(self, params): """ Loglikelihood for SVAR model Notes ----- This method assumes that the autoregressive parameters are first estimated, then likelihood with structural parameters is estimated """ # TODO: this does not look robust if A or B is None A = self.A B = self.B A_mask = self.A_mask B_mask = self.B_mask A_len = len(A[A_mask]) B_len = len(B[B_mask]) if A is not None: A[A_mask] = params[:A_len] if B is not None: B[B_mask] = params[A_len:A_len+B_len] nobs = self.nobs neqs = self.neqs sigma_u = self.sigma_u W = np.dot(npl.inv(B), A) trc_in = np.dot(np.dot(W.T, W), sigma_u) sign, b_logdet = slogdet(B**2) # numpy 1.4 compat b_slogdet = sign * b_logdet likl = -nobs/2. * (neqs * np.log(2 * np.pi) - np.log(npl.det(A)**2) + b_slogdet + np.trace(trc_in)) return likl
[docs] def score(self, AB_mask): """ Return the gradient of the loglike at AB_mask. Parameters ---------- AB_mask : unknown values of A and B matrix concatenated Notes ----- Return numerical gradient """ loglike = self.loglike if AB_mask.ndim > 1: AB_mask = AB_mask.ravel() grad = approx_fprime(AB_mask, loglike, epsilon=1e-8) # workaround shape of grad if only one parameter #9302 if AB_mask.size == 1 and grad.ndim == 2: grad = grad.ravel() return grad
[docs] def hessian(self, AB_mask): """ Returns numerical hessian. """ loglike = self.loglike if AB_mask.ndim > 1: AB_mask = AB_mask.ravel() return approx_hess(AB_mask, loglike)
def _solve_AB(self, start_params, maxiter, override=False, solver="bfgs"): """ Solves for MLE estimate of structural parameters Parameters ---------- override : bool, default False If True, returns estimates of A and B without checking order or rank condition solver : str or None, optional Solver to be used. The default is 'nm' (Nelder-Mead). Other choices are 'bfgs', 'newton' (Newton-Raphson), 'cg' conjugate, 'ncg' (non-conjugate gradient), and 'powell'. maxiter : int, optional The maximum number of iterations. Default is 500. Returns ------- A_solve, B_solve: ML solutions for A, B matrices """ # TODO: this could stand a refactor A_mask = self.A_mask B_mask = self.B_mask A = self.A B = self.B A_len = len(A[A_mask]) A[A_mask] = start_params[:A_len] B[B_mask] = start_params[A_len:] if not override: J = self._compute_J(A, B) self.check_order(J) self.check_rank(J) else: # TODO: change to a warning? print("Order/rank conditions have not been checked") if solver == "bfgs": kwargs = {"gtol": 1e-5} else: kwargs = {} retvals = super().fit(start_params=start_params, method=solver, maxiter=maxiter, disp=False, **kwargs).params if retvals.ndim > 1: retvals = retvals.ravel() A[A_mask] = retvals[:A_len] B[B_mask] = retvals[A_len:] return A, B def _compute_J(self, A_solve, B_solve): # first compute appropriate duplication matrix # taken from Magnus and Neudecker (1980), # "The Elimination Matrix: Some Lemmas and Applications # the creation of the D_n matrix follows MN (1980) directly, # while the rest follows Hamilton (1994) neqs = self.neqs sigma_u = self.sigma_u A_mask = self.A_mask B_mask = self.B_mask # first generate duplication matrix, see MN (1980) for notation D_nT = np.zeros([int((1.0 / 2) * (neqs) * (neqs + 1)), neqs**2]) for j in range(neqs): i = j while j <= i < neqs: u = np.zeros([int((1.0/2)*neqs*(neqs+1)), 1]) u[int(j * neqs + (i + 1) - (1.0 / 2) * (j + 1) * j - 1)] = 1 Tij = np.zeros([neqs, neqs]) Tij[i, j] = 1 Tij[j, i] = 1 D_nT = D_nT+np.dot(u, (Tij.ravel("F")[:, None]).T) i = i+1 D_n = D_nT.T D_pl = npl.pinv(D_n) # generate S_B S_B = np.zeros((neqs**2, len(A_solve[A_mask]))) S_D = np.zeros((neqs**2, len(B_solve[B_mask]))) j = 0 j_d = 0 if len(A_solve[A_mask]) != 0: A_vec = np.ravel(A_mask, order="F") for k in range(neqs**2): if A_vec[k]: S_B[k, j] = -1 j += 1 if len(B_solve[B_mask]) != 0: B_vec = np.ravel(B_mask, order="F") for k in range(neqs**2): if B_vec[k]: S_D[k, j_d] = 1 j_d += 1 # now compute J invA = npl.inv(A_solve) J_p1i = np.dot(np.dot(D_pl, np.kron(sigma_u, invA)), S_B) J_p1 = -2.0 * J_p1i J_p2 = np.dot(np.dot(D_pl, np.kron(invA, invA)), S_D) J = np.append(J_p1, J_p2, axis=1) return J
[docs] def check_order(self, J): if np.size(J, axis=0) < np.size(J, axis=1): raise ValueError("Order condition not met: " "solution may not be unique")
[docs] def check_rank(self, J): rank = np.linalg.matrix_rank(J) if rank < np.size(J, axis=1): raise ValueError("Rank condition not met: " "solution may not be unique.")
[docs] class SVARProcess(VARProcess): """ Class represents a known SVAR(p) process Parameters ---------- coefs : ndarray (p x k x k) intercept : ndarray (length k) sigma_u : ndarray (k x k) names : sequence (length k) A : neqs x neqs np.ndarray with unknown parameters marked with 'E' A_mask : neqs x neqs mask array with known parameters masked B : neqs x neqs np.ndarry with unknown parameters marked with 'E' B_mask : neqs x neqs mask array with known parameters masked """ def __init__(self, coefs, intercept, sigma_u, A_solve, B_solve, names=None): self.k_ar = len(coefs) self.neqs = coefs.shape[1] self.coefs = coefs self.intercept = intercept self.sigma_u = sigma_u self.A_solve = A_solve self.B_solve = B_solve self.names = names
[docs] def orth_ma_rep(self, maxn=10, P=None): """ Unavailable for SVAR """ raise NotImplementedError
[docs] def svar_ma_rep(self, maxn=10, P=None): """ Compute Structural MA coefficient matrices using MLE of A, B """ if P is None: A_solve = self.A_solve B_solve = self.B_solve P = np.dot(npl.inv(A_solve), B_solve) ma_mats = self.ma_rep(maxn=maxn) return np.array([np.dot(coefs, P) for coefs in ma_mats])
[docs] class SVARResults(SVARProcess, VARResults): """ Estimate VAR(p) process with fixed number of lags Parameters ---------- endog : ndarray endog_lagged : ndarray params : ndarray sigma_u : ndarray lag_order : int model : VAR model instance trend : str {'n', 'c', 'ct'} names : array_like List of names of the endogenous variables in order of appearance in `endog`. dates Attributes ---------- aic bic bse coefs : ndarray (p x K x K) Estimated A_i matrices, A_i = coefs[i-1] cov_params dates detomega df_model : int df_resid : int endog endog_lagged fittedvalues fpe intercept info_criteria k_ar : int k_trend : int llf model names neqs : int Number of variables (equations) nobs : int n_totobs : int params k_ar : int Order of VAR process params : ndarray (Kp + 1) x K A_i matrices and intercept in stacked form [int A_1 ... A_p] pvalue names : list variables names resid sigma_u : ndarray (K x K) Estimate of white noise process variance Var[u_t] sigma_u_mle stderr trenorder tvalues """ _model_type = "SVAR" def __init__(self, endog, endog_lagged, params, sigma_u, lag_order, A=None, B=None, A_mask=None, B_mask=None, model=None, trend="c", names=None, dates=None): self.model = model self.endog = endog self.endog_lagged = endog_lagged self.dates = dates self.n_totobs, self.neqs = self.endog.shape self.nobs = self.n_totobs - lag_order k_trend = util.get_trendorder(trend) if k_trend > 0: # make this the polynomial trend order trendorder = k_trend - 1 else: trendorder = None self.k_trend = k_trend self.k_exog = k_trend # now (0.9) required by VARProcess self.trendorder = trendorder self.exog_names = util.make_lag_names(names, lag_order, k_trend) self.params = params self.sigma_u = sigma_u # Each matrix needs to be transposed reshaped = self.params[self.k_trend:] reshaped = reshaped.reshape((lag_order, self.neqs, self.neqs)) # Need to transpose each coefficient matrix intercept = self.params[0] coefs = reshaped.swapaxes(1, 2).copy() # SVAR components # TODO: if you define these here, you do not also have to define # them in SVAR process, but I left them for now -ss self.A = A self.B = B self.A_mask = A_mask self.B_mask = B_mask super().__init__(coefs, intercept, sigma_u, A, B, names=names)
[docs] def irf(self, periods=10, var_order=None): """ Analyze structural impulse responses to shocks in system Parameters ---------- periods : int Returns ------- irf : IRAnalysis """ A = self.A B = self.B P = np.dot(npl.inv(A), B) return IRAnalysis(self, P=P, periods=periods, svar=True)
[docs] def sirf_errband_mc(self, orth=False, repl=1000, steps=10, signif=0.05, seed=None, burn=100, cum=False): """ Compute Monte Carlo integrated error bands assuming normally distributed for impulse response functions Parameters ---------- orth : bool, default False Compute orthogonalized impulse response error bands repl : int number of Monte Carlo replications to perform steps : int, default 10 number of impulse response periods signif : float (0 < signif <1) Significance level for error bars, defaults to 95% CI seed : int np.random.seed for replications burn : int number of initial observations to discard for simulation cum : bool, default False produce cumulative irf error bands Notes ----- Lütkepohl (2005) Appendix D Returns ------- Tuple of lower and upper arrays of ma_rep monte carlo standard errors """ neqs = self.neqs k_ar = self.k_ar coefs = self.coefs sigma_u = self.sigma_u intercept = self.intercept nobs = self.nobs ma_coll = np.zeros((repl, steps + 1, neqs, neqs)) A = self.A B = self.B A_mask = self.A_mask B_mask = self.B_mask A_pass = self.model.A_original B_pass = self.model.B_original s_type = self.model.svar_type g_list = [] def agg(impulses): if cum: return impulses.cumsum(axis=0) return impulses opt_A = A[A_mask] opt_B = B[B_mask] for i in range(repl): # discard first hundred to correct for starting bias sim = util.varsim(coefs, intercept, sigma_u, seed=seed, steps=nobs + burn) sim = sim[burn:] smod = SVAR(sim, svar_type=s_type, A=A_pass, B=B_pass) if i == 10: # Use first 10 to update starting val for remainder of fits mean_AB = np.mean(g_list, axis=0) split = len(A[A_mask]) opt_A = mean_AB[:split] opt_B = mean_AB[split:] sres = smod.fit(maxlags=k_ar, A_guess=opt_A, B_guess=opt_B) if i < 10: # save estimates for starting val if in first 10 g_list.append(np.append(sres.A[A_mask].tolist(), sres.B[B_mask].tolist())) ma_coll[i] = agg(sres.svar_ma_rep(maxn=steps)) ma_sort = np.sort(ma_coll, axis=0) # sort to get quantiles index = (int(round(signif / 2 * repl) - 1), int(round((1 - signif / 2) * repl) - 1)) lower = ma_sort[index[0], :, :, :] upper = ma_sort[index[1], :, :, :] return lower, upper