Source code for statsmodels.tsa.tsatools

from __future__ import annotations

from statsmodels.compat.python import Literal, lrange

import warnings

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset

from statsmodels.tools.data import _is_recarray, _is_using_pandas
from statsmodels.tools.sm_exceptions import ValueWarning
from statsmodels.tools.typing import NDArray
from statsmodels.tools.validation import (
    array_like,
    bool_like,
    int_like,
    string_like,
)

__all__ = [
    "lagmat",
    "lagmat2ds",
    "add_trend",
    "duplication_matrix",
    "elimination_matrix",
    "commutation_matrix",
    "vec",
    "vech",
    "unvec",
    "unvech",
    "freq_to_period",
]


[docs] def add_trend(x, trend="c", prepend=False, has_constant="skip"): """ Add a trend and/or constant to an array. Parameters ---------- x : array_like Original array of data. trend : str {'n', 'c', 't', 'ct', 'ctt'} The trend to add. * 'n' add no trend. * 'c' add constant only. * 't' add trend only. * 'ct' add constant and linear trend. * 'ctt' add constant and linear and quadratic trend. prepend : bool If True, prepends the new data to the columns of X. has_constant : str {'raise', 'add', 'skip'} Controls what happens when trend is 'c' and a constant column already exists in x. 'raise' will raise an error. 'add' will add a column of 1s. 'skip' will return the data without change. 'skip' is the default. Returns ------- array_like The original data with the additional trend columns. If x is a pandas Series or DataFrame, then the trend column names are 'const', 'trend' and 'trend_squared'. See Also -------- statsmodels.tools.tools.add_constant Add a constant column to an array. Notes ----- Returns columns as ['ctt','ct','c'] whenever applicable. There is currently no checking for an existing trend. """ prepend = bool_like(prepend, "prepend") trend = string_like(trend, "trend", options=("n", "c", "t", "ct", "ctt")) has_constant = string_like( has_constant, "has_constant", options=("raise", "add", "skip") ) # TODO: could be generalized for trend of aribitrary order columns = ["const", "trend", "trend_squared"] if trend == "n": return x.copy() elif trend == "c": # handles structured arrays columns = columns[:1] trendorder = 0 elif trend == "ct" or trend == "t": columns = columns[:2] if trend == "t": columns = columns[1:2] trendorder = 1 elif trend == "ctt": trendorder = 2 if _is_recarray(x): from statsmodels.tools.sm_exceptions import recarray_exception raise NotImplementedError(recarray_exception) is_pandas = _is_using_pandas(x, None) if is_pandas: if isinstance(x, pd.Series): x = pd.DataFrame(x) else: x = x.copy() else: x = np.asanyarray(x) nobs = len(x) trendarr = np.vander( np.arange(1, nobs + 1, dtype=np.float64), trendorder + 1 ) # put in order ctt trendarr = np.fliplr(trendarr) if trend == "t": trendarr = trendarr[:, 1] if "c" in trend: if is_pandas: # Mixed type protection def safe_is_const(s): try: return np.ptp(s) == 0.0 and np.any(s != 0.0) except: return False col_const = x.apply(safe_is_const, 0) else: ptp0 = np.ptp(np.asanyarray(x), axis=0) col_is_const = ptp0 == 0 nz_const = col_is_const & (x[0] != 0) col_const = nz_const if np.any(col_const): if has_constant == "raise": if x.ndim == 1: base_err = "x is constant." else: columns = np.arange(x.shape[1])[col_const] if isinstance(x, pd.DataFrame): columns = x.columns const_cols = ", ".join([str(c) for c in columns]) base_err = ( "x contains one or more constant columns. Column(s) " f"{const_cols} are constant." ) msg = f"{base_err} Adding a constant with trend='{trend}' is not allowed." raise ValueError(msg) elif has_constant == "skip": columns = columns[1:] trendarr = trendarr[:, 1:] order = 1 if prepend else -1 if is_pandas: trendarr = pd.DataFrame(trendarr, index=x.index, columns=columns) x = [trendarr, x] x = pd.concat(x[::order], axis=1) else: x = [trendarr, x] x = np.column_stack(x[::order]) return x
[docs] def add_lag(x, col=None, lags=1, drop=False, insert=True): """ Returns an array with lags included given an array. Parameters ---------- x : array_like An array or NumPy ndarray subclass. Can be either a 1d or 2d array with observations in columns. col : int or None `col` can be an int of the zero-based column index. If it's a 1d array `col` can be None. lags : int The number of lags desired. drop : bool Whether to keep the contemporaneous variable for the data. insert : bool or int If True, inserts the lagged values after `col`. If False, appends the data. If int inserts the lags at int. Returns ------- array : ndarray Array with lags Examples -------- >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load() >>> data = data.data[['year','quarter','realgdp','cpi']] >>> data = sm.tsa.add_lag(data, 'realgdp', lags=2) Notes ----- Trims the array both forward and backward, so that the array returned so that the length of the returned array is len(`X`) - lags. The lags are returned in increasing order, ie., t-1,t-2,...,t-lags """ lags = int_like(lags, "lags") drop = bool_like(drop, "drop") x = array_like(x, "x", ndim=2) if col is None: col = 0 # handle negative index if col < 0: col = x.shape[1] + col if x.ndim == 1: x = x[:, None] contemp = x[:, col] if insert is True: ins_idx = col + 1 elif insert is False: ins_idx = x.shape[1] else: if insert < 0: # handle negative index insert = x.shape[1] + insert + 1 if insert > x.shape[1]: insert = x.shape[1] warnings.warn( "insert > number of variables, inserting at the" " last position", ValueWarning, ) ins_idx = insert ndlags = lagmat(contemp, lags, trim="Both") first_cols = lrange(ins_idx) last_cols = lrange(ins_idx, x.shape[1]) if drop: if col in first_cols: first_cols.pop(first_cols.index(col)) else: last_cols.pop(last_cols.index(col)) return np.column_stack((x[lags:, first_cols], ndlags, x[lags:, last_cols]))
[docs] def detrend(x, order=1, axis=0): """ Detrend an array with a trend of given order along axis 0 or 1. Parameters ---------- x : array_like, 1d or 2d Data, if 2d, then each row or column is independently detrended with the same trendorder, but independent trend estimates. order : int The polynomial order of the trend, zero is constant, one is linear trend, two is quadratic trend. axis : int Axis can be either 0, observations by rows, or 1, observations by columns. Returns ------- ndarray The detrended series is the residual of the linear regression of the data on the trend of given order. """ order = int_like(order, "order") axis = int_like(axis, "axis") if x.ndim == 2 and int(axis) == 1: x = x.T elif x.ndim > 2: raise NotImplementedError( "x.ndim > 2 is not implemented until it is needed" ) nobs = x.shape[0] if order == 0: # Special case demean resid = x - x.mean(axis=0) else: trends = np.vander(np.arange(float(nobs)), N=order + 1) beta = np.linalg.pinv(trends).dot(x) resid = x - np.dot(trends, beta) if x.ndim == 2 and int(axis) == 1: resid = resid.T return resid
[docs] def lagmat(x, maxlag: int, trim: Literal["forward", "backward", "both", "none"]='forward', original: Literal["ex", "sep", "in"]="ex", use_pandas: bool=False )-> NDArray | DataFrame | tuple[NDArray, NDArray] | tuple[DataFrame, DataFrame]: """ Create 2d array of lags. Parameters ---------- x : array_like Data; if 2d, observation in rows and variables in columns. maxlag : int All lags from zero to maxlag are included. trim : {'forward', 'backward', 'both', 'none', None} The trimming method to use. * 'forward' : trim invalid observations in front. * 'backward' : trim invalid initial observations. * 'both' : trim invalid observations on both sides. * 'none', None : no trimming of observations. original : {'ex','sep','in'} How the original is treated. * 'ex' : drops the original array returning only the lagged values. * 'in' : returns the original array and the lagged values as a single array. * 'sep' : returns a tuple (original array, lagged values). The original array is truncated to have the same number of rows as the returned lagmat. use_pandas : bool If true, returns a DataFrame when the input is a pandas Series or DataFrame. If false, return numpy ndarrays. Returns ------- lagmat : ndarray The array with lagged observations. y : ndarray, optional Only returned if original == 'sep'. Notes ----- When using a pandas DataFrame or Series with use_pandas=True, trim can only be 'forward' or 'both' since it is not possible to consistently extend index values. Examples -------- >>> from statsmodels.tsa.tsatools import lagmat >>> import numpy as np >>> X = np.arange(1,7).reshape(-1,2) >>> lagmat(X, maxlag=2, trim="forward", original='in') array([[ 1., 2., 0., 0., 0., 0.], [ 3., 4., 1., 2., 0., 0.], [ 5., 6., 3., 4., 1., 2.]]) >>> lagmat(X, maxlag=2, trim="backward", original='in') array([[ 5., 6., 3., 4., 1., 2.], [ 0., 0., 5., 6., 3., 4.], [ 0., 0., 0., 0., 5., 6.]]) >>> lagmat(X, maxlag=2, trim="both", original='in') array([[ 5., 6., 3., 4., 1., 2.]]) >>> lagmat(X, maxlag=2, trim="none", original='in') array([[ 1., 2., 0., 0., 0., 0.], [ 3., 4., 1., 2., 0., 0.], [ 5., 6., 3., 4., 1., 2.], [ 0., 0., 5., 6., 3., 4.], [ 0., 0., 0., 0., 5., 6.]]) """ maxlag = int_like(maxlag, "maxlag") use_pandas = bool_like(use_pandas, "use_pandas") trim = string_like( trim, "trim", optional=True, options=("forward", "backward", "both", "none"), ) original = string_like(original, "original", options=("ex", "sep", "in")) # TODO: allow list of lags additional to maxlag orig = x x = array_like(x, "x", ndim=2, dtype=None) is_pandas = _is_using_pandas(orig, None) and use_pandas trim = "none" if trim is None else trim trim = trim.lower() if is_pandas and trim in ("none", "backward"): raise ValueError( "trim cannot be 'none' or 'backward' when used on " "Series or DataFrames" ) dropidx = 0 nobs, nvar = x.shape if original in ["ex", "sep"]: dropidx = nvar if maxlag >= nobs: raise ValueError("maxlag should be < nobs") lm = np.zeros((nobs + maxlag, nvar * (maxlag + 1))) for k in range(0, int(maxlag + 1)): lm[ maxlag - k: nobs + maxlag - k, nvar * (maxlag - k): nvar * (maxlag - k + 1), ] = x if trim in ("none", "forward"): startobs = 0 elif trim in ("backward", "both"): startobs = maxlag else: raise ValueError("trim option not valid") if trim in ("none", "backward"): stopobs = len(lm) else: stopobs = nobs if is_pandas: x = orig if isinstance(x, DataFrame): x_columns = [str(c) for c in x.columns] if len(set(x_columns)) != x.shape[1]: raise ValueError( "Columns names must be distinct after conversion to string " "(if not already strings)." ) else: x_columns = [str(x.name)] columns = [str(col) for col in x_columns] for lag in range(maxlag): lag_str = str(lag + 1) columns.extend([str(col) + ".L." + lag_str for col in x_columns]) lm = DataFrame(lm[:stopobs], index=x.index, columns=columns) lags = lm.iloc[startobs:] if original in ("sep", "ex"): leads = lags[x_columns] lags = lags.drop(x_columns, axis=1) else: lags = lm[startobs:stopobs, dropidx:] if original == "sep": leads = lm[startobs:stopobs, :dropidx] if original == "sep": return lags, leads else: return lags
[docs] def lagmat2ds( x, maxlag0, maxlagex=None, dropex=0, trim="forward", use_pandas=False ): """ Generate lagmatrix for 2d array, columns arranged by variables. Parameters ---------- x : array_like Data, 2d. Observations in rows and variables in columns. maxlag0 : int The first variable all lags from zero to maxlag are included. maxlagex : {None, int} The max lag for all other variables all lags from zero to maxlag are included. dropex : int Exclude first dropex lags from other variables. For all variables, except the first, lags from dropex to maxlagex are included. trim : str The trimming method to use. * 'forward' : trim invalid observations in front. * 'backward' : trim invalid initial observations. * 'both' : trim invalid observations on both sides. * 'none' : no trimming of observations. use_pandas : bool If true, returns a DataFrame when the input is a pandas Series or DataFrame. If false, return numpy ndarrays. Returns ------- ndarray The array with lagged observations, columns ordered by variable. Notes ----- Inefficient implementation for unequal lags, implemented for convenience. """ maxlag0 = int_like(maxlag0, "maxlag0") maxlagex = int_like(maxlagex, "maxlagex", optional=True) trim = string_like( trim, "trim", optional=True, options=("forward", "backward", "both", "none"), ) if maxlagex is None: maxlagex = maxlag0 maxlag = max(maxlag0, maxlagex) is_pandas = _is_using_pandas(x, None) if x.ndim == 1: if is_pandas: x = pd.DataFrame(x) else: x = x[:, None] elif x.ndim == 0 or x.ndim > 2: raise ValueError("Only supports 1 and 2-dimensional data.") nobs, nvar = x.shape if is_pandas and use_pandas: lags = lagmat( x.iloc[:, 0], maxlag, trim=trim, original="in", use_pandas=True ) lagsli = [lags.iloc[:, : maxlag0 + 1]] for k in range(1, nvar): lags = lagmat( x.iloc[:, k], maxlag, trim=trim, original="in", use_pandas=True ) lagsli.append(lags.iloc[:, dropex : maxlagex + 1]) return pd.concat(lagsli, axis=1) elif is_pandas: x = np.asanyarray(x) lagsli = [ lagmat(x[:, 0], maxlag, trim=trim, original="in")[:, : maxlag0 + 1] ] for k in range(1, nvar): lagsli.append( lagmat(x[:, k], maxlag, trim=trim, original="in")[ :, dropex : maxlagex + 1 ] ) return np.column_stack(lagsli)
def vec(mat): return mat.ravel("F") def vech(mat): # Gets Fortran-order return mat.T.take(_triu_indices(len(mat))) # tril/triu/diag, suitable for ndarray.take def _tril_indices(n): rows, cols = np.tril_indices(n) return rows * n + cols def _triu_indices(n): rows, cols = np.triu_indices(n) return rows * n + cols def _diag_indices(n): rows, cols = np.diag_indices(n) return rows * n + cols def unvec(v): k = int(np.sqrt(len(v))) assert k * k == len(v) return v.reshape((k, k), order="F") def unvech(v): # quadratic formula, correct fp error rows = 0.5 * (-1 + np.sqrt(1 + 8 * len(v))) rows = int(np.round(rows)) result = np.zeros((rows, rows)) result[np.triu_indices(rows)] = v result = result + result.T # divide diagonal elements by 2 result[np.diag_indices(rows)] /= 2 return result def duplication_matrix(n): """ Create duplication matrix D_n which satisfies vec(S) = D_n vech(S) for symmetric matrix S Returns ------- D_n : ndarray """ n = int_like(n, "n") tmp = np.eye(n * (n + 1) // 2) return np.array([unvech(x).ravel() for x in tmp]).T def elimination_matrix(n): """ Create the elimination matrix L_n which satisfies vech(M) = L_n vec(M) for any matrix M Parameters ---------- Returns ------- """ n = int_like(n, "n") vech_indices = vec(np.tril(np.ones((n, n)))) return np.eye(n * n)[vech_indices != 0] def commutation_matrix(p, q): """ Create the commutation matrix K_{p,q} satisfying vec(A') = K_{p,q} vec(A) Parameters ---------- p : int q : int Returns ------- K : ndarray (pq x pq) """ p = int_like(p, "p") q = int_like(q, "q") K = np.eye(p * q) indices = np.arange(p * q).reshape((p, q), order="F") return K.take(indices.ravel(), axis=0) def _ar_transparams(params): """ Transforms params to induce stationarity/invertability. Parameters ---------- params : array_like The AR coefficients Reference --------- Jones(1980) """ newparams = np.tanh(params / 2) tmp = np.tanh(params / 2) for j in range(1, len(params)): a = newparams[j] for kiter in range(j): tmp[kiter] -= a * newparams[j - kiter - 1] newparams[:j] = tmp[:j] return newparams def _ar_invtransparams(params): """ Inverse of the Jones reparameterization Parameters ---------- params : array_like The transformed AR coefficients """ params = params.copy() tmp = params.copy() for j in range(len(params) - 1, 0, -1): a = params[j] for kiter in range(j): tmp[kiter] = (params[kiter] + a * params[j - kiter - 1]) / ( 1 - a ** 2 ) params[:j] = tmp[:j] invarcoefs = 2 * np.arctanh(params) return invarcoefs def _ma_transparams(params): """ Transforms params to induce stationarity/invertability. Parameters ---------- params : ndarray The ma coeffecients of an (AR)MA model. Reference --------- Jones(1980) """ newparams = ((1 - np.exp(-params)) / (1 + np.exp(-params))).copy() tmp = ((1 - np.exp(-params)) / (1 + np.exp(-params))).copy() # levinson-durbin to get macf for j in range(1, len(params)): b = newparams[j] for kiter in range(j): tmp[kiter] += b * newparams[j - kiter - 1] newparams[:j] = tmp[:j] return newparams def _ma_invtransparams(macoefs): """ Inverse of the Jones reparameterization Parameters ---------- params : ndarray The transformed MA coefficients """ tmp = macoefs.copy() for j in range(len(macoefs) - 1, 0, -1): b = macoefs[j] for kiter in range(j): tmp[kiter] = (macoefs[kiter] - b * macoefs[j - kiter - 1]) / ( 1 - b ** 2 ) macoefs[:j] = tmp[:j] invmacoefs = -np.log((1 - macoefs) / (1 + macoefs)) return invmacoefs def unintegrate_levels(x, d): """ Returns the successive differences needed to unintegrate the series. Parameters ---------- x : array_like The original series d : int The number of differences of the differenced series. Returns ------- y : array_like The increasing differences from 0 to d-1 of the first d elements of x. See Also -------- unintegrate """ d = int_like(d, "d") x = x[:d] return np.asarray([np.diff(x, d - i)[0] for i in range(d, 0, -1)]) def unintegrate(x, levels): """ After taking n-differences of a series, return the original series Parameters ---------- x : array_like The n-th differenced series levels : list A list of the first-value in each differenced series, for [first-difference, second-difference, ..., n-th difference] Returns ------- y : array_like The original series de-differenced Examples -------- >>> x = np.array([1, 3, 9., 19, 8.]) >>> levels = unintegrate_levels(x, 2) >>> levels array([ 1., 2.]) >>> unintegrate(np.diff(x, 2), levels) array([ 1., 3., 9., 19., 8.]) """ levels = list(levels)[:] # copy if len(levels) > 1: x0 = levels.pop(-1) return unintegrate(np.cumsum(np.r_[x0, x]), levels) x0 = levels[0] return np.cumsum(np.r_[x0, x]) def freq_to_period(freq: str | offsets.DateOffset) -> int: """ Convert a pandas frequency to a periodicity Parameters ---------- freq : str or offset Frequency to convert Returns ------- int Periodicity of freq Notes ----- Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52. """ if not isinstance(freq, offsets.DateOffset): freq = to_offset(freq) # go ahead and standardize assert isinstance(freq, offsets.DateOffset) freq = freq.rule_code.upper() yearly_freqs = ("A-", "AS-", "Y-", "YS-", "YE-") if freq in ("A", "Y") or freq.startswith(yearly_freqs): return 1 elif freq == "Q" or freq.startswith(("Q-", "QS", "QE")): return 4 elif freq == "M" or freq.startswith(("M-", "MS", "ME")): return 12 elif freq == "W" or freq.startswith("W-"): return 52 elif freq == "D": return 7 elif freq == "B": return 5 elif freq == "H": return 24 else: # pragma : no cover raise ValueError( "freq {} not understood. Please report if you " "think this is in error.".format(freq) )

Last update: Apr 18, 2024