# Source code for statsmodels.tsa.regime_switching.markov_autoregression

"""
Markov switching autoregression models

Author: Chad Fulton
License: BSD-3
"""


import numpy as np
import statsmodels.base.wrapper as wrap

from statsmodels.tsa.tsatools import lagmat
from statsmodels.tsa.regime_switching import (
    markov_switching, markov_regression)
from statsmodels.tsa.statespace.tools import (
    constrain_stationary_univariate, unconstrain_stationary_univariate)


class MarkovAutoregression(markov_regression.MarkovRegression):
    r"""
    Markov switching autoregression model

    Parameters
    ----------
    endog : array_like
        The endogenous variable.
    k_regimes : int
        The number of regimes.
    order : int
        The order of the autoregressive lag polynomial.
    trend : {'nc', 'c', 't', 'ct'}
        Whether or not to include a trend. To include a constant, time trend,
        or both, set `trend='c'`, `trend='t'`, or `trend='ct'`. For no trend,
        set `trend='nc'`. Default is a constant.
    exog : array_like, optional
        Array of exogenous regressors, shaped nobs x k.
    exog_tvtp : array_like, optional
        Array of exogenous or lagged variables to use in calculating
        time-varying transition probabilities (TVTP). TVTP is only used if this
        variable is provided. If an intercept is desired, a column of ones must
        be explicitly included in this array.
    switching_ar : bool or iterable, optional
        If a boolean, sets whether or not all autoregressive coefficients are
        switching across regimes. If an iterable, should be of length equal
        to `order`, where each element is a boolean describing whether the
        corresponding coefficient is switching. Default is True.
    switching_trend : bool or iterable, optional
        If a boolean, sets whether or not all trend coefficients are
        switching across regimes. If an iterable, should be of length equal
        to the number of trend variables, where each element is
        a boolean describing whether the corresponding coefficient is
        switching. Default is True.
    switching_exog : bool or iterable, optional
        If a boolean, sets whether or not all regression coefficients are
        switching across regimes. If an iterable, should be of length equal
        to the number of exogenous variables, where each element is
        a boolean describing whether the corresponding coefficient is
        switching. Default is False.
    switching_variance : bool, optional
        Whether or not there is regime-specific heteroskedasticity, i.e.
        whether or not the error term has a switching variance. Default is
        False.

    Notes
    -----
    This model is new and API stability is not guaranteed, although changes
    will be made in a backwards compatible way if possible.

    The model can be written as:

    .. math::

        y_t = a_{S_t} + x_t' \beta_{S_t} + \phi_{1, S_t}
        (y_{t-1} - a_{S_{t-1}} - x_{t-1}' \beta_{S_{t-1}}) + \dots +
        \phi_{p, S_t} (y_{t-p} - a_{S_{t-p}} - x_{t-p}' \beta_{S_{t-p}}) +
        \varepsilon_t \\
        \varepsilon_t \sim N(0, \sigma_{S_t}^2)

    i.e. the model is an autoregression where the autoregressive
    coefficients, the mean of the process (possibly including trend or
    regression effects) and the variance of the error term may be switching
    across regimes.

    The `trend` is accommodated by prepending columns to the `exog` array. Thus
    if `trend='c'`, the passed `exog` array should not already have a column of
    ones.

    References
    ----------
    Kim, Chang-Jin, and Charles R. Nelson. 1999.
    "State-Space Models with Regime Switching:
    Classical and Gibbs-Sampling Approaches with Applications".
    MIT Press Books. The MIT Press.
    """

    def __init__(self, endog, k_regimes, order, trend='c', exog=None,
                 exog_tvtp=None, switching_ar=True, switching_trend=True,
                 switching_exog=False, switching_variance=False,
                 dates=None, freq=None, missing='none'):

        # Properties
        self.switching_ar = switching_ar

        # Switching options: normalize a scalar boolean into a per-lag list;
        # otherwise require one entry per AR lag.
        if self.switching_ar is True or self.switching_ar is False:
            self.switching_ar = [self.switching_ar] * order
        elif not len(self.switching_ar) == order:
            raise ValueError('Invalid iterable passed to `switching_ar`.')

        # Initialize the base model (handles trend/exog/TVTP setup)
        super(MarkovAutoregression, self).__init__(
            endog, k_regimes, trend=trend, exog=exog, order=order,
            exog_tvtp=exog_tvtp, switching_trend=switching_trend,
            switching_exog=switching_exog,
            switching_variance=switching_variance, dates=dates, freq=freq,
            missing=missing)

        # Sanity checks
        if self.nobs <= self.order:
            raise ValueError('Must have more observations than the order of'
                             ' the autoregression.')

        # Autoregressive exog: matrix of lagged endog values, one column per
        # lag, with the first `order` (incomplete) rows dropped
        self.exog_ar = lagmat(endog, self.order)[self.order:]

        # Reshape other datasets: the first `order` observations are consumed
        # as presample lags, so trim endog/exog to match
        self.nobs -= self.order
        self.orig_endog = self.endog
        self.endog = self.endog[self.order:]
        if self._k_exog > 0:
            self.orig_exog = self.exog
            self.exog = self.exog[self.order:]

        # Reset the ModelData datasets so pandas metadata stays consistent
        # with the trimmed arrays
        self.data.endog, self.data.exog = (
            self.data._convert_endog_exog(self.endog, self.exog))

        # Reset indexes, if provided
        if self.data.row_labels is not None:
            self.data._cache['row_labels'] = (
                self.data.row_labels[self.order:])
        if self._index is not None:
            if self._index_generated:
                # Generated (range-like) index: shorten from the end
                self._index = self._index[:-self.order]
            else:
                # User-supplied index: drop the presample labels
                self._index = self._index[self.order:]

        # Parameters
        self.parameters['autoregressive'] = self.switching_ar

        # Cache an array for holding slices (one axis per regime at
        # t, t-1, ..., t-order)
        self._predict_slices = [slice(None, None, None)] * (self.order + 1)
