# Source code for statsmodels.duration.survfunc

import numpy as np
import pandas as pd
from scipy.stats.distributions import chi2, norm
from statsmodels.graphics import utils

def _calc_survfunc_right(time, status, weights=None, entry=None, compress=True,
retall=True):
"""
Calculate the survival function and its standard error for a single
group.
"""

# Convert the unique times to ranks (0, 1, 2, ...)
if entry is None:
utime, rtime = np.unique(time, return_inverse=True)
else:
tx = np.concatenate((time, entry))
utime, rtime = np.unique(tx, return_inverse=True)
rtime = rtime[0:len(time)]

# Number of deaths at each unique time.
ml = len(utime)
if weights is None:
d = np.bincount(rtime, weights=status, minlength=ml)
else:
d = np.bincount(rtime, weights=status*weights, minlength=ml)

# Size of risk set just prior to each event time.
if weights is None:
n = np.bincount(rtime, minlength=ml)
else:
n = np.bincount(rtime, weights=weights, minlength=ml)
if entry is not None:
n = np.cumsum(n) - n
rentry = np.searchsorted(utime, entry, side='left')
if weights is None:
n0 = np.bincount(rentry, minlength=ml)
else:
n0 = np.bincount(rentry, weights=weights, minlength=ml)
n0 = np.cumsum(n0) - n0
n = n0 - n
else:
n = np.cumsum(n[::-1])[::-1]

# Only retain times where an event occured.
if compress:
ii = np.flatnonzero(d > 0)
d = d[ii]
n = n[ii]
utime = utime[ii]

# The survival function probabilities.
sp = 1 - d / n.astype(np.float64)
sp = np.log(sp)
sp = np.cumsum(sp)
sp = np.exp(sp)

if not retall:
return sp, utime, rtime, n, d

# Standard errors
if weights is None:
# Greenwood's formula
se = d / (n * (n - d)).astype(np.float64)
se = np.cumsum(se)
se = np.sqrt(se)
se *= sp
else:
# Tsiatis' (1981) formula
se = d / (n * n).astype(np.float64)
se = np.cumsum(se)
se = np.sqrt(se)

return sp, se, utime, rtime, n, d

def _calc_incidence_right(time, status, weights=None):
"""
Calculate the cumulative incidence function and its standard error.
"""

# Calculate the all-cause survival function.
status0 = (status >= 1).astype(np.float64)
sp, utime, rtime, n, d = _calc_survfunc_right(time, status0, weights,
compress=False, retall=False)

ngrp = status.max()

# Number of cause-specific deaths at each unique time.
d = []
for k in range(ngrp):
status0 = (status == k + 1).astype(np.float64)
if weights is None:
d0 = np.bincount(rtime, weights=status0, minlength=len(utime))
else:
d0 = np.bincount(rtime, weights=status0*weights,
minlength=len(utime))
d.append(d0)

# The cumulative incidence function probabilities.
ip = []
sp0 = np.r_[1, sp[:-1]] / n
for k in range(ngrp):
ip0 = np.cumsum(sp0 * d[k])
ip.append(ip0)

# The standard error of the cumulative incidence function.
if weights is not None:
return ip, None, utime
se = []
da = sum(d)
for k in range(ngrp):

ra = da / (n * (n - da))
v = ip[k]**2 * np.cumsum(ra)
v -= 2 * ip[k] * np.cumsum(ip[k] * ra)
v += np.cumsum(ip[k]**2 * ra)

ra = (n - d[k]) * d[k] / n
v += np.cumsum(sp0**2 * ra)

ra = sp0 * d[k] / n
v -= 2 * ip[k] * np.cumsum(ra)
v += 2 * np.cumsum(ip[k] * ra)

se.append(np.sqrt(v))

return ip, se, utime

def _checkargs(time, status, entry, freq_weights):

if len(time) != len(status):
raise ValueError("time and status must have the same length")

