Source code for statsmodels.tsa.statespace.kalman_smoother

Kalman Smoother

Author: Chad Fulton
License: Simplified-BSD
from __future__ import division, absolute_import, print_function

from collections import namedtuple
import warnings

import numpy as np

from statsmodels.tsa.statespace.representation import OptionWrapper
from statsmodels.tsa.statespace.kalman_filter import (KalmanFilter,
from import ValueWarning

SMOOTHER_STATE = 0x01          # Durbin and Koopman (2012), Chapter 4.4.2
SMOOTHER_STATE_COV = 0x02      # ibid., Chapter 4.4.3
SMOOTHER_DISTURBANCE = 0x04    # ibid., Chapter 4.5
SMOOTHER_DISTURBANCE_COV = 0x08    # ibid., Chapter 4.5

_SmootherOutput = namedtuple('_SmootherOutput', (
    ' scaled_smoothed_estimator scaled_smoothed_estimator_cov'
    ' smoothing_error'
    ' smoothed_state smoothed_state_cov'
    ' smoothed_state_disturbance smoothed_state_disturbance_cov'
    ' smoothed_measurement_disturbance smoothed_measurement_disturbance_cov'

class _KalmanSmoother(object):

    def __init__(self, model, kfilter, smoother_output):
        # Save values
        self.model = model
        self.kfilter = kfilter
        self._kfilter = model._kalman_filter
        self.smoother_output = smoother_output

        # Create storage
        self.scaled_smoothed_estimator = None
        self.scaled_smoothed_estimator_cov = None
        self.smoothing_error = None
        self.smoothed_state = None
        self.smoothed_state_cov = None
        self.smoothed_state_disturbance = None
        self.smoothed_state_disturbance_cov = None
        self.smoothed_measurement_disturbance = None
        self.smoothed_measurement_disturbance_cov = None

        # Intermediate values
        self.tmp_L = np.zeros((model.k_states, model.k_states, model.nobs),

        if smoother_output & (SMOOTHER_STATE | SMOOTHER_DISTURBANCE):
            self.scaled_smoothed_estimator = (
                np.zeros((model.k_states, model.nobs+1), dtype=kfilter.dtype))
            self.smoothing_error = (
                np.zeros((model.k_endog, model.nobs), dtype=kfilter.dtype))
        if smoother_output & (SMOOTHER_STATE_COV | SMOOTHER_DISTURBANCE_COV):
            self.scaled_smoothed_estimator_cov = (
                np.zeros((model.k_states, model.k_states, model.nobs + 1),

        # State smoothing
        if smoother_output & SMOOTHER_STATE:
            self.smoothed_state = np.zeros((model.k_states, model.nobs),
        if smoother_output & SMOOTHER_STATE_COV:
            self.smoothed_state_cov = (
                np.zeros((model.k_states, model.k_states, model.nobs),

        # Disturbance smoothing
        if smoother_output & SMOOTHER_DISTURBANCE:
            self.smoothed_state_disturbance = (
                np.zeros((model.k_posdef, model.nobs), dtype=kfilter.dtype))
            self.smoothed_measurement_disturbance = (
                np.zeros((model.k_endog, model.nobs), dtype=kfilter.dtype))
        if smoother_output & SMOOTHER_DISTURBANCE_COV:
            self.smoothed_state_disturbance_cov = (
                np.zeros((model.k_posdef, model.k_posdef, model.nobs),
            self.smoothed_measurement_disturbance_cov = (
                np.zeros((model.k_endog, model.k_endog, model.nobs),

    def seek(self, t):
        if t >= self.model.nobs:
            raise IndexError("Observation index out of range")
        self.t = t

    def __iter__(self):
        return self

    def __call__(self):
        # Perform backwards smoothing iterations
        for i in range(self.model.nobs-1, -1, -1):

    def next(self):
        # next() is required for compatibility with Python2.7.
        return self.__next__()

    def __next__(self):
        # Check for valid iteration
        if not self.t >= 0:
            raise StopIteration

        # Get local copies of variables
        t = self.t
        kfilter = self.kfilter
        _kfilter = self._kfilter
        model = self.model
        smoother_output = self.smoother_output

        scaled_smoothed_estimator = self.scaled_smoothed_estimator
        scaled_smoothed_estimator_cov = self.scaled_smoothed_estimator_cov
        smoothing_error = self.smoothing_error
        smoothed_state = self.smoothed_state
        smoothed_state_cov = self.smoothed_state_cov
        smoothed_state_disturbance = self.smoothed_state_disturbance
        smoothed_state_disturbance_cov = self.smoothed_state_disturbance_cov
        smoothed_measurement_disturbance = (
        smoothed_measurement_disturbance_cov = (
        tmp_L = self.tmp_L

        # Seek the Cython Kalman filter to the right place, setup matrices, False)

        missing_entire_obs = (
            _kfilter.model.nmissing[t] == _kfilter.model.k_endog)
        missing_partial_obs = (
            not missing_entire_obs and _kfilter.model.nmissing[t] > 0)

        # Get the appropriate (possibly time-varying) indices
        design_t = 0 if[2] == 1 else t
        obs_cov_t = 0 if kfilter.obs_cov.shape[2] == 1 else t
        transition_t = 0 if kfilter.transition.shape[2] == 1 else t
        selection_t = 0 if kfilter.selection.shape[2] == 1 else t
        state_cov_t = 0 if kfilter.state_cov.shape[2] == 1 else t

        # Get endog dimension (can vary if there missing data)
        k_endog = _kfilter.k_endog

        # Get references to representation matrices and Kalman filter output
        transition = model.transition[:, :, transition_t]
        selection = model.selection[:, :, selection_t]
        state_cov = model.state_cov[:, :, state_cov_t]

        predicted_state = kfilter.predicted_state[:, t]
        predicted_state_cov = kfilter.predicted_state_cov[:, :, t]

        mask = ~kfilter.missing[:, t].astype(bool)
        if missing_partial_obs:
            design = np.array(
                _kfilter.selected_design[:k_endog*model.k_states], copy=True
            ).reshape(k_endog, model.k_states, order='F')
            obs_cov = np.array(
                _kfilter.selected_obs_cov[:k_endog**2], copy=True
            ).reshape(k_endog, k_endog)
            kalman_gain = kfilter.kalman_gain[:, mask, t]

            forecasts_error_cov = np.array(
                _kfilter.forecast_error_cov[:, :, t], copy=True
                ).ravel(order='F')[:k_endog**2].reshape(k_endog, k_endog)
            forecasts_error = np.array(
                _kfilter.forecast_error[:k_endog, t], copy=True)
            F_inv = np.linalg.inv(forecasts_error_cov)
            if missing_entire_obs:
                design = np.zeros([:-1])
                design =[:, :, design_t]
            obs_cov = model.obs_cov[:, :, obs_cov_t]
            kalman_gain = kfilter.kalman_gain[:, :, t]
            forecasts_error_cov = kfilter.forecasts_error_cov[:, :, t]
            forecasts_error = kfilter.forecasts_error[:, t]
            F_inv = np.linalg.inv(forecasts_error_cov)

        # Create a temporary matrix
        tmp_L[:, :, t] = transition -
        L = tmp_L[:, :, t]

        # Perform the recursion

        # Intermediate values
        if smoother_output & (SMOOTHER_STATE | SMOOTHER_DISTURBANCE):
            if missing_entire_obs:
                # smoothing_error is undefined here, keep it as zeros
                scaled_smoothed_estimator[:, t - 1] = (
                    transition.transpose().dot(scaled_smoothed_estimator[:, t])
                smoothing_error[:k_endog, t] = (
                        scaled_smoothed_estimator[:, t])
                scaled_smoothed_estimator[:, t - 1] = (
                    design.transpose().dot(smoothing_error[:k_endog, t]) +
                    transition.transpose().dot(scaled_smoothed_estimator[:, t])
        if smoother_output & (SMOOTHER_STATE_COV | SMOOTHER_DISTURBANCE_COV):
            if missing_entire_obs:
                scaled_smoothed_estimator_cov[:, :, t - 1] = (
                        scaled_smoothed_estimator_cov[:, :, t]
                scaled_smoothed_estimator_cov[:, :, t - 1] = (
                    design.transpose().dot(F_inv).dot(design) +
                        scaled_smoothed_estimator_cov[:, :, t]

        # State smoothing
        if smoother_output & SMOOTHER_STATE:
            smoothed_state[:, t] = (
                predicted_state +
      [:, t - 1])
        if smoother_output & SMOOTHER_STATE_COV:
            smoothed_state_cov[:, :, t] = (
                predicted_state_cov -
                    scaled_smoothed_estimator_cov[:, :, t - 1]

        # Disturbance smoothing
            QR =

        if smoother_output & SMOOTHER_DISTURBANCE:
            smoothed_state_disturbance[:, t] = (
      [:, t])
            # measurement disturbance is set to zero when all missing
            # (unconditional distribution)
            if not missing_entire_obs:
                smoothed_measurement_disturbance[mask, t] = (
          [:k_endog, t])

        if smoother_output & SMOOTHER_DISTURBANCE_COV:
            smoothed_state_disturbance_cov[:, :, t] = (
                state_cov -
                    scaled_smoothed_estimator_cov[:, :, t]

            if missing_entire_obs:
                smoothed_measurement_disturbance_cov[:, :, t] = obs_cov
                # For non-missing portion, calculate as usual
                ix = np.ix_(mask, mask, [t])
                smoothed_measurement_disturbance_cov[ix] = (
                    obs_cov -
                        F_inv + kalman_gain.transpose().dot(
                            scaled_smoothed_estimator_cov[:, :, t]
                )[:, :, np.newaxis]

                # For missing portion, use unconditional distribution
                ix = np.ix_(~mask, ~mask, [t])
                mod_ix = np.ix_(~mask, ~mask, [0])
                smoothed_measurement_disturbance_cov[ix] = np.copy(
                    model.obs_cov[:, :, obs_cov_t:obs_cov_t+1])[mod_ix]

        # Advance the smoother
        self.t -= 1

[docs]class KalmanSmoother(KalmanFilter): r""" State space representation of a time series process, with Kalman filter and smoother. Parameters ---------- k_endog : array_like or integer The observed time-series process :math:`y` if array like or the number of variables in the process if an integer. k_states : int The dimension of the unobserved state process. k_posdef : int, optional The dimension of a guaranteed positive definite covariance matrix describing the shocks in the measurement equation. Must be less than or equal to `k_states`. Default is `k_states`. results_class : class, optional Default results class to use to save filtering output. Default is `SmootherResults`. If specified, class must extend from `SmootherResults`. **kwargs Keyword arguments may be used to provide default values for state space matrices, for Kalman filtering options, or for Kalman smoothing options. See `Representation` for more details. """ smoother_outputs = [ 'smoother_state', 'smoother_state_cov', 'smoother_disturbance', 'smoother_disturbance_cov', 'smoother_all', ] smoother_state = OptionWrapper('smoother_output', SMOOTHER_STATE) smoother_state_cov = OptionWrapper('smoother_output', SMOOTHER_STATE_COV) smoother_disturbance = ( OptionWrapper('smoother_output', SMOOTHER_DISTURBANCE) ) smoother_disturbance_cov = ( OptionWrapper('smoother_output', SMOOTHER_DISTURBANCE_COV) ) smoother_all = OptionWrapper('smoother_output', SMOOTHER_ALL) # Default smoother options smoother_output = SMOOTHER_ALL def __init__(self, k_endog, k_states, k_posdef=None, results_class=None, **kwargs): # Set the default results class if results_class is None: results_class = SmootherResults super(KalmanSmoother, self).__init__( k_endog, k_states, k_posdef, results_class=results_class, **kwargs ) # Set the smoother output self.set_smoother_output(**kwargs)
[docs] def set_smoother_output(self, smoother_output=None, **kwargs): """ Set the smoother output The smoother can produce several types of results. The smoother output variable controls which are calculated and returned. Parameters ---------- smoother_output : integer, optional Bitmask value to set the smoother output to. See notes for details. **kwargs Keyword arguments may be used to influence the smoother output by setting individual boolean flags. See notes for details. Notes ----- The smoother output is defined by a collection of boolean flags, and is internally stored as a bitmask. The methods available are: SMOOTHER_STATE = 0x01 Calculate and return the smoothed states. SMOOTHER_STATE_COV = 0x02 Calculate and return the smoothed state covariance matrices. SMOOTHER_DISTURBANCE = 0x04 Calculate and return the smoothed state and observation disturbances. SMOOTHER_DISTURBANCE_COV = 0x08 Calculate and return the covariance matrices for the smoothed state and observation disturbances. SMOOTHER_ALL Calculate and return all results. If the bitmask is set directly via the `smoother_output` argument, then the full method must be provided. If keyword arguments are used to set individual boolean flags, then the lowercase of the method must be used as an argument name, and the value is the desired value of the boolean flag (True or False). Note that the smoother output may also be specified by directly modifying the class attributes which are defined similarly to the keyword arguments. The default smoother output is SMOOTHER_ALL. If performance is a concern, only those results which are needed should be specified as any results that are not specified will not be calculated. For example, if the smoother output is set to only include SMOOTHER_STATE, the smoother operates much more quickly than if all output is required. Examples -------- >>> import statsmodels.tsa.statespace.kalman_smoother as ks >>> mod = ks.KalmanSmoother(1,1) >>> mod.smoother_output 15 >>> mod.set_smoother_output(smoother_output=0) >>> mod.smoother_state = True >>> mod.smoother_output 1 >>> mod.smoother_state True """ if smoother_output is not None: self.smoother_output = smoother_output for name in KalmanSmoother.smoother_outputs: if name in kwargs: setattr(self, name, kwargs[name])
[docs] def smooth(self, smoother_output=None, results=None, run_filter=True, prefix=None, complex_step=False, **kwargs): """ Apply the Kalman smoother to the statespace model. Parameters ---------- smoother_output : int, optional Determines which Kalman smoother output calculate. Default is all (including state, disturbances, and all covariances). results : class or object, optional If a class, then that class is instantiated and returned with the result of both filtering and smoothing. If an object, then that object is updated with the smoothing data. If None, then a SmootherResults object is returned with both filtering and smoothing results. run_filter : bool, optional Whether or not to run the Kalman filter prior to smoothing. Default is True. prefix : string The prefix of the datatype. Usually only used internally. Returns ------- SmootherResults object """ if smoother_output is None: smoother_output = self.smoother_output # Set the class to be the default results class, if None provided if results is None: results = self.results_class # Initialize the filter and statespace object if necessary prefix, dtype, create_filter, create_statespace = ( self._initialize_filter(**kwargs) ) # Instantiate a new results object, if required new_results = False if isinstance(results, type): if not issubclass(results, SmootherResults): raise ValueError('Invalid results class provided.') results = results(self) new_results = True # Run the filter kfilter = self._kalman_filters[prefix] if not run_filter and (not kfilter.t == self.nobs or create_filter): run_filter = True warnings.warn('Despite `run_filter=False`, Kalman filtering was' ' performed because filtering was not complete.', ValueWarning) if run_filter: self._initialize_state(prefix=prefix, complex_step=complex_step) kfilter() # Update the results object with filtered output # Update the model features; unless we had to recreate the # statespace, only update the filter options if not new_results: results.update_representation(self) if run_filter: results.update_filter(kfilter) # Run the smoother and update the output smoother = _KalmanSmoother(self, results, smoother_output) smoother() output = _SmootherOutput( tmp_L=smoother.tmp_L, scaled_smoothed_estimator=smoother.scaled_smoothed_estimator, scaled_smoothed_estimator_cov=( smoother.scaled_smoothed_estimator_cov), smoothing_error=smoother.smoothing_error, smoothed_state=smoother.smoothed_state, smoothed_state_cov=smoother.smoothed_state_cov, smoothed_state_disturbance=smoother.smoothed_state_disturbance, smoothed_state_disturbance_cov=( smoother.smoothed_state_disturbance_cov), smoothed_measurement_disturbance=( smoother.smoothed_measurement_disturbance), smoothed_measurement_disturbance_cov=( smoother.smoothed_measurement_disturbance_cov), ) results.update_smoother(output) return results
[docs]class SmootherResults(FilterResults): """ Results from applying the Kalman smoother and/or filter to a state space model. Parameters ---------- model : Representation A Statespace representation Attributes ---------- nobs : int Number of observations. k_endog : int The dimension of the observation series. k_states : int The dimension of the unobserved state process. k_posdef : int The dimension of a guaranteed positive definite covariance matrix describing the shocks in the measurement equation. dtype : dtype Datatype of representation matrices prefix : str BLAS prefix of representation matrices shapes : dictionary of name:tuple A dictionary recording the shapes of each of the representation matrices as tuples. endog : array The observation vector. design : array The design matrix, :math:`Z`. obs_intercept : array The intercept for the observation equation, :math:`d`. obs_cov : array The covariance matrix for the observation equation :math:`H`. transition : array The transition matrix, :math:`T`. state_intercept : array The intercept for the transition equation, :math:`c`. selection : array The selection matrix, :math:`R`. state_cov : array The covariance matrix for the state equation :math:`Q`. missing : array of bool An array of the same size as `endog`, filled with boolean values that are True if the corresponding entry in `endog` is NaN and False otherwise. nmissing : array of int An array of size `nobs`, where the ith entry is the number (between 0 and k_endog) of NaNs in the ith row of the `endog` array. time_invariant : bool Whether or not the representation matrices are time-invariant initialization : str Kalman filter initialization method. initial_state : array_like The state vector used to initialize the Kalamn filter. initial_state_cov : array_like The state covariance matrix used to initialize the Kalamn filter. filter_method : int Bitmask representing the Kalman filtering method inversion_method : int Bitmask representing the method used to invert the forecast error covariance matrix. stability_method : int Bitmask representing the methods used to promote numerical stability in the Kalman filter recursions. conserve_memory : int Bitmask representing the selected memory conservation method. tolerance : float The tolerance at which the Kalman filter determines convergence to steady-state. loglikelihood_burn : int The number of initial periods during which the loglikelihood is not recorded. converged : bool Whether or not the Kalman filter converged. period_converged : int The time period in which the Kalman filter converged. filtered_state : array The filtered state vector at each time period. filtered_state_cov : array The filtered state covariance matrix at each time period. predicted_state : array The predicted state vector at each time period. predicted_state_cov : array The predicted state covariance matrix at each time period. kalman_gain : array The Kalman gain at each time period. forecasts : array The one-step-ahead forecasts of observations at each time period. forecasts_error : array The forecast errors at each time period. forecasts_error_cov : array The forecast error covariance matrices at each time period. loglikelihood : array The loglikelihood values at each time period. collapsed_forecasts : array If filtering using collapsed observations, stores the one-step-ahead forecasts of collapsed observations at each time period. collapsed_forecasts_error : array If filtering using collapsed observations, stores the one-step-ahead forecast errors of collapsed observations at each time period. collapsed_forecasts_error_cov : array If filtering using collapsed observations, stores the one-step-ahead forecast error covariance matrices of collapsed observations at each time period. standardized_forecast_error : array The standardized forecast errors smoother_output : int Bitmask representing the generated Kalman smoothing output scaled_smoothed_estimator : array The scaled smoothed estimator at each time period. scaled_smoothed_estimator_cov : array The scaled smoothed estimator covariance matrices at each time period. smoothing_error : array The smoothing error covariance matrices at each time period. smoothed_state : array The smoothed state at each time period. smoothed_state_cov : array The smoothed state covariance matrices at each time period. smoothed_measurement_disturbance : array The smoothed measurement at each time period. smoothed_state_disturbance : array The smoothed state at each time period. smoothed_measurement_disturbance_cov : array The smoothed measurement disturbance covariance matrices at each time period. smoothed_state_disturbance_cov : array The smoothed state disturbance covariance matrices at each time period. """ _smoother_attributes = [ 'smoother_output', 'scaled_smoothed_estimator', 'scaled_smoothed_estimator_cov', 'smoothing_error', 'smoothed_state', 'smoothed_state_cov', 'smoothed_measurement_disturbance', 'smoothed_state_disturbance', 'smoothed_measurement_disturbance_cov', 'smoothed_state_disturbance_cov' ] _smoother_options = KalmanSmoother.smoother_outputs _attributes = FilterResults._model_attributes + _smoother_attributes
[docs] def update_representation(self, model, only_options=False): """ Update the results to match a given model Parameters ---------- model : Representation The model object from which to take the updated values. only_options : boolean, optional If set to true, only the smoother and filter options are updated, and the state space representation is not updated. Default is False. Notes ----- This method is rarely required except for internal usage. """ super(SmootherResults, self).update_representation(model, only_options) # Save the options as boolean variables for name in self._smoother_options: setattr(self, name, getattr(model, name, None)) # Initialize holders for smoothed forecasts self._smoothed_forecasts = None self._smoothed_forecasts_error = None self._smoothed_forecasts_error_cov = None
[docs] def update_smoother(self, smoother): """ Update the smoother results Parameters ---------- smoother : KalmanSmoother The model object from which to take the updated values. Notes ----- This method is rarely required except for internal usage. """ # Copy the appropriate output attributes = [] # Since update_representation will already have been called, we can # use the boolean options smoother_* and know they match the smoother # itself if self.smoother_state or self.smoother_disturbance: attributes.append('scaled_smoothed_estimator') if self.smoother_state_cov or self.smoother_disturbance_cov: attributes.append('scaled_smoothed_estimator_cov') if self.smoother_state: attributes.append('smoothed_state') if self.smoother_state_cov: attributes.append('smoothed_state_cov') if self.smoother_disturbance: attributes += [ 'smoothing_error', 'smoothed_measurement_disturbance', 'smoothed_state_disturbance' ] if self.smoother_disturbance_cov: attributes += [ 'smoothed_measurement_disturbance_cov', 'smoothed_state_disturbance_cov' ] for name in self._smoother_attributes: if name == 'smoother_output': pass elif name in attributes: setattr( self, name, np.array(getattr(smoother, name, None), copy=True) ) else: setattr(self, name, None) # Adjustments # For r_t (and similarly for N_t), what was calculated was # r_T, ..., r_{-1}, and stored such that # scaled_smoothed_estimator[-1] == r_{-1}. We only want r_0, ..., r_T # so exclude the last element so that the time index is consistent # with the other returned output if 'scaled_smoothed_estimator' in attributes: self.scaled_smoothed_estimator = ( self.scaled_smoothed_estimator[:, :-1] ) if 'scaled_smoothed_estimator_cov' in attributes: self.scaled_smoothed_estimator_cov = ( self.scaled_smoothed_estimator_cov[:, :, :-1] ) # Clear the smoothed forecasts self._smoothed_forecasts = None self._smoothed_forecasts_error = None self._smoothed_forecasts_error_cov = None
def _get_smoothed_forecasts(self): if self._smoothed_forecasts is None: # Initialize empty arrays self._smoothed_forecasts = np.zeros(self.forecasts.shape, dtype=self.dtype) self._smoothed_forecasts_error = ( np.zeros(self.forecasts_error.shape, dtype=self.dtype) ) self._smoothed_forecasts_error_cov = ( np.zeros(self.forecasts_error_cov.shape, dtype=self.dtype) ) for t in range(self.nobs): design_t = 0 if[2] == 1 else t obs_cov_t = 0 if self.obs_cov.shape[2] == 1 else t obs_intercept_t = 0 if self.obs_intercept.shape[1] == 1 else t mask = ~self.missing[:, t].astype(bool) # We can recover forecasts self._smoothed_forecasts[:, t] =[:, :, design_t], self.smoothed_state[:, t] ) + self.obs_intercept[:, obs_intercept_t] if self.nmissing[t] > 0: self._smoothed_forecasts_error[:, t] = np.nan self._smoothed_forecasts_error[mask, t] = ( self.endog[mask, t] - self._smoothed_forecasts[mask, t] ) self._smoothed_forecasts_error_cov[:, :, t] =[:, :, design_t], self.smoothed_state_cov[:, :, t]),[:, :, design_t].T ) + self.obs_cov[:, :, obs_cov_t] return ( self._smoothed_forecasts, self._smoothed_forecasts_error, self._smoothed_forecasts_error_cov ) @property def smoothed_forecasts(self): return self._get_smoothed_forecasts()[0] @property def smoothed_forecasts_error(self): return self._get_smoothed_forecasts()[1] @property def smoothed_forecasts_error_cov(self): return self._get_smoothed_forecasts()[2]