def predict_conditional(self, params):
    """
    In-sample prediction, conditional on the current and previous regime

    Parameters
    ----------
    params : array_like
        Array of parameters at which to create predictions.

    Returns
    -------
    predict : array_like
        Array of predictions conditional on current, and possibly past,
        regimes
    """
    params = np.array(params, ndmin=1)

    # Prediction is based on:
    # y_t = x_t beta^{(S_t)} +
    #       \phi_1^{(S_t)} (y_{t-1} - x_{t-1} beta^{(S_t-1)}) + ...
    #       \phi_p^{(S_t)} (y_{t-p} - x_{t-p} beta^{(S_t-p)}) + eps_t
    if self._k_exog > 0:
        # Pre-compute the regression component x_t beta^{(i)} for each regime
        xb = []
        for i in range(self.k_regimes):
            coeffs = params[self.parameters[i, 'exog']]
            xb.append(np.dot(self.orig_exog, coeffs))

    # One axis per regime at times t, t-1, ..., t-p, plus a time axis
    predict = np.zeros(
        (self.k_regimes,) * (self.order + 1) + (self.nobs,),
        dtype=np.promote_types(np.float64, params.dtype))
    # Iterate over S_{t} = i
    for i in range(self.k_regimes):
        ar_coeffs = params[self.parameters[i, 'autoregressive']]

        # y_t - x_t beta^{(S_t)}
        ix = self._predict_slices[:]
        ix[0] = i
        ix = tuple(ix)
        if self._k_exog > 0:
            predict[ix] += xb[i][self.order:]

        # Iterate over lags j = 1, ..., p
        for j in range(1, self.order + 1):
            for k in range(self.k_regimes):
                # This gets a specific time-period / regime slice:
                # S_{t} = i, S_{t-j} = k, across all other time-period /
                # regime slices.
                ix = self._predict_slices[:]
                ix[0] = i
                ix[j] = k
                ix = tuple(ix)

                # Window of lagged observations aligned with the sample
                start = self.order - j
                end = -j
                if self._k_exog > 0:
                    # Demean the lag by regime k's regression component
                    predict[ix] += ar_coeffs[j-1] * (
                        self.orig_endog[start:end] - xb[k][start:end])
                else:
                    predict[ix] += ar_coeffs[j-1] * (
                        self.orig_endog[start:end])

    return predict
def _resid(self, params):
    # Residuals of the observed data against the regime-conditional
    # predictions (broadcasts across the regime axes of the prediction)
    return self.endog - self.predict_conditional(params)

def _conditional_loglikelihoods(self, params):
    """
    Compute loglikelihoods conditional on the current period's regime and
    the last `self.order` regimes.
    """
    # Get the residuals
    resid = self._resid(params)

    # Compute the conditional likelihoods (Gaussian density, evaluated in
    # logs; variance is reshaped to broadcast over regime axes when it is
    # regime-specific)
    variance = params[self.parameters['variance']].squeeze()
    if self.switching_variance:
        variance = np.reshape(variance, (self.k_regimes, 1, 1))

    conditional_loglikelihoods = (
        -0.5 * resid**2 / variance - 0.5 * np.log(2 * np.pi * variance))

    return conditional_loglikelihoods

@property
def _res_classes(self):
    # Results class and wrapper used by `fit`
    return {'fit': (MarkovAutoregressionResults,
                    MarkovAutoregressionResultsWrapper)}