if entry is not None and (len(entry) != len(time)):
msg = "entry times and event times must have the same length"
raise ValueError(msg)

if entry is not None and np.any(entry >= time):
msg = "Entry times must not occur on or after event times"
raise ValueError(msg)

if freq_weights is not None and (len(freq_weights) != len(time)):
raise ValueError("weights, time and status must have the same length")

class CumIncidenceRight(object):
"""
Estimation and inference for a cumulative incidence function.

If J = 1, 2, ... indicates the event type, the cumulative
incidence function for cause j is:

I(t, j) = P(T <= t and J=j)

Only right censoring is supported.  If frequency weights are provided,
the point estimate is returned without a standard error.

Parameters
----------
time : array-like
An array of times (censoring times or event times)
status : array-like
If status >= 1 indicates which event occured at time t.  If
status = 0, the subject was censored at time t.
title : string
Optional title used for plots and summary output.
freq_weights : array-like
Optional frequency weights

Attributes
----------
times : array-like
The distinct times at which the incidence rates are estimated
cinc : list of arrays
cinc[k-1] contains the estimated cumulative incidence rates
for outcome k=1,2,...
cinc_se : list of arrays
The standard errors for the values in `cinc`.

References
----------
The Stata stcompet procedure:
http://www.stata-journal.com/sjpdf.html?articlenum=st0059

Dinse, G. E. and M. G. Larson. 1986. A note on semi-Markov models
for partially censored data. Biometrika 73: 379-386.

Marubini, E. and M. G. Valsecchi. 1995. Analysing Survival Data
from Clinical Trials and Observational Studies. Chichester, UK:
John Wiley & Sons.
"""

def __init__(self, time, status, title=None, freq_weights=None):

_checkargs(time, status, None, freq_weights)
time = self.time = np.asarray(time)
status = self.status = np.asarray(status)
if freq_weights is not None:
freq_weights = self.freq_weights = np.asarray(freq_weights)
x = _calc_incidence_right(time, status, freq_weights)
self.cinc = x[0]
self.cinc_se = x[1]
self.times = x[2]
self.title = "" if not title else title

