Source code for statsmodels.duration.survfunc

import numpy as np
import pandas as pd
from scipy.stats.distributions import chi2, norm
from statsmodels.graphics import utils


def _calc_survfunc_right(time, status, weights=None, entry=None, compress=True,
                         retall=True):
    """
    Calculate the survival function and its standard error for a single
    group.
    """

    # Convert the unique times to ranks (0, 1, 2, ...)
    if entry is None:
        utime, rtime = np.unique(time, return_inverse=True)
    else:
        tx = np.concatenate((time, entry))
        utime, rtime = np.unique(tx, return_inverse=True)
        rtime = rtime[0:len(time)]

    # Number of deaths at each unique time.
    ml = len(utime)
    if weights is None:
        d = np.bincount(rtime, weights=status, minlength=ml)
    else:
        d = np.bincount(rtime, weights=status*weights, minlength=ml)

    # Size of risk set just prior to each event time.
    if weights is None:
        n = np.bincount(rtime, minlength=ml)
    else:
        n = np.bincount(rtime, weights=weights, minlength=ml)
    if entry is not None:
        n = np.cumsum(n) - n
        rentry = np.searchsorted(utime, entry, side='left')
        if weights is None:
            n0 = np.bincount(rentry, minlength=ml)
        else:
            n0 = np.bincount(rentry, weights=weights, minlength=ml)
        n0 = np.cumsum(n0) - n0
        n = n0 - n
    else:
        n = np.cumsum(n[::-1])[::-1]

    # Only retain times where an event occured.
    if compress:
        ii = np.flatnonzero(d > 0)
        d = d[ii]
        n = n[ii]
        utime = utime[ii]

    # The survival function probabilities.
    sp = 1 - d / n.astype(np.float64)
    sp = np.log(sp)
    sp = np.cumsum(sp)
    sp = np.exp(sp)

    if not retall:
        return sp, utime, rtime, n, d

    # Standard errors
    if weights is None:
        # Greenwood's formula
        se = d / (n * (n - d)).astype(np.float64)
        se = np.cumsum(se)
        se = np.sqrt(se)
        se *= sp
    else:
        # Tsiatis' (1981) formula
        se = d / (n * n).astype(np.float64)
        se = np.cumsum(se)
        se = np.sqrt(se)

    return sp, se, utime, rtime, n, d


def _calc_incidence_right(time, status, weights=None):
    """
    Calculate the cumulative incidence function and its standard error.
    """

    # Calculate the all-cause survival function.
    status0 = (status >= 1).astype(np.float64)
    sp, utime, rtime, n, d = _calc_survfunc_right(time, status0, weights,
                                                  compress=False, retall=False)

    ngrp = status.max()

    # Number of cause-specific deaths at each unique time.
    d = []
    for k in range(ngrp):
        status0 = (status == k + 1).astype(np.float64)
        if weights is None:
            d0 = np.bincount(rtime, weights=status0, minlength=len(utime))
        else:
            d0 = np.bincount(rtime, weights=status0*weights,
                             minlength=len(utime))
        d.append(d0)

    # The cumulative incidence function probabilities.
    ip = []
    sp0 = np.r_[1, sp[:-1]] / n
    for k in range(ngrp):
        ip0 = np.cumsum(sp0 * d[k])
        ip.append(ip0)

    # The standard error of the cumulative incidence function.
    if weights is not None:
        return ip, None, utime
    se = []
    da = sum(d)
    for k in range(ngrp):

        ra = da / (n * (n - da))
        v = ip[k]**2 * np.cumsum(ra)
        v -= 2 * ip[k] * np.cumsum(ip[k] * ra)
        v += np.cumsum(ip[k]**2 * ra)

        ra = (n - d[k]) * d[k] / n
        v += np.cumsum(sp0**2 * ra)

        ra = sp0 * d[k] / n
        v -= 2 * ip[k] * np.cumsum(ra)
        v += 2 * np.cumsum(ip[k] * ra)

        se.append(np.sqrt(v))

    return ip, se, utime


