Source code for statsmodels.tsa.tsatools

from __future__ import annotations

from statsmodels.compat.python import lrange

from typing import Literal
import warnings

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset

from statsmodels.tools.data import _is_recarray, _is_using_pandas
from statsmodels.tools.sm_exceptions import ValueWarning
from statsmodels.tools.typing import NDArray
from statsmodels.tools.validation import (
    array_like,
    bool_like,
    int_like,
    string_like,
)

__all__ = [
    "lagmat",
    "lagmat2ds",
    "add_trend",
    "duplication_matrix",
    "elimination_matrix",
    "commutation_matrix",
    "vec",
    "vech",
    "unvec",
    "unvech",
    "freq_to_period",
]


[docs] def add_trend(x, trend="c", prepend=False, has_constant="skip"): """ Add a trend and/or constant to an array. Parameters ---------- x : array_like Original array of data. trend : str {'n', 'c', 't', 'ct', 'ctt'} The trend to add. * 'n' add no trend. * 'c' add constant only. * 't' add trend only. * 'ct' add constant and linear trend. * 'ctt' add constant and linear and quadratic trend. prepend : bool If True, prepends the new data to the columns of X. has_constant : str {'raise', 'add', 'skip'} Controls what happens when trend is 'c' and a constant column already exists in x. 'raise' will raise an error. 'add' will add a column of 1s. 'skip' will return the data without change. 'skip' is the default. Returns ------- array_like The original data with the additional trend columns. If x is a pandas Series or DataFrame, then the trend column names are 'const', 'trend' and 'trend_squared'. See Also -------- statsmodels.tools.tools.add_constant Add a constant column to an array. Notes ----- Returns columns as ['ctt','ct','c'] whenever applicable. There is currently no checking for an existing trend. """ prepend = bool_like(prepend, "prepend") trend = string_like(trend, "trend", options=("n", "c", "t", "ct", "ctt")) has_constant = string_like( has_constant, "has_constant", options=("raise", "add", "skip") ) # TODO: could be generalized for trend of aribitrary order columns = ["const", "trend", "trend_squared"] if trend == "n": return x.copy() elif trend == "c": # handles structured arrays columns = columns[:1] trendorder = 0 elif trend == "ct" or trend == "t": columns = columns[:2] if trend == "t": columns = columns[1:2] trendorder = 1 elif trend == "ctt": trendorder = 2 if _is_recarray(x): from statsmodels.tools.sm_exceptions import recarray_exception raise NotImplementedError(recarray_exception) is_pandas = _is_using_pandas(x, None) if is_pandas: if isinstance(x, pd.Series): x = pd.DataFrame(x) else: x = x.copy() else: x = np.asanyarray(x) nobs = len(x) trendarr = np.vander( np.arange(1, nobs + 1, dtype=np.float64), trendorder + 1 ) # put in order ctt trendarr = np.fliplr(trendarr) if trend == "t": trendarr = trendarr[:, 1] if "c" in trend: if is_pandas: # Mixed type protection def safe_is_const(s): try: return np.ptp(s) == 0.0 and np.any(s != 0.0) except Exception: return False col_const = x.apply(safe_is_const, 0) else: ptp0 = np.ptp(np.asanyarray(x), axis=0) col_is_const = ptp0 == 0 nz_const = col_is_const & (x[0] != 0) col_const = nz_const if np.any(col_const): if has_constant == "raise": if x.ndim == 1: base_err = "x is constant." else: columns = np.arange(x.shape[1])[col_const] if isinstance(x, pd.DataFrame): columns = x.columns const_cols = ", ".join([str(c) for c in columns]) base_err = ( "x contains one or more constant columns. Column(s) " f"{const_cols} are constant." ) raise ValueError( f"{base_err} Adding a constant with trend='{trend}' is " "not allowed." ) elif has_constant == "skip": columns = columns[1:] trendarr = trendarr[:, 1:] order = 1 if prepend else -1 if is_pandas: trendarr = pd.DataFrame(trendarr, index=x.index, columns=columns) x = [trendarr, x] x = pd.concat(x[::order], axis=1) else: x = [trendarr, x] x = np.column_stack(x[::order]) return x
[docs] def add_lag(x, col=None, lags=1, drop=False, insert=True): """ Returns an array with lags included given an array. Parameters ---------- x : array_like An array or NumPy ndarray subclass. Can be either a 1d or 2d array with observations in columns. col : int or None `col` can be an int of the zero-based column index. If it's a 1d array `col` can be None. lags : int The number of lags desired. drop : bool Whether to keep the contemporaneous variable for the data. insert : bool or int If True, inserts the lagged values after `col`. If False, appends the data. If int inserts the lags at int. Returns ------- array : ndarray Array with lags Examples -------- >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load() >>> data = data.data[['year','quarter','realgdp','cpi']] >>> data = sm.tsa.add_lag(data, 'realgdp', lags=2) Notes ----- Trims the array both forward and backward, so that the array returned so that the length of the returned array is len(`X`) - lags. The lags are returned in increasing order, ie., t-1,t-2,...,t-lags """ lags = int_like(lags, "lags") drop = bool_like(drop, "drop") x = array_like(x, "x", ndim=2) if col is None: col = 0 # handle negative index if col < 0: col = x.shape[1] + col if x.ndim == 1: x = x[:, None] contemp = x[:, col] if insert is True: ins_idx = col + 1 elif insert is False: ins_idx = x.shape[1] else: if insert < 0: # handle negative index insert = x.shape[1] + insert + 1 if insert > x.shape[1]: insert = x.shape[1] warnings.warn( "insert > number of variables, inserting at the" " last position", ValueWarning, ) ins_idx = insert ndlags = lagmat(contemp, lags, trim="Both") first_cols = lrange(ins_idx) last_cols = lrange(ins_idx, x.shape[1]) if drop: if col in first_cols: first_cols.pop(first_cols.index(col)) else: last_cols.pop(last_cols.index(col)) return np.column_stack((x[lags:, first_cols], ndlags, x[lags:, last_cols]))
[docs] def detrend(x, order=1, axis=0): """ Detrend an array with a trend of given order along axis 0 or 1. Parameters ---------- x : array_like, 1d or 2d Data, if 2d, then each row or column is independently detrended with the same trendorder, but independent trend estimates. order : int The polynomial order of the trend, zero is constant, one is linear trend, two is quadratic trend. axis : int Axis can be either 0, observations by rows, or 1, observations by columns. Returns ------- ndarray The detrended series is the residual of the linear regression of the data on the trend of given order. """ order = int_like(order, "order") axis = int_like(axis, "axis") if x.ndim == 2 and int(axis) == 1: x = x.T elif x.ndim > 2: raise NotImplementedError( "x.ndim > 2 is not implemented until it is needed" ) nobs = x.shape[0] if order == 0: # Special case demean resid = x - x.mean(axis=0) else: trends = np.vander(np.arange(float(nobs)), N=order + 1) beta = np.linalg.pinv(trends).dot(x) resid = x - np.dot(trends, beta) if x.ndim == 2 and int(axis) == 1: resid = resid.T return resid
[docs] def lagmat( x, maxlag: int, trim: Literal["forward", "backward", "both", "none"] = "forward", original: Literal["ex", "sep", "in"] = "ex", use_pandas: bool = False, ) -> NDArray | DataFrame | tuple[NDArray, NDArray] | tuple[DataFrame, DataFrame]: """ Create 2d array of lags. Parameters ---------- x : array_like Data; if 2d, observation in rows and variables in columns. maxlag : int All lags from zero to maxlag are included. trim : {'forward', 'backward', 'both', 'none', None} The trimming method to use. * 'forward' : trim invalid observations in front. * 'backward' : trim invalid initial observations. * 'both' : trim invalid observations on both sides. * 'none', None : no trimming of observations. original : {'ex','sep','in'} How the original is treated. * 'ex' : drops the original array returning only the lagged values. * 'in' : returns the original array and the lagged values as a single array. * 'sep' : returns a tuple (original array, lagged values). The original array is truncated to have the same number of rows as the returned lagmat. use_pandas : bool If true, returns a DataFrame when the input is a pandas Series or DataFrame. If false, return numpy ndarrays. Returns ------- lagmat : ndarray The array with lagged observations. y : ndarray, optional Only returned if original == 'sep'. Notes ----- When using a pandas DataFrame or Series with use_pandas=True, trim can only be 'forward' or 'both' since it is not possible to consistently extend index values. Examples -------- >>> from statsmodels.tsa.tsatools import lagmat >>> import numpy as np >>> X = np.arange(1,7).reshape(-1,2) >>> lagmat(X, maxlag=2, trim="forward", original='in') array([[ 1., 2., 0., 0., 0., 0.], [ 3., 4., 1., 2., 0., 0.], [ 5., 6., 3., 4., 1., 2.]]) >>> lagmat(X, maxlag=2, trim="backward", original='in') array([[ 5., 6., 3., 4., 1., 2.], [ 0., 0., 5., 6., 3., 4.], [ 0., 0., 0., 0., 5., 6.]]) >>> lagmat(X, maxlag=2, trim="both", original='in') array([[ 5., 6., 3., 4., 1., 2.]]) >>> lagmat(X, maxlag=2, trim="none", original='in') array([[ 1., 2., 0., 0., 0., 0.], [ 3., 4., 1., 2., 0., 0.], [ 5., 6., 3., 4., 1., 2.], [ 0., 0., 5., 6., 3., 4.], [ 0., 0., 0., 0., 5., 6.]]) """ maxlag = int_like(maxlag, "maxlag") use_pandas = bool_like(use_pandas, "use_pandas") trim = string_like( trim, "trim", optional=True, options=("forward", "backward", "both", "none"), ) original = string_like(original, "original", options=("ex", "sep", "in")) # TODO: allow list of lags additional to maxlag orig = x x = array_like(x, "x", ndim=2, dtype=None) is_pandas = _is_using_pandas(orig, None) and use_pandas trim = "none" if trim is None else trim trim = trim.lower() if is_pandas and trim in ("none", "backward"): raise ValueError( "trim cannot be 'none' or 'backward' when used on " "Series or DataFrames" ) dropidx = 0 nobs, nvar = x.shape if original in ["ex", "sep"]: dropidx = nvar if maxlag >= nobs: raise ValueError("maxlag should be < nobs") lm = np.zeros((nobs + maxlag, nvar * (maxlag + 1))) for k in range(0, int(maxlag + 1)): lm[ maxlag - k : nobs + maxlag - k, nvar * (maxlag - k) : nvar * (maxlag - k + 1), ] = x if trim in ("none", "forward"): startobs = 0 elif trim in ("backward", "both"): startobs = maxlag else: raise ValueError("trim option not valid") if trim in ("none", "backward"): stopobs = len(lm) else: stopobs = nobs if is_pandas: x = orig if isinstance(x, DataFrame): x_columns = [str(c) for c in x.columns] if len(set(x_columns)) != x.shape[1]: raise ValueError( "Columns names must be distinct after conversion to string " "(if not already strings)." ) else: x_columns = [str(x.name)] columns = [str(col) for col in x_columns] for lag in range(maxlag): lag_str = str(lag + 1) columns.extend([str(col) + ".L." + lag_str for col in x_columns]) lm = DataFrame(lm[:stopobs], index=x.index, columns=columns) lags = lm.iloc[startobs:] if original in ("sep", "ex"): leads = lags[x_columns] lags = lags.drop(x_columns, axis=1) else: lags = lm[startobs:stopobs, dropidx:] if original == "sep": leads = lm[startobs:stopobs, :dropidx] if original == "sep": return lags, leads else: return lags
[docs] def lagmat2ds( x, maxlag0, maxlagex=None, dropex=0, trim="forward", use_pandas=False ): """ Generate lagmatrix for 2d array, columns arranged by variables. Parameters ---------- x : array_like Data, 2d. Observations in rows and variables in columns. maxlag0 : int The first variable all lags from zero to maxlag are included. maxlagex : {None, int} The max lag for all other variables all lags from zero to maxlag are included. dropex : int Exclude first dropex lags from other variables. For all variables, except the first, lags from dropex to maxlagex are included. trim : str The trimming method to use. * 'forward' : trim invalid observations in front. * 'backward' : trim invalid initial observations. * 'both' : trim invalid observations on both sides. * 'none' : no trimming of observations. use_pandas : bool If true, returns a DataFrame when the input is a pandas Series or DataFrame. If false, return numpy ndarrays. Returns ------- ndarray The array with lagged observations, columns ordered by variable. Notes ----- Inefficient implementation for unequal lags, implemented for convenience. """ maxlag0 = int_like(maxlag0, "maxlag0") maxlagex = int_like(maxlagex, "maxlagex", optional=True) trim = string_like( trim, "trim", optional=True, options=("forward", "backward", "both", "none"), ) if maxlagex is None: maxlagex = maxlag0 maxlag = max(maxlag0, maxlagex) is_pandas = _is_using_pandas(x, None) if x.ndim == 1: if is_pandas: x = pd.DataFrame(x) else: x = x[:, None] elif x.ndim == 0 or x.ndim > 2: raise ValueError("Only supports 1 and 2-dimensional data.") nobs, nvar = x.shape if is_pandas and use_pandas: lags = lagmat( x.iloc[:, 0], maxlag, trim=trim, original="in", use_pandas=True ) lagsli = [lags.iloc[:, : maxlag0 + 1]] for k in range(1, nvar): lags = lagmat( x.iloc[:, k], maxlag, trim=trim, original="in", use_pandas=True ) lagsli.append(lags.iloc[:, dropex : maxlagex + 1]) return pd.concat(lagsli, axis=1) elif is_pandas: x = np.asanyarray(x) lagsli = [ lagmat(x[:, 0], maxlag, trim=trim, original="in")[:, : maxlag0 + 1] ] for k in range(1, nvar): lagsli.append( lagmat(x[:, k], maxlag, trim=trim, original="in")[ :, dropex : maxlagex + 1 ] ) return np.column_stack(lagsli)
def vec(mat): return mat.ravel("F") def vech(mat): # Gets Fortran-order return mat.T.take(_triu_indices(len(mat))) # tril/triu/diag, suitable for ndarray.take def _tril_indices(n): rows, cols = np.tril_indices(n) return rows * n + cols def _triu_indices(n): rows, cols = np.triu_indices(n) return rows * n + cols def _diag_indices(n): rows, cols = np.diag_indices(n) return rows * n + cols def unvec(v): k = int(np.sqrt(len(v))) assert k * k == len(v) return v.reshape((k, k), order="F") def unvech(v): # quadratic formula, correct fp error rows = 0.5 * (-1 + np.sqrt(1 + 8 * len(v))) rows = int(np.round(rows)) result = np.zeros((rows, rows)) result[np.triu_indices(rows)] = v result = result + result.T # divide diagonal elements by 2 result[np.diag_indices(rows)] /= 2 return result def duplication_matrix(n): """ Create duplication matrix D_n which satisfies vec(S) = D_n vech(S) for symmetric matrix S Returns ------- D_n : ndarray """ n = int_like(n, "n") tmp = np.eye(n * (n + 1) // 2) return np.array([unvech(x).ravel() for x in tmp]).T def elimination_matrix(n): """ Create the elimination matrix L_n which satisfies vech(M) = L_n vec(M) for any matrix M Parameters ---------- Returns ------- """ n = int_like(n, "n") vech_indices = vec(np.tril(np.ones((n, n)))) return np.eye(n * n)[vech_indices != 0] def commutation_matrix(p, q): """ Create the commutation matrix K_{p,q} satisfying vec(A') = K_{p,q} vec(A) Parameters ---------- p : int q : int Returns ------- K : ndarray (pq x pq) """ p = int_like(p, "p") q = int_like(q, "q") K = np.eye(p * q) indices = np.arange(p * q).reshape((p, q), order="F") return K.take(indices.ravel(), axis=0) def _ar_transparams(params): """ Transforms params to induce stationarity/invertability. Parameters ---------- params : array_like The AR coefficients Reference --------- Jones(1980) """ newparams = np.tanh(params / 2) tmp = np.tanh(params / 2) for j in range(1, len(params)): a = newparams[j] for kiter in range(j): tmp[kiter] -= a * newparams[j - kiter - 1] newparams[:j] = tmp[:j] return newparams def _ar_invtransparams(params): """ Inverse of the Jones reparameterization Parameters ---------- params : array_like The transformed AR coefficients """ params = params.copy() tmp = params.copy() for j in range(len(params) - 1, 0, -1): a = params[j] for kiter in range(j): tmp[kiter] = (params[kiter] + a * params[j - kiter - 1]) / ( 1 - a ** 2 ) params[:j] = tmp[:j] invarcoefs = 2 * np.arctanh(params) return invarcoefs def _ma_transparams(params): """ Transforms params to induce stationarity/invertability. Parameters ---------- params : ndarray The ma coeffecients of an (AR)MA model. Reference --------- Jones(1980) """ newparams = ((1 - np.exp(-params)) / (1 + np.exp(-params))).copy() tmp = ((1 - np.exp(-params)) / (1 + np.exp(-params))).copy() # levinson-durbin to get macf for j in range(1, len(params)): b = newparams[j] for kiter in range(j): tmp[kiter] += b * newparams[j - kiter - 1] newparams[:j] = tmp[:j] return newparams def _ma_invtransparams(macoefs): """ Inverse of the Jones reparameterization Parameters ---------- params : ndarray The transformed MA coefficients """ tmp = macoefs.copy() for j in range(len(macoefs) - 1, 0, -1): b = macoefs[j] for kiter in range(j): tmp[kiter] = (macoefs[kiter] - b * macoefs[j - kiter - 1]) / ( 1 - b ** 2 ) macoefs[:j] = tmp[:j] invmacoefs = -np.log((1 - macoefs) / (1 + macoefs)) return invmacoefs def unintegrate_levels(x, d): """ Returns the successive differences needed to unintegrate the series. Parameters ---------- x : array_like The original series d : int The number of differences of the differenced series. Returns ------- y : array_like The increasing differences from 0 to d-1 of the first d elements of x. See Also -------- unintegrate """ d = int_like(d, "d") x = x[:d] return np.asarray([np.diff(x, d - i)[0] for i in range(d, 0, -1)]) def unintegrate(x, levels): """ After taking n-differences of a series, return the original series Parameters ---------- x : array_like The n-th differenced series levels : list A list of the first-value in each differenced series, for [first-difference, second-difference, ..., n-th difference] Returns ------- y : array_like The original series de-differenced Examples -------- >>> x = np.array([1, 3, 9., 19, 8.]) >>> levels = unintegrate_levels(x, 2) >>> levels array([ 1., 2.]) >>> unintegrate(np.diff(x, 2), levels) array([ 1., 3., 9., 19., 8.]) """ levels = list(levels)[:] # copy if len(levels) > 1: x0 = levels.pop(-1) return unintegrate(np.cumsum(np.r_[x0, x]), levels) x0 = levels[0] return np.cumsum(np.r_[x0, x]) def freq_to_period(freq: str | offsets.DateOffset) -> int: """ Convert a pandas frequency to a periodicity Parameters ---------- freq : str or offset Frequency to convert Returns ------- int Periodicity of freq Notes ----- Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52. """ if not isinstance(freq, offsets.DateOffset): freq = to_offset(freq) # go ahead and standardize assert isinstance(freq, offsets.DateOffset) freq = freq.rule_code.upper() yearly_freqs = ("A-", "AS-", "Y-", "YS-", "YE-") if freq in ("A", "Y") or freq.startswith(yearly_freqs): return 1 elif freq == "Q" or freq.startswith(("Q-", "QS", "QE")): return 4 elif freq == "M" or freq.startswith(("M-", "MS", "ME")): return 12 elif freq == "W" or freq.startswith("W-"): return 52 elif freq == "D": return 7 elif freq == "B": return 5 elif freq == "H": return 24 else: # pragma : no cover raise ValueError( "freq {} not understood. Please report if you " "think this is in error.".format(freq) )

Last update: Jan 07, 2025