[docs]class SurvfuncRight(object): """ Estimation and inference for a survival function. The survival function S(t) = P(T > t) is the probability that an event time T is greater than t. This class currently only supports right censoring. Parameters ---------- time : array-like An array of times (censoring times or event times) status : array-like Status at the event time, status==1 is the 'event' (e.g. death, failure), meaning that the event occurs at the given value in `time`; status==0 indicates that censoring has occured, meaning that the event occurs after the given value in `time`. entry : array-like, optional An array of entry times for handling left truncation (the subject is not in the risk set on or before the entry time) title : string Optional title used for plots and summary output. freq_weights : array-like Optional frequency weights Attributes ---------- surv_prob : array-like The estimated value of the survivor function at each time point in `surv_times`. surv_prob_se : array-like The standard errors for the values in `surv_prob`. surv_times : array-like The points where the survival function changes. n_risk : array-like The number of subjects at risk just before each time value in `surv_times`. n_events : array-like The number of events (e.g. deaths) that occur at each point in `surv_times`. """ def __init__(self, time, status, entry=None, title=None, freq_weights=None): _checkargs(time, status, entry, freq_weights) time = self.time = np.asarray(time) status = self.status = np.asarray(status) if freq_weights is not None: freq_weights = self.freq_weights = np.asarray(freq_weights) if entry is not None: entry = self.entry = np.asarray(entry) x = _calc_survfunc_right(time, status, weights=freq_weights, entry=entry) self.surv_prob = x[0] self.surv_prob_se = x[1] self.surv_times = x[2] self.n_risk = x[4] self.n_events = x[5] self.title = "" if not title else title
[docs] def plot(self, ax=None): """ Plot the survival function. Examples -------- Change the line color: >>> import statsmodels.api as sm >>> data = sm.datasets.get_rdataset("flchain", "survival").data >>> df = data.loc[data.sex == "F", :] >>> sf = sm.SurvfuncRight(df["futime"], df["death"]) >>> fig = sf.plot() >>> ax = fig.get_axes()[0] >>> li = ax.get_lines() >>> li[0].set_color('purple') >>> li[1].set_color('purple') Don't show the censoring points: >>> fig = sf.plot() >>> ax = fig.get_axes()[0] >>> li = ax.get_lines() >>> li[1].set_visible(False) """ return plot_survfunc(self, ax)
[docs] def quantile(self, p): """ Estimated quantile of a survival distribution. Parameters ---------- p : float The probability point at which the quantile is determined. Returns the estimated quantile. """ # SAS uses a strict inequality here. ii = np.flatnonzero(self.surv_prob < 1 - p) if len(ii) == 0: return np.nan return self.surv_times[ii[0]]
[docs] def quantile_ci(self, p, alpha=0.05, method='cloglog'): """ Returns a confidence interval for a survival quantile. Parameters ---------- p : float The probability point for which a confidence interval is determined. alpha : float The confidence interval has nominal coverage probability 1 - `alpha`. method : string Function to use for g-transformation, must be ... Returns ------- lb : float The lower confidence limit. ub : float The upper confidence limit. Notes ----- The confidence interval is obtained by inverting Z-tests. The limits of the confidence interval will always be observed event times. References ---------- The method is based on the approach used in SAS, documented here: http://support.sas.com/documentation/cdl/en/statug/68162/HTML/default/viewer.htm#statug_lifetest_details03.htm """ tr = norm.ppf(1 - alpha / 2) method = method.lower() if method == "cloglog": g = lambda x: np.log(-np.log(x)) gprime = lambda x: -1 / (x * np.log(x)) elif method == "linear": g = lambda x: x gprime = lambda x: 1 elif method == "log": g = lambda x: np.log(x) gprime = lambda x: 1 / x elif method == "logit": g = lambda x: np.log(x / (1 - x)) gprime = lambda x: 1 / (x * (1 - x)) elif method == "asinsqrt": g = lambda x: np.arcsin(np.sqrt(x)) gprime = lambda x: 1 / (2 * np.sqrt(x) * np.sqrt(1 - x)) else: raise ValueError("unknown method") r = g(self.surv_prob) - g(1 - p) r /= (gprime(self.surv_prob) * self.surv_prob_se) ii = np.flatnonzero(np.abs(r) <= tr) if len(ii) == 0: return np.nan, np.nan lb = self.surv_times[ii[0]] if ii[-1] == len(self.surv_times) - 1: ub = np.inf else: ub = self.surv_times[ii[-1] + 1] return lb, ub
[docs] def summary(self): """ Return a summary of the estimated survival function. The summary is a datafram containing the unique event times, estimated survival function values, and related quantities. """ df = pd.DataFrame(index=self.surv_times) df.index.name = "Time" df["Surv prob"] = self.surv_prob df["Surv prob SE"] = self.surv_prob_se df["num at risk"] = self.n_risk df["num events"] = self.n_events return df
[docs] def simultaneous_cb(self, alpha=0.05, method="hw", transform="log"): """ Returns a simultaneous confidence band for the survival function. Parameters ---------- alpha : float `1 - alpha` is the desired simultaneous coverage probability for the confidence region. Currently alpha must be set to 0.05, giving 95% simultaneous intervals. method : string The method used to produce the simultaneous confidence band. Only the Hall-Wellner (hw) method is currently implemented. transform : string The used to produce the interval (note that the returned interval is on the survival probability scale regardless of which transform is used). Only `log` and `arcsin` are implemented. Returns ------- lcb : array-like The lower confidence limits corresponding to the points in `surv_times`. ucb : array-like The upper confidence limits corresponding to the points in `surv_times`. """ method = method.lower() if method != "hw": msg = "only the Hall-Wellner (hw) method is implemented" raise ValueError(msg) if alpha != 0.05: raise ValueError("alpha must be set to 0.05") transform = transform.lower() s2 = self.surv_prob_se**2 / self.surv_prob**2 nn = self.n_risk if transform == "log": denom = np.sqrt(nn) * np.log(self.surv_prob) theta = 1.3581 * (1 + nn * s2) / denom theta = np.exp(theta) lcb = self.surv_prob**(1/theta) ucb = self.surv_prob**theta elif transform == "arcsin": k = 1.3581 k *= (1 + nn * s2) / (2 * np.sqrt(nn)) k *= np.sqrt(self.surv_prob / (1 - self.surv_prob)) f = np.arcsin(np.sqrt(self.surv_prob)) v = np.clip(f - k, 0, np.inf) lcb = np.sin(v)**2 v = np.clip(f + k, -np.inf, np.pi/2) ucb = np.sin(v)**2 else: raise ValueError("Unknown transform") return lcb, ucb
def survdiff(time, status, group, weight_type=None, strata=None, entry=None, **kwargs): """ Test for the equality of two survival distributions. Parameters: ----------- time : array-like The event or censoring times. status : array-like The censoring status variable, status=1 indicates that the event occured, status=0 indicates that the observation was censored. group : array-like Indicators of the two groups weight_type : string The following weight types are implemented: None (default) : logrank test fh : Fleming-Harrington, weights by S^(fh_p), requires exponent fh_p to be provided as keyword argument; the weights are derived from S defined at the previous event time, and the first weight is always 1. gb : Gehan-Breslow, weights by the number at risk tw : Tarone-Ware, weights by the square root of the number at risk strata : array-like Optional stratum indicators for a stratified test entry : array-like Entry times to handle left truncation. The subject is not in the risk set on or before the entry time. Returns -------- chisq : The chi-square (1 degree of freedom) distributed test statistic value pvalue : The p-value for the chi^2 test """ # TODO: extend to handle more than two groups time = np.asarray(time) status = np.asarray(status) group = np.asarray(group) gr = np.unique(group) if len(gr) != 2: raise ValueError("logrank only supports two groups") if strata is None: obs, var = _survdiff(time, status, group, weight_type, gr, entry, **kwargs) else: strata = np.asarray(strata) stu = np.unique(strata) obs, var = 0., 0. for st in stu: # could be more efficient? ii = (strata == st) obs1, var1 = _survdiff(time[ii], status[ii], group[ii], weight_type, gr, entry, **kwargs) obs += obs1 var += var1 zstat = obs / np.sqrt(var) # The chi^2 test statistic and p-value. chisq = zstat**2 pvalue = 1 - chi2.cdf(chisq, 1) return chisq, pvalue def _survdiff(time, status, group, weight_type, gr, entry=None, **kwargs): # logrank test for one stratum # Get the unique times. if entry is None: utimes, rtimes = np.unique(time, return_inverse=True) else: utimes, rtimes = np.unique(np.concatenate((time, entry)), return_inverse=True) rtimes = rtimes[0:len(time)] # Split entry times by group if present (should use pandas groupby) tse = [(gr[0], None), (gr[1], None)] if entry is not None: for k in 0, 1: ii = (group == gr[k]) entry1 = entry[ii] tse[k] = (gr[k], entry1) # Event count and risk set size at each time point, per group and overall. # TODO: should use Pandas groupby nrisk, obsv = [], [] ml = len(utimes) for g, entry0 in tse: mk = (group == g) n = np.bincount(rtimes, weights=mk, minlength=ml) ob = np.bincount(rtimes, weights=status*mk, minlength=ml) obsv.append(ob) if entry is not None: n = np.cumsum(n) - n rentry = np.searchsorted(utimes, entry0, side='left') n0 = np.bincount(rentry, minlength=ml) n0 = np.cumsum(n0) - n0 nr = n0 - n else: nr = np.cumsum(n[::-1])[::-1] nrisk.append(nr) obs = sum(obsv) nrisk_tot = sum(nrisk) # The variance of event counts in the first group. r = nrisk[0] / nrisk_tot var = obs * r * (1 - r) * (nrisk_tot - obs) / (nrisk_tot - 1) # The expected number of events in the first group. exp1 = obs * r weights = None if weight_type is not None: weight_type = weight_type.lower() if weight_type == "gb": weights = nrisk_tot elif weight_type == "tw": weights = np.sqrt(nrisk_tot) elif weight_type == "fh": if "fh_p" not in kwargs: msg = "weight_type type 'fh' requires specification of fh_p" raise ValueError(msg) fh_p = kwargs["fh_p"] # Calculate the survivor function directly to avoid the # overhead of creating a SurvfuncRight object sp = 1 - obs / nrisk_tot.astype(np.float64) sp = np.log(sp) sp = np.cumsum(sp) sp = np.exp(sp) weights = sp**fh_p weights = np.roll(weights, 1) weights[0] = 1 else: raise ValueError("weight_type not implemented") # The Z-scale test statistic (compare to normal reference # distribution). ix = np.flatnonzero(nrisk_tot > 1) if weights is None: obs = np.sum(obsv[0][ix] - exp1[ix]) var = np.sum(var[ix]) else: obs = np.dot(weights[ix], obsv[0][ix] - exp1[ix]) var = np.dot(weights[ix]**2, var[ix]) return obs, var def plot_survfunc(survfuncs, ax=None): """ Plot one or more survivor functions. Parameters ---------- survfuncs : object or array-like A single SurvfuncRight object, or a list or SurvfuncRight objects that are plotted together. Returns ------- A figure instance on which the plot was drawn. Examples -------- Add a legend: >>> import statsmodels.api as sm >>> from statsmodels.duration.survfunc import plot_survfunc >>> data = sm.datasets.get_rdataset("flchain", "survival").data >>> df = data.loc[data.sex == "F", :] >>> sf0 = sm.SurvfuncRight(df["futime"], df["death"]) >>> sf1 = sm.SurvfuncRight(3.0 * df["futime"], df["death"]) >>> fig = plot_survfunc([sf0, sf1]) >>> ax = fig.get_axes()[0] >>> ax.set_position([0.1, 0.1, 0.64, 0.8]) >>> ha, lb = ax.get_legend_handles_labels() >>> leg = fig.legend((ha[0], ha[1]), (lb[0], lb[1]), 'center right') Change the line colors: >>> fig = plot_survfunc([sf0, sf1]) >>> ax = fig.get_axes()[0] >>> ax.set_position([0.1, 0.1, 0.64, 0.8]) >>> ha, lb = ax.get_legend_handles_labels() >>> ha[0].set_color('purple') >>> ha[1].set_color('orange') """ fig, ax = utils.create_mpl_ax(ax) # If we have only a single survival function to plot, put it into # a list. try: assert(type(survfuncs[0]) is SurvfuncRight) except: survfuncs = [survfuncs] for gx, sf in enumerate(survfuncs): # The estimated survival function does not include a point at # time 0, include it here for plotting. surv_times = np.concatenate(([0], sf.surv_times)) surv_prob = np.concatenate(([1], sf.surv_prob)) # If the final times are censoring times they are not included # in the survival function so we add them here mxt = max(sf.time) if mxt > surv_times[-1]: surv_times = np.concatenate((surv_times, [mxt])) surv_prob = np.concatenate((surv_prob, [surv_prob[-1]])) label = getattr(sf, "title", "Group %d" % (gx + 1)) li, = ax.step(surv_times, surv_prob, '-', label=label, lw=2, where='post') # Plot the censored points. ii = np.flatnonzero(np.logical_not(sf.status)) ti = sf.time[ii] jj = np.searchsorted(surv_times, ti) - 1 sp = surv_prob[jj] ax.plot(ti, sp, '+', ms=12, color=li.get_color(), label=label + " points") ax.set_ylim(0, 1.01) return fig