def _checkargs(time, status, entry, freq_weights):

    if len(time) != len(status):
        raise ValueError("time and status must have the same length")

    if entry is not None and (len(entry) != len(time)):
        msg = "entry times and event times must have the same length"
        raise ValueError(msg)

    if entry is not None and np.any(entry >= time):
        msg = "Entry times must not occur on or after event times"
        raise ValueError(msg)

    if freq_weights is not None and (len(freq_weights) != len(time)):
        raise ValueError("weights, time and status must have the same length")


class CumIncidenceRight(object):
    """
    Estimation and inference for a cumulative incidence function.

    If J = 1, 2, ... indicates the event type, the cumulative
    incidence function for cause j is:

    I(t, j) = P(T <= t and J=j)

    Only right censoring is supported.  If frequency weights are provided,
    the point estimate is returned without a standard error.

    Parameters
    ----------
    time : array-like
        An array of times (censoring times or event times)
    status : array-like
        If status >= 1 indicates which event occured at time t.  If
        status = 0, the subject was censored at time t.
    title : string
        Optional title used for plots and summary output.
    freq_weights : array-like
        Optional frequency weights

    Attributes
    ----------
    times : array-like
        The distinct times at which the incidence rates are estimated
    cinc : list of arrays
        cinc[k-1] contains the estimated cumulative incidence rates
        for outcome k=1,2,...
    cinc_se : list of arrays
        The standard errors for the values in `cinc`.

    References
    ----------
    The Stata stcompet procedure:
        http://www.stata-journal.com/sjpdf.html?articlenum=st0059

    Dinse, G. E. and M. G. Larson. 1986. A note on semi-Markov models
    for partially censored data. Biometrika 73: 379-386.

    Marubini, E. and M. G. Valsecchi. 1995. Analysing Survival Data
    from Clinical Trials and Observational Studies. Chichester, UK:
    John Wiley & Sons.
    """

    def __init__(self, time, status, title=None, freq_weights=None):

        _checkargs(time, status, None, freq_weights)
        time = self.time = np.asarray(time)
        status = self.status = np.asarray(status)
        if freq_weights is not None:
            freq_weights = self.freq_weights = np.asarray(freq_weights)
        x = _calc_incidence_right(time, status, freq_weights)
        self.cinc = x[0]
        self.cinc_se = x[1]
        self.times = x[2]
        self.title = "" if not title else title


