{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Custom statespace models\n", "\n", "The true power of the state space model is to allow the creation and estimation of custom models. This notebook shows various statespace models that subclass sm.tsa.statespace.MLEModel.\n", "\n", "Remember the general state space model can be written in the following general way:\n", "\n", "\n", "\\begin{aligned}\n", "y_t & = Z_t \\alpha_{t} + d_t + \\varepsilon_t \\\\\n", "\\alpha_{t+1} & = T_t \\alpha_{t} + c_t + R_t \\eta_{t}\n", "\\end{aligned}\n", "\n", "\n", "You can check the details and the dimensions of the objects [in this link](https://www.statsmodels.org/stable/statespace.html#custom-state-space-models)\n", "\n", "Most models won't include all of these elements. For example, the design matrix $Z_t$ might not depend on time ($\\forall t \\;Z_t = Z$), or the model won't have an observation intercept $d_t$.\n", "\n", "We'll start with something relatively simple and then show how to extend it bit by bit to include more elements.\n", "\n", "+ Model 1: time-varying coefficients. One observation equation with two state equations\n", "+ Model 2: time-varying parameters with non identity transition matrix\n", "+ Model 3: multiple observation and multiple state equations\n", "+ Bonus: pymc3 for Bayesian estimation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "from collections import OrderedDict\n", "\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import pandas as pd\n", "import statsmodels.api as sm\n", "\n", "plt.rc(\"figure\", figsize=(16, 8))\n", "plt.rc(\"font\", size=15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model 1: time-varying coefficients\n", "\n", "\n", "\\begin{aligned}\n", "y_t & = d + x_t \\beta_{x,t} + w_t \\beta_{w,t} + \\varepsilon_t \\hspace{4em} \\varepsilon_t \\sim N(0, \\sigma_\\varepsilon^2)\\\\\n", "\\begin{bmatrix} \\beta_{x,t} \\\\ \\beta_{w,t} \\end{bmatrix} & = \\begin{bmatrix} \\beta_{x,t-1} \\\\ \\beta_{w,t-1} \\end{bmatrix} + \\begin{bmatrix} \\zeta_{x,t} \\\\ \\zeta_{w,t} \\end{bmatrix} \\hspace{3.7em} \\begin{bmatrix} \\zeta_{x,t} \\\\ \\zeta_{w,t} \\end{bmatrix} \\sim N \\left ( \\begin{bmatrix} 0 \\\\ 0 \\end{bmatrix}, \\begin{bmatrix} \\sigma_{\\beta, x}^2 & 0 \\\\ 0 & \\sigma_{\\beta, w}^2 \\end{bmatrix} \\right )\n", "\\end{aligned}\n", "\n", "\n", "The observed data is $y_t, x_t, w_t$. With $x_t, w_t$ being the exogenous variables. Notice that the design matrix is time-varying, so it will have three dimensions (k_endog x k_states x nobs)\n", "\n", "The states are $\\beta_{x,t}$ and $\\beta_{w,t}$. The state equation tells us these states evolve with a random walk. Thus, in this case the transition matrix is a 2 by 2 identity matrix.\n", "\n", "We'll first simulate the data, the construct a model and finally estimate it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def gen_data_for_model1():\n", " nobs = 1000\n", "\n", " rs = np.random.RandomState(seed=93572)\n", "\n", " d = 5\n", " var_y = 5\n", " var_coeff_x = 0.01\n", " var_coeff_w = 0.5\n", "\n", " x_t = rs.uniform(size=nobs)\n", " w_t = rs.uniform(size=nobs)\n", " eps = rs.normal(scale=var_y ** 0.5, size=nobs)\n", "\n", " beta_x = np.cumsum(rs.normal(size=nobs, scale=var_coeff_x ** 0.5))\n", " beta_w = np.cumsum(rs.normal(size=nobs, scale=var_coeff_w ** 0.5))\n", "\n", " y_t = d + beta_x * x_t + beta_w * w_t + eps\n", " return y_t, x_t, w_t, beta_x, beta_w\n", "\n", "\n", "y_t, x_t, w_t, beta_x, beta_w = gen_data_for_model1()\n", "_ = plt.plot(y_t)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TVRegression(sm.tsa.statespace.MLEModel):\n", " def __init__(self, y_t, x_t, w_t):\n", " exog = np.c_[x_t, w_t] # shaped nobs x 2\n", "\n", " super(TVRegression, self).__init__(\n", " endog=y_t, exog=exog, k_states=2, initialization=\"diffuse\"\n", " )\n", "\n", " # Since the design matrix is time-varying, it must be\n", " # shaped k_endog x k_states x nobs\n", " # Notice that exog.T is shaped k_states x nobs, so we\n", " # just need to add a new first axis with shape 1\n", " self.ssm[\"design\"] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs\n", " self.ssm[\"selection\"] = np.eye(self.k_states)\n", " self.ssm[\"transition\"] = np.eye(self.k_states)\n", "\n", " # Which parameters need to be positive?\n", " self.positive_parameters = slice(1, 4)\n", "\n", " @property\n", " def param_names(self):\n", " return [\"intercept\", \"var.e\", \"var.x.coeff\", \"var.w.coeff\"]\n", "\n", " @property\n", " def start_params(self):\n", " \"\"\"\n", " Defines the starting values for the parameters\n", " The linear regression gives us reasonable starting values for the constant\n", " d and the variance of the epsilon error\n", " \"\"\"\n", " exog = sm.add_constant(self.exog)\n", " res = sm.OLS(self.endog, exog).fit()\n", " params = np.r_[res.params[0], res.scale, 0.001, 0.001]\n", " return params\n", "\n", " def transform_params(self, unconstrained):\n", " \"\"\"\n", " We constraint the last three parameters\n", " ('var.e', 'var.x.coeff', 'var.w.coeff') to be positive,\n", " because they are variances\n", " \"\"\"\n", " constrained = unconstrained.copy()\n", " constrained[self.positive_parameters] = (\n", " constrained[self.positive_parameters] ** 2\n", " )\n", " return constrained\n", "\n", " def untransform_params(self, constrained):\n", " \"\"\"\n", " Need to unstransform all the parameters you transformed\n", " in the transform_params function\n", " \"\"\"\n", " unconstrained = constrained.copy()\n", " unconstrained[self.positive_parameters] = (\n", " unconstrained[self.positive_parameters] ** 0.5\n", " )\n", " return unconstrained\n", "\n", " def update(self, params, **kwargs):\n", " params = super(TVRegression, self).update(params, **kwargs)\n", "\n", " self[\"obs_intercept\", 0, 0] = params[0]\n", " self[\"obs_cov\", 0, 0] = params[1]\n", " self[\"state_cov\"] = np.diag(params[2:4])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### And then estimate it with our custom model class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mod = TVRegression(y_t, x_t, w_t)\n", "res = mod.fit()\n", "\n", "print(res.summary())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The values that generated the data were:\n", "\n", "+ intercept = 5\n", "+ var.e = 5\n", "+ var.x.coeff = 0.01\n", "+ var.w.coeff = 0.5\n", "\n", "\n", "As you can see, the estimation recovered the real parameters pretty well.\n", "\n", "We can also recover the estimated evolution of the underlying coefficients (or states in Kalman filter talk)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, axes = plt.subplots(2, figsize=(16, 8))\n", "\n", "ss = pd.DataFrame(res.smoothed_state.T, columns=[\"x\", \"w\"])\n", "\n", "axes[0].plot(beta_x, label=\"True\")\n", "axes[0].plot(ss[\"x\"], label=\"Smoothed estimate\")\n", "axes[0].set(title=\"Time-varying coefficient on x_t\")\n", "axes[0].legend()\n", "\n", "axes[1].plot(beta_w, label=\"True\")\n", "axes[1].plot(ss[\"w\"], label=\"Smoothed estimate\")\n", "axes[1].set(title=\"Time-varying coefficient on w_t\")\n", "axes[1].legend()\n", "\n", "fig.tight_layout();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model 2: time-varying parameters with non identity transition matrix\n", "\n", "This is a small extension from Model 1. Instead of having an identity transition matrix, we'll have one with two parameters ($\\rho_1, \\rho_2$) that we need to estimate. \n", "\n", "\n", "\n", "\\begin{aligned}\n", "y_t & = d + x_t \\beta_{x,t} + w_t \\beta_{w,t} + \\varepsilon_t \\hspace{4em} \\varepsilon_t \\sim N(0, \\sigma_\\varepsilon^2)\\\\\n", "\\begin{bmatrix} \\beta_{x,t} \\\\ \\beta_{w,t} \\end{bmatrix} & = \\begin{bmatrix} \\rho_1 & 0 \\\\ 0 & \\rho_2 \\end{bmatrix} \\begin{bmatrix} \\beta_{x,t-1} \\\\ \\beta_{w,t-1} \\end{bmatrix} + \\begin{bmatrix} \\zeta_{x,t} \\\\ \\zeta_{w,t} \\end{bmatrix} \\hspace{3.7em} \\begin{bmatrix} \\zeta_{x,t} \\\\ \\zeta_{w,t} \\end{bmatrix} \\sim N \\left ( \\begin{bmatrix} 0 \\\\ 0 \\end{bmatrix}, \\begin{bmatrix} \\sigma_{\\beta, x}^2 & 0 \\\\ 0 & \\sigma_{\\beta, w}^2 \\end{bmatrix} \\right )\n", "\\end{aligned}\n", "\n", "\n", "\n", "What should we modify in our previous class to make things work?\n", "+ Good news: not a lot!\n", "+ Bad news: we need to be careful about a few things" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1) Change the starting parameters function\n", "\n", "We need to add names for the new parameters $\\rho_1, \\rho_2$ and we need to start corresponding starting values.\n", "\n", "The param_names function goes from:\n", "\n", "python\n", "def param_names(self):\n", " return ['intercept', 'var.e', 'var.x.coeff', 'var.w.coeff']\n", "\n", "\n", "\n", "to \n", "\n", "python\n", "def param_names(self):\n", " return ['intercept', 'var.e', 'var.x.coeff', 'var.w.coeff',\n", " 'rho1', 'rho2']\n", "\n", "\n", "and we change the start_params function from \n", "\n", "python\n", "def start_params(self):\n", " exog = sm.add_constant(self.exog)\n", " res = sm.OLS(self.endog, exog).fit()\n", " params = np.r_[res.params[0], res.scale, 0.001, 0.001]\n", " return params\n", "\n", "\n", "to\n", "\n", "python\n", "def start_params(self):\n", " exog = sm.add_constant(self.exog)\n", " res = sm.OLS(self.endog, exog).fit()\n", " params = np.r_[res.params[0], res.scale, 0.001, 0.001, 0.8, 0.8]\n", " return params\n", "\n", "\n", "2) Change the update function\n", "\n", "It goes from\n", "\n", "python\n", "def update(self, params, **kwargs):\n", " params = super(TVRegression, self).update(params, **kwargs)\n", "\n", " self['obs_intercept', 0, 0] = params[0]\n", " self['obs_cov', 0, 0] = params[1]\n", " self['state_cov'] = np.diag(params[2:4])\n", "\n", "\n", "\n", "to \n", "\n", "python\n", "def update(self, params, **kwargs):\n", " params = super(TVRegression, self).update(params, **kwargs)\n", "\n", " self['obs_intercept', 0, 0] = params[0]\n", " self['obs_cov', 0, 0] = params[1]\n", " self['state_cov'] = np.diag(params[2:4])\n", " self['transition', 0, 0] = params[4]\n", " self['transition', 1, 1] = params[5]\n", "\n", "\n", "\n", "3) (optional) change transform_params and untransform_params\n", "\n", "This is not required, but you might wanna restrict $\\rho_1, \\rho_2$ to lie between -1 and 1.\n", "In that case, we first import two utility functions from statsmodels.\n", "\n", "\n", "python\n", "from statsmodels.tsa.statespace.tools import (\n", " constrain_stationary_univariate, unconstrain_stationary_univariate)\n", "\n", "\n", "constrain_stationary_univariate constraint the value to be within -1 and 1. \n", "unconstrain_stationary_univariate provides the inverse function.\n", "The transform and untransform parameters function would look like this\n", "(remember that $\\rho_1, \\rho_2$ are in the 4 and 5th index):\n", "\n", "python\n", "def transform_params(self, unconstrained):\n", " constrained = unconstrained.copy()\n", " constrained[self.positive_parameters] = constrained[self.positive_parameters]**2\n", " constrained[4] = constrain_stationary_univariate(constrained[4:5])\n", " constrained[5] = constrain_stationary_univariate(constrained[5:6])\n", " return constrained\n", "\n", "def untransform_params(self, constrained):\n", " unconstrained = constrained.copy()\n", " unconstrained[self.positive_parameters] = unconstrained[self.positive_parameters]**0.5\n", " unconstrained[4] = unconstrain_stationary_univariate(constrained[4:5])\n", " unconstrained[5] = unconstrain_stationary_univariate(constrained[5:6])\n", " return unconstrained\n", "\n", "\n", "I'll write the full class below (without the optional changes I have just discussed)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TVRegressionExtended(sm.tsa.statespace.MLEModel):\n", " def __init__(self, y_t, x_t, w_t):\n", " exog = np.c_[x_t, w_t] # shaped nobs x 2\n", "\n", " super(TVRegressionExtended, self).__init__(\n", " endog=y_t, exog=exog, k_states=2, initialization=\"diffuse\"\n", " )\n", "\n", " # Since the design matrix is time-varying, it must be\n", " # shaped k_endog x k_states x nobs\n", " # Notice that exog.T is shaped k_states x nobs, so we\n", " # just need to add a new first axis with shape 1\n", " self.ssm[\"design\"] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs\n", " self.ssm[\"selection\"] = np.eye(self.k_states)\n", " self.ssm[\"transition\"] = np.eye(self.k_states)\n", "\n", " # Which parameters need to be positive?\n", " self.positive_parameters = slice(1, 4)\n", "\n", " @property\n", " def param_names(self):\n", " return [\"intercept\", \"var.e\", \"var.x.coeff\", \"var.w.coeff\", \"rho1\", \"rho2\"]\n", "\n", " @property\n", " def start_params(self):\n", " \"\"\"\n", " Defines the starting values for the parameters\n", " The linear regression gives us reasonable starting values for the constant\n", " d and the variance of the epsilon error\n", " \"\"\"\n", "\n", " exog = sm.add_constant(self.exog)\n", " res = sm.OLS(self.endog, exog).fit()\n", " params = np.r_[res.params[0], res.scale, 0.001, 0.001, 0.7, 0.8]\n", " return params\n", "\n", " def transform_params(self, unconstrained):\n", " \"\"\"\n", " We constraint the last three parameters\n", " ('var.e', 'var.x.coeff', 'var.w.coeff') to be positive,\n", " because they are variances\n", " \"\"\"\n", " constrained = unconstrained.copy()\n", " constrained[self.positive_parameters] = (\n", " constrained[self.positive_parameters] ** 2\n", " )\n", " return constrained\n", "\n", " def untransform_params(self, constrained):\n", " \"\"\"\n", " Need to unstransform all the parameters you transformed\n", " in the transform_params function\n", " \"\"\"\n", " unconstrained = constrained.copy()\n", " unconstrained[self.positive_parameters] = (\n", " unconstrained[self.positive_parameters] ** 0.5\n", " )\n", " return unconstrained\n", "\n", " def update(self, params, **kwargs):\n", " params = super(TVRegressionExtended, self).update(params, **kwargs)\n", "\n", " self[\"obs_intercept\", 0, 0] = params[0]\n", " self[\"obs_cov\", 0, 0] = params[1]\n", " self[\"state_cov\"] = np.diag(params[2:4])\n", " self[\"transition\", 0, 0] = params[4]\n", " self[\"transition\", 1, 1] = params[5]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To estimate, we'll use the same data as in model 1 and expect the $\\rho_1, \\rho_2$ to be near 1.\n", "\n", "The results look pretty good!\n", "Note that this estimation can be quite sensitive to the starting value of $\\rho_1, \\rho_2$. If you try lower values, you'll see it fails to converge." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mod = TVRegressionExtended(y_t, x_t, w_t)\n", "res = mod.fit(maxiter=2000) # it doesn't converge with 50 iters\n", "print(res.summary())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model 3: multiple observation and state equations\n", "\n", "We'll keep the time-varying parameters, but this time we'll also have two observation equations.\n", "\n", "### Observation equations\n", "\n", "$\\hat{i_t}, \\hat{M_t}, \\hat{s_t}$ are observed each period.\n", "\n", "The model for the observation equation has two equations:\n", "\n", "$$\\hat{i_t} = \\alpha_1 * \\hat{s_t} + \\varepsilon_1$$\n", "\n", "$$\\hat{M_t} = \\alpha_2 + \\varepsilon_2$$\n", "\n", "Following the [general notation from state space models](https://www.statsmodels.org/stable/statespace.html), the endogenous part of the observation equation is $y_t = (\\hat{i_t}, \\hat{M_t})$ and we only have one exogenous variable $\\hat{s_t}$\n", "\n", "\n", "### State equations\n", "\n", "\n", "$$\\alpha_{1, t+1} = \\delta_1 \\alpha_{1, t} + \\delta_2 \\alpha_{2, t} + W_1$$\n", "\n", "$$\\alpha_{2, t+1} = \\delta_3 \\alpha_{2, t} + W_2$$\n", "\n", "\n", "### Matrix notation for the state space model\n", "\n", "\n", "\\begin{aligned}\n", "\\begin{bmatrix} \\hat{i_t} \\\\ \\hat{M_t} \\end{bmatrix} &= \n", "\\begin{bmatrix} \\hat{s_t} & 0 \\\\ 0 & 1 \\end{bmatrix} \\begin{bmatrix} \\alpha_{1, t} \\\\ \\alpha_{2, t} \\end{bmatrix} + \\begin{bmatrix} \\varepsilon_{1, t} \\\\ \\varepsilon_{1, t} \\end{bmatrix} \\hspace{6.5em} \\varepsilon_t \\sim N \\left ( \\begin{bmatrix} 0 \\\\ 0 \\end{bmatrix}, \\begin{bmatrix} \\sigma_{\\varepsilon_1}^2 & 0 \\\\ 0 & \\sigma_{\\varepsilon_2}^2 \\end{bmatrix} \\right )\n", "\\\\\n", "\\begin{bmatrix} \\alpha_{1, t+1} \\\\ \\alpha_{2, t+1} \\end{bmatrix} & = \\begin{bmatrix} \\delta_1 & \\delta_1 \\\\ 0 & \\delta_3 \\end{bmatrix} \\begin{bmatrix} \\alpha_{1, t} \\\\ \\alpha_{2, t} \\end{bmatrix} + \\begin{bmatrix} W_1 \\\\ W_2 \\end{bmatrix} \\hspace{3.em} \\begin{bmatrix} W_1 \\\\ W_2 \\end{bmatrix} \\sim N \\left ( \\begin{bmatrix} 0 \\\\ 0 \\end{bmatrix}, \\begin{bmatrix} \\sigma_{W_1}^2 & 0 \\\\ 0 & \\sigma_{W_2}^2 \\end{bmatrix} \\right )\n", "\\end{aligned}\n", "\n", "\n", "I'll simulate some data, talk about what we need to modify and finally estimate the model to see if we're recovering something reasonable.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "true_values = {\n", " \"var_e1\": 0.01,\n", " \"var_e2\": 0.01,\n", " \"var_w1\": 0.01,\n", " \"var_w2\": 0.01,\n", " \"delta1\": 0.8,\n", " \"delta2\": 0.5,\n", " \"delta3\": 0.7,\n", "}\n", "\n", "\n", "def gen_data_for_model3():\n", " # Starting values\n", " alpha1_0 = 2.1\n", " alpha2_0 = 1.1\n", "\n", " t_max = 500\n", "\n", " def gen_i(alpha1, s):\n", " return alpha1 * s + np.sqrt(true_values[\"var_e1\"]) * np.random.randn()\n", "\n", " def gen_m_hat(alpha2):\n", " return 1 * alpha2 + np.sqrt(true_values[\"var_e2\"]) * np.random.randn()\n", "\n", " def gen_alpha1(alpha1, alpha2):\n", " w1 = np.sqrt(true_values[\"var_w1\"]) * np.random.randn()\n", " return true_values[\"delta1\"] * alpha1 + true_values[\"delta2\"] * alpha2 + w1\n", "\n", " def gen_alpha2(alpha2):\n", " w2 = np.sqrt(true_values[\"var_w2\"]) * np.random.randn()\n", " return true_values[\"delta3\"] * alpha2 + w2\n", "\n", " s_t = 0.3 + np.sqrt(1.4) * np.random.randn(t_max)\n", " i_hat = np.empty(t_max)\n", " m_hat = np.empty(t_max)\n", "\n", " current_alpha1 = alpha1_0\n", " current_alpha2 = alpha2_0\n", " for t in range(t_max):\n", " # Obs eqns\n", " i_hat[t] = gen_i(current_alpha1, s_t[t])\n", " m_hat[t] = gen_m_hat(current_alpha2)\n", "\n", " # state eqns\n", " new_alpha1 = gen_alpha1(current_alpha1, current_alpha2)\n", " new_alpha2 = gen_alpha2(current_alpha2)\n", "\n", " # Update states for next period\n", " current_alpha1 = new_alpha1\n", " current_alpha2 = new_alpha2\n", "\n", " return i_hat, m_hat, s_t\n", "\n", "\n", "i_hat, m_hat, s_t = gen_data_for_model3()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What do we need to modify?\n", "\n", "Once again, we don't need to change much, but we need to be careful about the dimensions.\n", "\n", "#### 1) The __init__ function changes from\n", "\n", "\n", "python\n", "def __init__(self, y_t, x_t, w_t):\n", " exog = np.c_[x_t, w_t]\n", "\n", " super(TVRegressionExtended, self).__init__(\n", " endog=y_t, exog=exog, k_states=2,\n", " initialization='diffuse')\n", "\n", " self.ssm['design'] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs\n", " self.ssm['selection'] = np.eye(self.k_states)\n", " self.ssm['transition'] = np.eye(self.k_states)\n", "\n", "\n", "to\n", "\n", "\n", "python\n", "def __init__(self, i_t: np.array, s_t: np.array, m_t: np.array):\n", "\n", " exog = np.c_[s_t, np.repeat(1, len(s_t))] # exog.shape => (nobs, 2)\n", " \n", " super(MultipleYsModel, self).__init__(\n", " endog=np.c_[i_t, m_t], exog=exog, k_states=2,\n", " initialization='diffuse')\n", " \n", " self.ssm['design'] = np.zeros((self.k_endog, self.k_states, self.nobs))\n", " self.ssm['design', 0, 0, :] = s_t\n", " self.ssm['design', 1, 1, :] = 1\n", "\n", "\n", "Note that we did not have to specify k_endog anywhere. The initialization does this for us after checking the dimensions of the endog matrix.\n", "\n", "\n", "#### 2) The update() function \n", "\n", "changes from \n", "\n", "python\n", "def update(self, params, **kwargs):\n", " params = super(TVRegressionExtended, self).update(params, **kwargs)\n", "\n", " self['obs_intercept', 0, 0] = params[0]\n", " self['obs_cov', 0, 0] = params[1]\n", " \n", " self['state_cov'] = np.diag(params[2:4])\n", " self['transition', 0, 0] = params[4]\n", " self['transition', 1, 1] = params[5]\n", "\n", "\n", "\n", "to\n", "\n", "\n", "python\n", "def update(self, params, **kwargs):\n", " params = super(MultipleYsModel, self).update(params, **kwargs)\n", " \n", " \n", " #The following line is not needed (by default, this matrix is initialized by zeroes),\n", " #But I leave it here so the dimensions are clearer\n", " self['obs_intercept'] = np.repeat([np.array([0, 0])], self.nobs, axis=0).T\n", " self['obs_cov', 0, 0] = params[0]\n", " self['obs_cov', 1, 1] = params[1]\n", "\n", " self['state_cov'] = np.diag(params[2:4])\n", " #delta1, delta2, delta3\n", " self['transition', 0, 0] = params[4]\n", " self['transition', 0, 1] = params[5]\n", " self['transition', 1, 1] = params[6]\n", "\n", "\n", "The rest of the methods change in pretty obvious ways (need to add parameter names, make sure the indexes work, etc). The full code for the function is right below" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "starting_values = {\n", " \"var_e1\": 0.2,\n", " \"var_e2\": 0.1,\n", " \"var_w1\": 0.15,\n", " \"var_w2\": 0.18,\n", " \"delta1\": 0.7,\n", " \"delta2\": 0.1,\n", " \"delta3\": 0.85,\n", "}\n", "\n", "\n", "class MultipleYsModel(sm.tsa.statespace.MLEModel):\n", " def __init__(self, i_t: np.array, s_t: np.array, m_t: np.array):\n", "\n", " exog = np.c_[s_t, np.repeat(1, len(s_t))] # exog.shape => (nobs, 2)\n", "\n", " super(MultipleYsModel, self).__init__(\n", " endog=np.c_[i_t, m_t], exog=exog, k_states=2, initialization=\"diffuse\"\n", " )\n", "\n", " self.ssm[\"design\"] = np.zeros((self.k_endog, self.k_states, self.nobs))\n", " self.ssm[\"design\", 0, 0, :] = s_t\n", " self.ssm[\"design\", 1, 1, :] = 1\n", "\n", " # These have ok shape. Placeholders since I'm changing them\n", " # in the update() function\n", " self.ssm[\"selection\"] = np.eye(self.k_states)\n", " self.ssm[\"transition\"] = np.eye(self.k_states)\n", "\n", " # Dictionary of positions to names\n", " self.position_dict = OrderedDict(\n", " var_e1=1, var_e2=2, var_w1=3, var_w2=4, delta1=5, delta2=6, delta3=7\n", " )\n", " self.initial_values = starting_values\n", " self.positive_parameters = slice(0, 4)\n", "\n", " @property\n", " def param_names(self):\n", " return list(self.position_dict.keys())\n", "\n", " @property\n", " def start_params(self):\n", " \"\"\"\n", " Initial values\n", " \"\"\"\n", " # (optional) Use scale for var_e1 and var_e2 starting values\n", " params = np.r_[\n", " self.initial_values[\"var_e1\"],\n", " self.initial_values[\"var_e2\"],\n", " self.initial_values[\"var_w1\"],\n", " self.initial_values[\"var_w2\"],\n", " self.initial_values[\"delta1\"],\n", " self.initial_values[\"delta2\"],\n", " self.initial_values[\"delta3\"],\n", " ]\n", " return params\n", "\n", " def transform_params(self, unconstrained):\n", " \"\"\"\n", " If you need to restrict parameters\n", " For example, variances should be > 0\n", " Parameters maybe have to be within -1 and 1\n", " \"\"\"\n", " constrained = unconstrained.copy()\n", " constrained[self.positive_parameters] = (\n", " constrained[self.positive_parameters] ** 2\n", " )\n", " return constrained\n", "\n", " def untransform_params(self, constrained):\n", " \"\"\"\n", " Need to reverse what you did in transform_params()\n", " \"\"\"\n", " unconstrained = constrained.copy()\n", " unconstrained[self.positive_parameters] = (\n", " unconstrained[self.positive_parameters] ** 0.5\n", " )\n", " return unconstrained\n", "\n", " def update(self, params, **kwargs):\n", " params = super(MultipleYsModel, self).update(params, **kwargs)\n", "\n", " # The following line is not needed (by default, this matrix is initialized by zeroes),\n", " # But I leave it here so the dimensions are clearer\n", " self[\"obs_intercept\"] = np.repeat([np.array([0, 0])], self.nobs, axis=0).T\n", "\n", " self[\"obs_cov\", 0, 0] = params[0]\n", " self[\"obs_cov\", 1, 1] = params[1]\n", "\n", " self[\"state_cov\"] = np.diag(params[2:4])\n", "\n", " # delta1, delta2, delta3\n", " self[\"transition\", 0, 0] = params[4]\n", " self[\"transition\", 0, 1] = params[5]\n", " self[\"transition\", 1, 1] = params[6]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mod = MultipleYsModel(i_hat, s_t, m_hat)\n", "res = mod.fit()\n", "\n", "print(res.summary())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bonus: pymc3 for fast Bayesian estimation\n", "\n", "In this section I'll show how you can take your custom state space model and easily plug it to pymc3 and estimate it with Bayesian methods. In particular, this example will show you an estimation with a version of Hamiltonian Monte Carlo called the No-U-Turn Sampler (NUTS).\n", "\n", "I'm basically copying the ideas contained [in this notebook](https://www.statsmodels.org/dev/examples/notebooks/generated/statespace_sarimax_pymc3.html), so make sure to check that for more details." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Extra requirements\n", "import pymc3 as pm\n", "import theano\n", "import theano.tensor as tt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We need to define some helper functions to connect theano to the likelihood function that is implied in our model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Loglike(tt.Op):\n", "\n", " itypes = [tt.dvector] # expects a vector of parameter values when called\n", " otypes = [tt.dscalar] # outputs a single scalar value (the log likelihood)\n", "\n", " def __init__(self, model):\n", " self.model = model\n", " self.score = Score(self.model)\n", "\n", " def perform(self, node, inputs, outputs):\n", " (theta,) = inputs # contains the vector of parameters\n", " llf = self.model.loglike(theta)\n", " outputs[0][0] = np.array(llf) # output the log-likelihood\n", "\n", " def grad(self, inputs, g):\n", " # the method that calculates the gradients - it actually returns the\n", " # vector-Jacobian product - g[0] is a vector of parameter values\n", " (theta,) = inputs # our parameters\n", " out = [g[0] * self.score(theta)]\n", " return out\n", "\n", "\n", "class Score(tt.Op):\n", " itypes = [tt.dvector]\n", " otypes = [tt.dvector]\n", "\n", " def __init__(self, model):\n", " self.model = model\n", "\n", " def perform(self, node, inputs, outputs):\n", " (theta,) = inputs\n", " outputs[0][0] = self.model.score(theta)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll simulate again the data we used for model 1.\n", "We'll also fit it again and save the results to compare them to the Bayesian posterior we get." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_t, x_t, w_t, beta_x, beta_w = gen_data_for_model1()\n", "plt.plot(y_t)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mod = TVRegression(y_t, x_t, w_t)\n", "res_mle = mod.fit(disp=False)\n", "print(res_mle.summary())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bayesian estimation\n", "\n", "We need to define a prior for each parameter and the number of draws and burn-in points" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set sampling params\n", "ndraws = 3000 # 3000 number of draws from the distribution\n", "nburn = 600 # 600 number of \"burn-in points\" (which will be discarded)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Construct an instance of the Theano wrapper defined above, which\n", "# will allow PyMC3 to compute the likelihood and Jacobian in a way\n", "# that it can make use of. Here we are using the same model instance\n", "# created earlier for MLE analysis (we could also create a new model\n", "# instance if we preferred)\n", "loglike = Loglike(mod)\n", "\n", "with pm.Model():\n", " # Priors\n", " intercept = pm.Uniform(\"intercept\", 1, 10)\n", " var_e = pm.InverseGamma(\"var.e\", 2.3, 0.5)\n", " var_x_coeff = pm.InverseGamma(\"var.x.coeff\", 2.3, 0.1)\n", " var_w_coeff = pm.InverseGamma(\"var.w.coeff\", 2.3, 0.1)\n", "\n", " # convert variables to tensor vectors\n", " theta = tt.as_tensor_variable([intercept, var_e, var_x_coeff, var_w_coeff])\n", "\n", " # use a DensityDist (use a lamdba function to \"call\" the Op)\n", " pm.DensityDist(\"likelihood\", loglike, observed=theta)\n", "\n", " # Draw samples\n", " trace = pm.sample(\n", " ndraws,\n", " tune=nburn,\n", " return_inferencedata=True,\n", " cores=1,\n", " compute_convergence_checks=False,\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### How does the posterior distribution compare with the MLE estimation?\n", "\n", "The clearly peak around the MLE estimate." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_dict = {\n", " \"intercept\": res_mle.params[0],\n", " \"var.e\": res_mle.params[1],\n", " \"var.x.coeff\": res_mle.params[2],\n", " \"var.w.coeff\": res_mle.params[3],\n", "}\n", "plt.tight_layout()\n", "_ = pm.plot_trace(\n", " trace,\n", " lines=[(k, {}, [v]) for k, v in dict(results_dict).items()],\n", " combined=True,\n", " figsize=(12, 12),\n", ")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.5" } }, "nbformat": 4, "nbformat_minor": 4 }