def _em_iteration(self, params0):
    """
    EM iteration

    Performs one EM step: transition/regression parameters via the base
    class, then regime-specific regression, AR and variance parameters.
    """
    # Inherited parameters (transition probabilities, etc.); calls the
    # grandparent implementation directly so the regression step below can
    # use the AR-specific machinery instead of MarkovRegression's
    result, params1 = markov_switching.MarkovSwitching._em_iteration(
        self, params0)

    # Square-root of smoothed probabilities: used as WLS weights
    tmp = np.sqrt(result.smoothed_marginal_probabilities)

    # Regression coefficients
    coeffs = None
    if self._k_exog > 0:
        coeffs = self._em_exog(result, self.endog, self.exog,
                               self.parameters.switching['exog'], tmp)
        for i in range(self.k_regimes):
            params1[self.parameters[i, 'exog']] = coeffs[i]

    # Autoregressive
    if self.order > 0:
        if self._k_exog > 0:
            # With exog, the AR step operates on regression residuals
            ar_coeffs, variance = self._em_autoregressive(
                result, coeffs)
        else:
            # Without exog, the AR coefficients can be estimated directly
            # on the lagged-endog design matrix
            ar_coeffs = self._em_exog(
                result, self.endog, self.exog_ar,
                self.parameters.switching['autoregressive'])
            variance = self._em_variance(
                result, self.endog, self.exog_ar, ar_coeffs, tmp)
        for i in range(self.k_regimes):
            params1[self.parameters[i, 'autoregressive']] = ar_coeffs[i]
        params1[self.parameters['variance']] = variance

    return result, params1

def _em_autoregressive(self, result, betas, tmp=None):
    """
    EM step for autoregressive coefficients and variances

    Parameters
    ----------
    result : HamiltonFilterResults or KimSmootherResults
        Output with smoothed marginal probabilities.
    betas : array
        Regime-specific regression coefficients from the regression step.
    tmp : array, optional
        Square-root of smoothed marginal probabilities (weights);
        recomputed if not given.
    """
    if tmp is None:
        tmp = np.sqrt(result.smoothed_marginal_probabilities)

    # Regression residuals per regime, over the full (untrimmed) sample
    resid = np.zeros((self.k_regimes, self.nobs + self.order))
    resid[:] = self.orig_endog
    if self._k_exog > 0:
        for i in range(self.k_regimes):
            resid[i] -= np.dot(self.orig_exog, betas[i])

    # The difference between this and `_em_exog` is that here we have a
    # different endog and exog for each regime
    coeffs = np.zeros((self.k_regimes,) + (self.order,))
    variance = np.zeros((self.k_regimes,))
    # NOTE(review): this allocation is dead — `exog` is immediately
    # rebound inside the loop below
    exog = np.zeros((self.nobs, self.order))
    for i in range(self.k_regimes):
        endog = resid[i, self.order:]
        exog = lagmat(resid[i], self.order)[self.order:]
        # Weighted (by sqrt-probability) endog/exog for WLS
        tmp_endog = tmp[i] * endog
        tmp_exog = tmp[i][:, None] * exog

        coeffs[i] = np.dot(np.linalg.pinv(tmp_exog), tmp_endog)

        if self.switching_variance:
            # Probability-weighted variance of the unweighted residuals
            tmp_resid = endog - np.dot(exog, coeffs[i])
            variance[i] = (np.sum(
                tmp_resid**2 * result.smoothed_marginal_probabilities[i]) /
                np.sum(result.smoothed_marginal_probabilities[i]))
        else:
            # Sum of squared weighted residuals; pooled across regimes below
            tmp_resid = tmp_endog - np.dot(tmp_exog, coeffs[i])
            variance[i] = np.sum(tmp_resid**2)

    # Variances
    if not self.switching_variance:
        variance = variance.sum() / self.nobs

    return coeffs, variance

@property
def start_params(self):
    """
    (array) Starting parameters for maximum likelihood estimation.
    """
    # Inherited parameters (transition probabilities, etc.)
    params = markov_switching.MarkovSwitching.start_params.fget(self)

    # OLS for starting parameters: stack regression and AR regressors
    endog = self.endog.copy()
    if self._k_exog > 0 and self.order > 0:
        exog = np.c_[self.exog, self.exog_ar]
    elif self._k_exog > 0:
        exog = self.exog
    elif self.order > 0:
        exog = self.exog_ar

    if self._k_exog > 0 or self.order > 0:
        beta = np.dot(np.linalg.pinv(exog), endog)
        variance = np.var(endog - np.dot(exog, beta))
    else:
        variance = np.var(endog)

    # Regression coefficients: when switching, spread the OLS estimate
    # across regimes by scaling with i / k_regimes so starting values differ
    if self._k_exog > 0:
        if np.any(self.switching_coeffs):
            for i in range(self.k_regimes):
                params[self.parameters[i, 'exog']] = (
                    beta[:self._k_exog] * (i / self.k_regimes))
        else:
            params[self.parameters['exog']] = beta[:self._k_exog]

    # Autoregressive (same spreading scheme as above)
    if self.order > 0:
        if np.any(self.switching_ar):
            for i in range(self.k_regimes):
                params[self.parameters[i, 'autoregressive']] = (
                    beta[self._k_exog:] * (i / self.k_regimes))
        else:
            params[self.parameters['autoregressive']] = beta[self._k_exog:]

    # Variance: when switching, spread from variance/10 up to variance
    if self.switching_variance:
        params[self.parameters['variance']] = (
            np.linspace(variance / 10., variance, num=self.k_regimes))
    else:
        params[self.parameters['variance']] = variance

    return params