[docs]class SurvfuncRight(object): """ Estimation and inference for a survival function. The survival function S(t) = P(T > t) is the probability that an event time T is greater than t. This class currently only supports right censoring. Parameters ---------- time : array-like An array of times (censoring times or event times) status : array-like Status at the event time, status==1 is the 'event' (e.g. death, failure), meaning that the event occurs at the given value in `time`; status==0 indicates that censoring has occured, meaning that the event occurs after the given value in `time`. entry : array-like, optional An array of entry times for handling left truncation (the subject is not in the risk set on or before the entry time) title : string Optional title used for plots and summary output. freq_weights : array-like Optional frequency weights Attributes ---------- surv_prob : array-like The estimated value of the survivor function at each time point in `surv_times`. surv_prob_se : array-like The standard errors for the values in `surv_prob`. surv_times : array-like The points where the survival function changes. n_risk : array-like The number of subjects at risk just before each time value in `surv_times`. n_events : array-like The number of events (e.g. deaths) that occur at each point in `surv_times`. """ def __init__(self, time, status, entry=None, title=None, freq_weights=None): _checkargs(time, status, entry, freq_weights) time = self.time = np.asarray(time) status = self.status = np.asarray(status) if freq_weights is not None: freq_weights = self.freq_weights = np.asarray(freq_weights) if entry is not None: entry = self.entry = np.asarray(entry) x = _calc_survfunc_right(time, status, weights=freq_weights, entry=entry) self.surv_prob = x[0] self.surv_prob_se = x[1] self.surv_times = x[2] self.n_risk = x[4] self.n_events = x[5] self.title = "" if not title else title
[docs] def plot(self, ax=None): """ Plot the survival function. Examples -------- Change the line color: >>> import statsmodels.api as sm >>> data = sm.datasets.get_rdataset("flchain", "survival").data >>> df = data.loc[data.sex == "F", :] >>> sf = sm.SurvfuncRight(df["futime"], df["death"]) >>> fig = sf.plot() >>> ax = fig.get_axes()[0] >>> li = ax.get_lines() >>> li[0].set_color('purple') >>> li[1].set_color('purple') Don't show the censoring points: >>> fig = sf.plot() >>> ax = fig.get_axes()[0] >>> li = ax.get_lines() >>> li[1].set_visible(False) """ return plot_survfunc(self, ax)
[docs] def quantile(self, p): """ Estimated quantile of a survival distribution. Parameters ---------- p : float The probability point at which the quantile is determined. Returns the estimated quantile. """ # SAS uses a strict inequality here. ii = np.flatnonzero(self.surv_prob < 1 - p) if len(ii) == 0: return np.nan return self.surv_times[ii[0]]
[docs] def quantile_ci(self, p, alpha=0.05, method='cloglog'): """ Returns a confidence interval for a survival quantile. Parameters ---------- p : float The probability point for which a confidence interval is determined. alpha : float The confidence interval has nominal coverage probability 1 - `alpha`. method : string Function to use for g-transformation, must be ... Returns ------- lb : float The lower confidence limit. ub : float The upper confidence limit. Notes ----- The confidence interval is obtained by inverting Z-tests. The limits of the confidence interval will always be observed event times. References ---------- The method is based on the approach used in SAS, documented here: http://support.sas.com/documentation/cdl/en/statug/68162/HTML/default/viewer.htm#statug_lifetest_details03.htm """ tr = norm.ppf(1 - alpha / 2) method = method.lower() if method == "cloglog": g = lambda x: np.log(-np.log(x)) gprime = lambda x: -1 / (x * np.log(x)) elif method == "linear": g = lambda x: x gprime = lambda x: 1 elif method == "log": g = lambda x: np.log(x) gprime = lambda x: 1 / x elif method == "logit": g = lambda x: np.log(x / (1 - x)) gprime = lambda x: 1 / (x * (1 - x)) elif method == "asinsqrt": g = lambda x: np.arcsin(np.sqrt(x)) gprime = lambda x: 1 / (2 * np.sqrt(x) * np.sqrt(1 - x)) else: raise ValueError("unknown method") r = g(self.surv_prob) - g(1 - p) r /= (gprime(self.surv_prob) * self.surv_prob_se) ii = np.flatnonzero(np.abs(r) <= tr) if len(ii) == 0: return np.nan, np.nan lb = self.surv_times[ii[0]] if ii[-1] == len(self.surv_times) - 1: ub = np.inf else: ub = self.surv_times[ii[-1] + 1] return lb, ub
[docs] def summary(self): """ Return a summary of the estimated survival function. The summary is a datafram containing the unique event times, estimated survival function values, and related quantities. """ df = pd.DataFrame(index=self.surv_times) df.index.name = "Time" df["Surv prob"] = self.surv_prob df["Surv prob SE"] = self.surv_prob_se df["num at risk"] = self.n_risk df["num events"] = self.n_events return df
[docs] def simultaneous_cb(self, alpha=0.05, method="hw", transform="log"): """ Returns a simultaneous confidence band for the survival function. Parameters ---------- alpha : float `1 - alpha` is the desired simultaneous coverage probability for the confidence region. Currently alpha must be set to 0.05, giving 95% simultaneous intervals. method : string The method used to produce the simultaneous confidence band. Only the Hall-Wellner (hw) method is currently implemented. transform : string The used to produce the interval (note that the returned interval is on the survival probability scale regardless of which transform is used). Only `log` and `arcsin` are implemented. Returns ------- lcb : array-like The lower confidence limits corresponding to the points in `surv_times`. ucb : array-like The upper confidence limits corresponding to the points in `surv_times`. """ method = method.lower() if method != "hw": msg = "only the Hall-Wellner (hw) method is implemented" raise ValueError(msg) if alpha != 0.05: raise ValueError("alpha must be set to 0.05") transform = transform.lower() s2 = self.surv_prob_se**2 / self.surv_prob**2 nn = self.n_risk if transform == "log": denom = np.sqrt(nn) * np.log(self.surv_prob) theta = 1.3581 * (1 + nn * s2) / denom theta = np.exp(theta) lcb = self.surv_prob**(1/theta) ucb = self.surv_prob**theta elif transform == "arcsin": k = 1.3581 k *= (1 + nn * s2) / (2 * np.sqrt(nn)) k *= np.sqrt(self.surv_prob / (1 - self.surv_prob)) f = np.arcsin(np.sqrt(self.surv_prob)) v = np.clip(f - k, 0, np.inf) lcb = np.sin(v)**2 v = np.clip(f + k, -np.inf, np.pi/2) ucb = np.sin(v)**2 else: raise ValueError("Unknown transform") return lcb, ucb
def survdiff(time, status, group, weight_type=None, strata=None, entry=None, **kwargs): """ Test for the equality of two survival distributions. Parameters: ----------- time : array-like The event or censoring times. status : array-like The censoring status variable, status=1 indicates that the event occured, status=0 indicates that the observation was censored. group : array-like Indicators of the two groups weight_type : string The following weight types are implemented: None (default) : logrank test fh : Fleming-Harrington, weights by S^(fh_p), requires exponent fh_p to be provided as keyword argument; the weights are derived from S defined at the previous event time, and the first weight is always 1. gb : Gehan-Breslow, weights by the number at risk tw : Tarone-Ware, weights by the square root of the number at risk strata : array-like Optional stratum indicators for a stratified test entry : array-like Entry times to handle left truncation. The subject is not in the risk set on or before the entry time. Returns -------- chisq : The chi-square (1 degree of freedom) distributed test statistic value pvalue : The p-value for the chi^2 test """ # TODO: extend to handle more than two groups time = np.asarray(time) status = np.asarray(status) group = np.asarray(group) gr = np.unique(group) if len(gr) != 2: raise ValueError("logrank only supports two groups") if strata is None: obs, var = _survdiff(time, status, group, weight_type, gr, entry, **kwargs) else: strata = np.asarray(strata) stu = np.unique(strata) obs, var = 0., 0. for st in stu: # could be more efficient? ii = (strata == st) obs1, var1 = _survdiff(time[ii], status[ii], group[ii], weight_type, gr, entry, **kwargs) obs += obs1 var += var1 zstat = obs / np.sqrt(var) # The chi^2 test statistic and p-value. chisq = zstat**2 pvalue = 1 - chi2.cdf(chisq, 1) return chisq, pvalue def _survdiff(time, status, group, weight_type, gr, entry=None, **kwargs): # logrank test for one stratum # Get the unique times. if entry is None: utimes, rtimes = np.unique(time, return_inverse=True) else: utimes, rtimes = np.unique(np.concatenate((time, entry)), return_inverse=True) rtimes = rtimes[0:len(time)] # Split entry times by group if present (should use pandas groupby) tse = [(gr[0], None), (gr[1], None)] if entry is not None: for k in 0, 1: ii = (group == gr[k]) entry1 = entry[ii] tse[k] = (gr[k], entry1) # Event count and risk set size at each time point, per group and overall. # TODO: should use Pandas groupby nrisk, obsv = [], [] ml = len(utimes) for g, entry0 in tse: mk = (group == g) n = np.bincount(rtimes, weights=mk, minlength=ml) ob = np.bincount(rtimes, weights=status*mk, minlength=ml) obsv.append(ob) if entry is not None: n = np.cumsum(n) - n rentry = np.searchsorted(utimes, entry0, side='left') n0 = np.bincount(rentry, minlength=ml) n0 = np.cumsum(n0) - n0 nr = n0 - n else: nr = np.cumsum(n[::-1])[::-1] nrisk.append(nr) obs = sum(obsv) nrisk_tot = sum(nrisk) # The variance of event counts in the first group. r = nrisk[0] / nrisk_tot var = obs * r * (1 - r) * (nrisk_tot - obs) / (nrisk_tot - 1) # The expected number of events in the first group. exp1 = obs * r weights = None if weight_type is not None: weight_type = weight_type.lower() if weight_type == "gb": weights = nrisk_tot elif weight_type == "tw": weights = np.sqrt(nrisk_tot) elif weight_type == "fh": if "fh_p" not in kwargs: msg = "weight_type type 'fh' requires specification of fh_p" raise ValueError(msg) fh_p = kwargs["fh_p"] # Calculate the survivor function directly to avoid the # overhead of creating a SurvfuncRight object sp = 1 - obs / nrisk_tot.astype(np.float64) sp = np.log(sp) sp = np.cumsum(sp) sp = np.exp(sp) weights = sp**fh_p weights = np.roll(weights, 1) weights[0] = 1 else: raise ValueError("weight_type not implemented") # The Z-scale test statistic (compare to normal reference # distribution). ix = np.flatnonzero(nrisk_tot > 1) if weights is None: obs = np.sum(obsv[0][ix] - exp1[ix]) var = np.sum(var[ix]) else: obs = np.dot(weights[ix], obsv[0][ix] - exp1[ix]) var = np.dot(weights[ix]**2, var[ix]) return obs, var def plot_survfunc(survfuncs, ax=None): """ Plot one or more survivor functions. Parameters ---------- survfuncs : object or array-like A single SurvfuncRight object, or a list or SurvfuncRight objects that are plotted together. Returns ------- A figure instance on which the plot was drawn. Examples -------- Add a legend: >>> import statsmodels.api as sm >>> from statsmodels.duration.survfunc import plot_survfunc >>> data = sm.datasets.get_rdataset("flchain", "survival").data >>> df = data.loc[data.sex == "F", :] >>> sf0 = sm.SurvfuncRight(df["futime"], df["death"]) >>> sf1 = sm.SurvfuncRight(3.0 * df["futime"], df["death"]) >>> fig = plot_survfunc([sf0, sf1]) >>> ax = fig.get_axes()[0] >>> ax.set_position([0.1, 0.1, 0.64, 0.8]) >>> ha, lb = ax.get_legend_handles_labels() >>> leg = fig.legend((ha[0], ha[1]), (lb[0], lb[1]), 'center right') Change the line colors: >>> fig = plot_survfunc([sf0, sf1]) >>> ax = fig.get_axes()[0] >>> ax.set_position([0.1, 0.1, 0.64, 0.8]) >>> ha, lb = ax.get_legend_handles_labels() >>> ha[0].set_color('purple') >>> ha[1].set_color('orange') """ fig, ax = utils.create_mpl_ax(ax) # If we have only a single survival function to plot, put it into # a list. try: assert(type(survfuncs[0]) is SurvfuncRight) except: survfuncs = [survfuncs] for gx, sf in enumerate(survfuncs): # The estimated survival function does not include a point at # time 0, include it here for plotting. surv_times = np.concatenate(([0], sf.surv_times)) surv_prob = np.concatenate(([1], sf.surv_prob)) # If the final times are censoring times they are not included # in the survival function so we add them here mxt = max(sf.time) if mxt > surv_times[-1]: surv_times = np.concatenate((surv_times, [mxt])) surv_prob = np.concatenate((surv_prob, [surv_prob[-1]])) label = getattr(sf, "title", "Group %d" % (gx + 1)) li, = ax.step(surv_times, surv_prob, '-', label=label, lw=2, where='post') # Plot the censored points. ii = np.flatnonzero(np.logical_not(sf.status)) ti = sf.time[ii] jj = np.searchsorted(surv_times, ti) - 1 sp = surv_prob[jj] ax.plot(ti, sp, '+', ms=12, color=li.get_color(), label=label + " points") ax.set_ylim(0, 1.01) return fig