@property
def param_names(self):
    """
    (list of str) List of human readable parameter names (for parameters
    actually included in the model).
    """
    # Inherited parameters; object dtype so slice-assignment of name lists
    # works below
    param_names = np.array(
        markov_regression.MarkovRegression.param_names.fget(self),
        dtype=object)

    # Autoregressive: 'ar.L<lag>[<regime>]' when switching, else 'ar.L<lag>'
    if np.any(self.switching_ar):
        for i in range(self.k_regimes):
            param_names[self.parameters[i, 'autoregressive']] = [
                'ar.L%d[%d]' % (j+1, i) for j in range(self.order)]
    else:
        param_names[self.parameters['autoregressive']] = [
            'ar.L%d' % (j+1) for j in range(self.order)]

    return param_names.tolist()
def transform_params(self, unconstrained):
    """
    Transform unconstrained parameters used by the optimizer to
    constrained parameters used in likelihood evaluation

    Parameters
    ----------
    unconstrained : array_like
        Array of unconstrained parameters used by the optimizer, to be
        transformed.

    Returns
    -------
    constrained : array_like
        Array of constrained parameters which may be used in likelihood
        evaluation.
    """
    # Let the parent class handle everything except the AR coefficients
    constrained = super(MarkovAutoregression, self).transform_params(
        unconstrained)

    # Map each regime's AR coefficients into the stationary region.
    # TODO may provide unexpected results when some coefficients are not
    # switching
    for regime in range(self.k_regimes):
        ar_slice = self.parameters[regime, 'autoregressive']
        constrained[ar_slice] = constrain_stationary_univariate(
            unconstrained[ar_slice])

    return constrained
def untransform_params(self, constrained):
    """
    Transform constrained parameters used in likelihood evaluation
    to unconstrained parameters used by the optimizer

    Parameters
    ----------
    constrained : array_like
        Array of constrained parameters used in likelihood evaluation, to
        be transformed.

    Returns
    -------
    unconstrained : array_like
        Array of unconstrained parameters used by the optimizer.
    """
    # Let the parent class handle everything except the AR coefficients
    unconstrained = super(MarkovAutoregression, self).untransform_params(
        constrained)

    # Invert the stationarity constraint for each regime's AR coefficients.
    # TODO may provide unexpected results when some coefficients are not
    # switching
    for regime in range(self.k_regimes):
        ar_slice = self.parameters[regime, 'autoregressive']
        unconstrained[ar_slice] = unconstrain_stationary_univariate(
            constrained[ar_slice])

    return unconstrained
class MarkovAutoregressionResults(markov_regression.MarkovRegressionResults):
    r"""
    Class to hold results from fitting a Markov switching autoregression
    model

    Parameters
    ----------
    model : MarkovAutoregression instance
        The fitted model instance
    params : ndarray
        Fitted parameters
    filter_results : HamiltonFilterResults or KimSmootherResults instance
        The underlying filter and, optionally, smoother output
    cov_type : str
        The type of covariance matrix estimator to use. Can be one of
        'approx', 'opg', 'robust', or 'none'.

    Attributes
    ----------
    model : Model instance
        A reference to the model that was fit.
    filter_results : HamiltonFilterResults or KimSmootherResults instance
        The underlying filter and, optionally, smoother output
    nobs : float
        The number of observations used to fit the model.
    params : ndarray
        The parameters of the model.
    scale : float
        This is currently set to 1.0 and not used by the model or its
        results.
    """
    # All behavior is inherited from MarkovRegressionResults
    pass


class MarkovAutoregressionResultsWrapper(
        markov_regression.MarkovRegressionResultsWrapper):
    # Wrapper providing pandas-aware attribute access; attribute/method maps
    # are inherited and populated below
    pass
wrap.populate_wrapper(MarkovAutoregressionResultsWrapper,  # noqa:E305
                      MarkovAutoregressionResults)