Source code for statsmodels.regression.process_regression
"""
This module implements maximum likelihood-based estimation (MLE) of
Gaussian regression models for finite-dimensional observations made on
infinite-dimensional processes.
The ProcessMLE class supports regression analyses on grouped data,
where the observations within a group are dependent (they are made on
the same underlying process). One use-case is repeated measures
regression for temporal (longitudinal) data, in which the repeated
measures occur at arbitrary real-valued time points.
The mean structure is specified as a linear model. The covariance
parameters depend on covariates via a link function.
"""
import collections
import warnings
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import statsmodels.base.model as base
from statsmodels.formula._manager import FormulaManager
from statsmodels.iolib import summary2
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.numdiff import approx_fprime
class ProcessCovariance:
r"""
A covariance model for a process indexed by a real parameter.
An implementation of this class is based on a positive definite
correlation function h that maps real numbers to the interval [0,
1], such as the Gaussian (squared exponential) correlation
function :math:`\exp(-x^2)`. It also depends on a positive
scaling function `s` and a positive smoothness function `u`.
"""
def get_cov(self, time, sc, sm):
"""
Returns the covariance matrix for given time values.
Parameters
----------
time : array_like
The time points for the observations. If len(time) = p,
a pxp covariance matrix is returned.
sc : array_like
The scaling parameters for the observations.
sm : array_like
The smoothness parameters for the observation. See class
docstring for details.
"""
raise NotImplementedError
def jac(self, time, sc, sm):
"""
The Jacobian of the covariance with respect to the parameters.
See get_cov for parameters.
Returns
-------
jsc : list-like
jsc[i] is the derivative of the covariance matrix
with respect to the i^th scaling parameter.
jsm : list-like
jsm[i] is the derivative of the covariance matrix
with respect to the i^th smoothness parameter.
"""
raise NotImplementedError
[docs]
class GaussianCovariance(ProcessCovariance):
r"""
An implementation of ProcessCovariance using the Gaussian kernel.
This class represents a parametric covariance model for a Gaussian
process as described in the work of Paciorek et al. cited below.
Following Paciorek et al [1]_, the covariance between observations with
index `i` and `j` is given by:
.. math::
s[i] \cdot s[j] \cdot h(|time[i] - time[j]| / \sqrt{(u[i] + u[j]) /
2}) \cdot \frac{u[i]^{1/4}u[j]^{1/4}}{\sqrt{(u[i] + u[j])/2}}
The ProcessMLE class allows linear models with this covariance
structure to be fit using maximum likelihood (ML). The mean and
covariance parameters of the model are fit jointly.
The mean, scaling, and smoothing parameters can be linked to
covariates. The mean parameters are linked linearly, and the
scaling and smoothing parameters use an log link function to
preserve positivity.
The reference of Paciorek et al. below provides more details.
Note that here we only implement the 1-dimensional version of
their approach.
References
----------
.. [1] Paciorek, C. J. and Schervish, M. J. (2006). Spatial modeling using
a new class of nonstationary covariance functions. Environmetrics,
17:483–506.
https://papers.nips.cc/paper/2350-nonstationary-covariance-functions-for-gaussian-process-regression.pdf
"""
[docs]
def get_cov(self, time, sc, sm):
da = np.subtract.outer(time, time)
ds = np.add.outer(sm, sm) / 2
qmat = da * da / ds
cm = np.exp(-qmat / 2) / np.sqrt(ds)
cm *= np.outer(sm, sm)**0.25
cm *= np.outer(sc, sc)
return cm
[docs]
def jac(self, time, sc, sm):
da = np.subtract.outer(time, time)
ds = np.add.outer(sm, sm) / 2
sds = np.sqrt(ds)
daa = da * da
qmat = daa / ds
p = len(time)
eqm = np.exp(-qmat / 2)
sm4 = np.outer(sm, sm)**0.25
cmx = eqm * sm4 / sds
dq0 = -daa / ds**2
di = np.zeros((p, p))
fi = np.zeros((p, p))
scc = np.outer(sc, sc)
# Derivatives with respect to the smoothing parameters.
jsm = []
for i, _ in enumerate(sm):
di *= 0
di[i, :] += 0.5
di[:, i] += 0.5
dbottom = 0.5 * di / sds
dtop = -0.5 * eqm * dq0 * di
b = dtop / sds - eqm * dbottom / ds
c = eqm / sds
v = 0.25 * sm**0.25 / sm[i]**0.75
fi *= 0
fi[i, :] = v
fi[:, i] = v
fi[i, i] = 0.5 / sm[i]**0.5
b = c * fi + b * sm4
b *= scc
jsm.append(b)
# Derivatives with respect to the scaling parameters.
jsc = []
for i in range(0, len(sc)):
b = np.zeros((p, p))
b[i, :] = cmx[i, :] * sc
b[:, i] += cmx[:, i] * sc
jsc.append(b)
return jsc, jsm
def _check_args(endog, exog, exog_scale, exog_smooth, exog_noise, time,
groups):
v = [
len(endog),
exog.shape[0],
exog_scale.shape[0],
exog_smooth.shape[0],
len(time),
len(groups)
]
if exog_noise is not None:
v.append(exog_noise.shape[0])
if min(v) != max(v):
msg = ("The leading dimensions of all array arguments " +
"must be equal.")
raise ValueError(msg)
[docs]
class ProcessMLE(base.LikelihoodModel):
"""
Fit a Gaussian mean/variance regression model.
This class fits a one-dimensional Gaussian process model with
parametrized mean and covariance structures to grouped data. For
each group, there is an independent realization of a latent
Gaussian process indexed by an observed real-valued time
variable.. The data consist of the Gaussian process observed at a
finite number of `time` values.
The process mean and variance can be lined to covariates. The
mean structure is linear in the covariates. The covariance
structure is non-stationary, and is defined parametrically through
'scaling', and 'smoothing' parameters. The covariance of the
process between two observations in the same group is a function
of the distance between the time values of the two observations.
The scaling and smoothing parameters can be linked to covariates.
The observed data are modeled as the sum of the Gaussian process
realization and (optionally) independent white noise. The standard
deviation of the white noise can be linked to covariates.
The data should be provided in 'long form', with a group label to
indicate which observations belong to the same group.
Observations in different groups are always independent.
Parameters
----------
endog : array_like
The dependent variable.
exog : array_like
The design matrix for the mean structure
exog_scale : array_like
The design matrix for the scaling structure
exog_smooth : array_like
The design matrix for the smoothness structure
exog_noise : array_like
The design matrix for the additive white noise. The
linear predictor is the log of the white noise standard
deviation. If None, there is no additive noise (the
process is observed directly).
time : array_like (1-dimensional)
The univariate index values, used to calculate distances
between observations in the same group, which determines
their correlations.
groups : array_like (1-dimensional)
The group values.
cov : a ProcessCovariance instance
Defaults to GaussianCovariance.
"""
def __init__(self,
endog,
exog,
exog_scale,
exog_smooth,
exog_noise,
time,
groups,
cov=None,
**kwargs):
super().__init__(
endog,
exog,
exog_scale=exog_scale,
exog_smooth=exog_smooth,
exog_noise=exog_noise,
time=time,
groups=groups,
**kwargs)
self._has_noise = exog_noise is not None
# Create parameter names
xnames = []
if hasattr(exog, "columns"):
xnames = list(exog.columns)
else:
xnames = ["Mean%d" % j for j in range(exog.shape[1])]
if hasattr(exog_scale, "columns"):
xnames += list(exog_scale.columns)
else:
xnames += ["Scale%d" % j for j in range(exog_scale.shape[1])]
if hasattr(exog_smooth, "columns"):
xnames += list(exog_smooth.columns)
else:
xnames += ["Smooth%d" % j for j in range(exog_smooth.shape[1])]
if self._has_noise:
if hasattr(exog_noise, "columns"):
# If pandas-like, get the actual column names
xnames += list(exog_noise.columns)
else:
# If numpy-like, create default names
xnames += ["Noise%d" % j for j in range(exog_noise.shape[1])]
self.data.param_names = xnames
if cov is None:
cov = GaussianCovariance()
self.cov = cov
_check_args(endog, exog, exog_scale, exog_smooth, exog_noise,
time, groups)
groups_ix = collections.defaultdict(list)
for i, g in enumerate(groups):
groups_ix[g].append(i)
self._groups_ix = groups_ix
# Default, can be set in call to fit.
self.verbose = False
self.k_exog = self.exog.shape[1]
self.k_scale = self.exog_scale.shape[1]
self.k_smooth = self.exog_smooth.shape[1]
if self._has_noise:
self.k_noise = self.exog_noise.shape[1]
def _split_param_names(self):
xnames = self.data.param_names
q = 0
mean_names = xnames[q:q+self.k_exog]
q += self.k_exog
scale_names = xnames[q:q+self.k_scale]
q += self.k_scale
smooth_names = xnames[q:q+self.k_smooth]
if self._has_noise:
q += self.k_noise
noise_names = xnames[q:q+self.k_noise]
else:
noise_names = []
return mean_names, scale_names, smooth_names, noise_names
[docs]
@classmethod
def from_formula(cls,
formula,
data,
subset=None,
drop_cols=None,
*args,
**kwargs):
if "scale_formula" in kwargs:
scale_formula = kwargs["scale_formula"]
else:
raise ValueError("scale_formula is a required argument")
if "smooth_formula" in kwargs:
smooth_formula = kwargs["smooth_formula"]
else:
raise ValueError("smooth_formula is a required argument")
if "noise_formula" in kwargs:
noise_formula = kwargs["noise_formula"]
else:
noise_formula = None
if "time" in kwargs:
time = kwargs["time"]
else:
raise ValueError("time is a required argument")
if "groups" in kwargs:
groups = kwargs["groups"]
else:
raise ValueError("groups is a required argument")
if subset is not None:
warnings.warn("'subset' is ignored")
if drop_cols is not None:
warnings.warn("'drop_cols' is ignored")
if isinstance(time, str):
time = np.asarray(data[time])
if isinstance(groups, str):
groups = np.asarray(data[groups])
mgr = FormulaManager()
exog_scale = mgr.get_matrices(scale_formula, data)
scale_model_spec = mgr.spec
scale_names = list(scale_model_spec.column_names)
exog_scale = np.asarray(exog_scale)
exog_smooth = mgr.get_matrices(smooth_formula, data)
smooth_model_spec = mgr.spec
smooth_names = list(smooth_model_spec.column_names)
exog_smooth = np.asarray(exog_smooth)
if noise_formula is not None:
exog_noise = mgr.get_matrices(noise_formula, data)
noise_model_spec = mgr.spec
noise_names = list(noise_model_spec.column_names)
exog_noise = np.asarray(exog_noise)
else:
exog_noise, noise_model_spec, noise_names, exog_noise =\
None, None, [], None
mod = super().from_formula(
formula,
data=data,
subset=None,
exog_scale=exog_scale,
exog_smooth=exog_smooth,
exog_noise=exog_noise,
time=time,
groups=groups)
mod.data.scale_model_spec = scale_model_spec
mod.data.smooth_model_spec = smooth_model_spec
if mod._has_noise:
mod.data.noise_model_spec = noise_model_spec
mod.data.param_names = (mod.exog_names + scale_names +
smooth_names + noise_names)
return mod
[docs]
def unpack(self, z):
"""
Split the packed parameter vector into blocks.
"""
# Mean parameters
pm = self.exog.shape[1]
mnpar = z[0:pm]
# Standard deviation parameters
pv = self.exog_scale.shape[1]
scpar = z[pm:pm + pv]
# Smoothness parameters
ps = self.exog_smooth.shape[1]
smpar = z[pm + pv:pm + pv + ps]
# Observation white noise standard deviation.
# Empty if has_noise = False.
nopar = z[pm + pv + ps:]
return mnpar, scpar, smpar, nopar
def _get_start(self):
# Use OLS to get starting values for mean structure parameters
model = OLS(self.endog, self.exog)
result = model.fit()
m = self.exog_scale.shape[1] + self.exog_smooth.shape[1]
if self._has_noise:
m += self.exog_noise.shape[1]
return np.concatenate((result.params, np.zeros(m)))
[docs]
def loglike(self, params):
"""
Calculate the log-likelihood function for the model.
Parameters
----------
params : array_like
The packed parameters for the model.
Returns
-------
The log-likelihood value at the given parameter point.
Notes
-----
The mean, scaling, and smoothing parameters are packed into
a vector. Use `unpack` to access the component vectors.
"""
mnpar, scpar, smpar, nopar = self.unpack(params)
# Residuals
resid = self.endog - np.dot(self.exog, mnpar)
# Scaling parameters
sc = np.exp(np.dot(self.exog_scale, scpar))
# Smoothness parameters
sm = np.exp(np.dot(self.exog_smooth, smpar))
# White noise standard deviation
if self._has_noise:
no = np.exp(np.dot(self.exog_noise, nopar))
# Get the log-likelihood
ll = 0.
for _, ix in self._groups_ix.items():
# Get the covariance matrix for this person.
cm = self.cov.get_cov(self.time[ix], sc[ix], sm[ix])
# The variance of the additive noise, if present.
if self._has_noise:
cm.flat[::cm.shape[0] + 1] += no[ix]**2
re = resid[ix]
ll -= 0.5 * np.linalg.slogdet(cm)[1]
ll -= 0.5 * np.dot(re, np.linalg.solve(cm, re))
if self.verbose:
print("L=", ll)
return ll
[docs]
def score(self, params):
"""
Calculate the score function for the model.
Parameters
----------
params : array_like
The packed parameters for the model.
Returns
-------
The score vector at the given parameter point.
Notes
-----
The mean, scaling, and smoothing parameters are packed into
a vector. Use `unpack` to access the component vectors.
"""
mnpar, scpar, smpar, nopar = self.unpack(params)
pm, pv, ps = len(mnpar), len(scpar), len(smpar)
# Residuals
resid = self.endog - np.dot(self.exog, mnpar)
# Scaling
sc = np.exp(np.dot(self.exog_scale, scpar))
# Smoothness
sm = np.exp(np.dot(self.exog_smooth, smpar))
# White noise standard deviation
if self._has_noise:
no = np.exp(np.dot(self.exog_noise, nopar))
# Get the log-likelihood
score = np.zeros(len(mnpar) + len(scpar) + len(smpar) + len(nopar))
for _, ix in self._groups_ix.items():
sc_i = sc[ix]
sm_i = sm[ix]
resid_i = resid[ix]
time_i = self.time[ix]
exog_i = self.exog[ix, :]
exog_scale_i = self.exog_scale[ix, :]
exog_smooth_i = self.exog_smooth[ix, :]
# Get the covariance matrix for this person.
cm = self.cov.get_cov(time_i, sc_i, sm_i)
if self._has_noise:
no_i = no[ix]
exog_noise_i = self.exog_noise[ix, :]
cm.flat[::cm.shape[0] + 1] += no[ix]**2
cmi = np.linalg.inv(cm)
jacv, jacs = self.cov.jac(time_i, sc_i, sm_i)
# The derivatives for the mean parameters.
dcr = np.linalg.solve(cm, resid_i)
score[0:pm] += np.dot(exog_i.T, dcr)
# The derivatives for the scaling parameters.
rx = np.outer(resid_i, resid_i)
qm = np.linalg.solve(cm, rx)
qm = 0.5 * np.linalg.solve(cm, qm.T)
scx = sc_i[:, None] * exog_scale_i
for i, _ in enumerate(ix):
jq = np.sum(jacv[i] * qm)
score[pm:pm + pv] += jq * scx[i, :]
score[pm:pm + pv] -= 0.5 * np.sum(jacv[i] * cmi) * scx[i, :]
# The derivatives for the smoothness parameters.
smx = sm_i[:, None] * exog_smooth_i
for i, _ in enumerate(ix):
jq = np.sum(jacs[i] * qm)
score[pm + pv:pm + pv + ps] += jq * smx[i, :]
score[pm + pv:pm + pv + ps] -= (
0.5 * np.sum(jacs[i] * cmi) * smx[i, :])
# The derivatives with respect to the standard deviation parameters
if self._has_noise:
sno = no_i[:, None]**2 * exog_noise_i
score[pm + pv + ps:] -= np.dot(cmi.flat[::cm.shape[0] + 1],
sno)
bm = np.dot(cmi, np.dot(rx, cmi))
score[pm + pv + ps:] += np.dot(bm.flat[::bm.shape[0] + 1], sno)
if self.verbose:
print("|G|=", np.sqrt(np.sum(score * score)))
return score
[docs]
def fit(self, start_params=None, method=None, maxiter=None,
**kwargs):
"""
Fit a grouped Gaussian process regression using MLE.
Parameters
----------
start_params : array_like
Optional starting values.
method : str or array of str
Method or sequence of methods for scipy optimize.
maxiter : int
The maximum number of iterations in the optimization.
Returns
-------
An instance of ProcessMLEResults.
"""
if "verbose" in kwargs:
self.verbose = kwargs["verbose"]
minim_opts = {}
if "minim_opts" in kwargs:
minim_opts = kwargs["minim_opts"]
if start_params is None:
start_params = self._get_start()
if isinstance(method, str):
method = [method]
elif method is None:
method = ["powell", "bfgs"]
for j, meth in enumerate(method):
if meth not in ("powell",):
def jac(x):
return -self.score(x)
else:
jac = None
if maxiter is not None:
if np.isscalar(maxiter):
minim_opts["maxiter"] = maxiter
else:
minim_opts["maxiter"] = maxiter[j % len(maxiter)]
f = minimize(
lambda x: -self.loglike(x),
method=meth,
x0=start_params,
jac=jac,
options=minim_opts)
if not f.success:
msg = "Fitting did not converge"
if jac is not None:
msg += ", |gradient|=%.6f" % np.sqrt(np.sum(f.jac**2))
if j < len(method) - 1:
msg += ", trying %s next..." % method[j+1]
warnings.warn(msg)
if np.isfinite(f.x).all():
start_params = f.x
hess = self.hessian(f.x)
try:
cov_params = -np.linalg.inv(hess)
except Exception:
cov_params = None
class rslt:
pass
r = rslt()
r.params = f.x
r.normalized_cov_params = cov_params
r.optim_retvals = f
r.scale = 1
rslt = ProcessMLEResults(self, r)
return rslt
[docs]
def covariance(self, time, scale_params, smooth_params, scale_data,
smooth_data):
"""
Returns a Gaussian process covariance matrix.
Parameters
----------
time : array_like
The time points at which the fitted covariance matrix is
calculated.
scale_params : array_like
The regression parameters for the scaling part
of the covariance structure.
smooth_params : array_like
The regression parameters for the smoothing part
of the covariance structure.
scale_data : DataFrame
The data used to determine the scale parameter,
must have len(time) rows.
smooth_data : DataFrame
The data used to determine the smoothness parameter,
must have len(time) rows.
Returns
-------
A covariance matrix.
Notes
-----
If the model was fit using formulas, `scale` and `smooth` should
be Dataframes, containing all variables that were present in the
respective scaling and smoothing formulas used to fit the model.
Otherwise, `scale` and `smooth` should contain data arrays whose
columns align with the fitted scaling and smoothing parameters.
The covariance is only for the Gaussian process and does not include
the white noise variance.
"""
if not hasattr(self.data, "scale_model_spec"):
sca = np.dot(scale_data, scale_params)
smo = np.dot(smooth_data, smooth_params)
else:
mgr = FormulaManager()
sc = mgr.get_matrices(self.data.scale_model_spec, scale_data, pandas=False)
sm = mgr.get_matrices(
self.data.smooth_model_spec, smooth_data, pandas=False
)
sca = np.exp(np.dot(sc, scale_params))
smo = np.exp(np.dot(sm, smooth_params))
return self.cov.get_cov(time, sca, smo)
[docs]
def predict(self, params, exog=None, *args, **kwargs):
"""
Obtain predictions of the mean structure.
Parameters
----------
params : array_like
The model parameters, may be truncated to include only mean
parameters.
exog : array_like
The design matrix for the mean structure. If not provided,
the model's design matrix is used.
"""
if exog is None:
exog = self.exog
elif hasattr(self.data, "model_spec"):
# Run the provided data through the formula if present
mgr = FormulaManager()
exog = mgr.get_matrices(self.data.model_spec, exog)
if len(params) > exog.shape[1]:
params = params[0:exog.shape[1]]
return np.dot(exog, params)
[docs]
class ProcessMLEResults(base.GenericLikelihoodModelResults):
"""
Results class for Gaussian process regression models.
"""
def __init__(self, model, mlefit):
super().__init__(
model, mlefit)
pa = model.unpack(mlefit.params)
self.mean_params = pa[0]
self.scale_params = pa[1]
self.smooth_params = pa[2]
self.no_params = pa[3]
self.df_resid = model.endog.shape[0] - len(mlefit.params)
self.k_exog = self.model.exog.shape[1]
self.k_scale = self.model.exog_scale.shape[1]
self.k_smooth = self.model.exog_smooth.shape[1]
self._has_noise = model._has_noise
if model._has_noise:
self.k_noise = self.model.exog_noise.shape[1]
[docs]
def predict(self, exog=None, transform=True, *args, **kwargs):
if not transform:
warnings.warn("'transform=False' is ignored in predict")
if len(args) > 0 or len(kwargs) > 0:
warnings.warn("extra arguments ignored in 'predict'")
return self.model.predict(self.params, exog)
[docs]
def covariance(self, time, scale, smooth):
"""
Returns a fitted covariance matrix.
Parameters
----------
time : array_like
The time points at which the fitted covariance
matrix is calculated.
scale : array_like
The data used to determine the scale parameter,
must have len(time) rows.
smooth : array_like
The data used to determine the smoothness parameter,
must have len(time) rows.
Returns
-------
A covariance matrix.
Notes
-----
If the model was fit using formulas, `scale` and `smooth` should
be Dataframes, containing all variables that were present in the
respective scaling and smoothing formulas used to fit the model.
Otherwise, `scale` and `smooth` should be data arrays whose
columns align with the fitted scaling and smoothing parameters.
"""
return self.model.covariance(time, self.scale_params,
self.smooth_params, scale, smooth)
[docs]
def covariance_group(self, group):
# Check if the group exists, since _groups_ix is a
# DefaultDict use len instead of catching a KeyError.
ix = self.model._groups_ix[group]
if len(ix) == 0:
msg = "Group '%s' does not exist" % str(group)
raise ValueError(msg)
scale_data = self.model.exog_scale[ix, :]
smooth_data = self.model.exog_smooth[ix, :]
_, scale_names, smooth_names, _ = self.model._split_param_names()
scale_data = pd.DataFrame(scale_data, columns=scale_names)
smooth_data = pd.DataFrame(smooth_data, columns=smooth_names)
time = self.model.time[ix]
return self.model.covariance(time,
self.scale_params,
self.smooth_params,
scale_data,
smooth_data)
[docs]
def summary(self, yname=None, xname=None, title=None, alpha=0.05):
df = pd.DataFrame()
typ = (["Mean"] * self.k_exog + ["Scale"] * self.k_scale +
["Smooth"] * self.k_smooth)
if self._has_noise:
typ += ["SD"] * self.k_noise
df["Type"] = typ
df["coef"] = self.params
try:
df["std err"] = np.sqrt(np.diag(self.cov_params()))
except Exception:
df["std err"] = np.nan
from scipy.stats.distributions import norm
df["tvalues"] = df.coef / df["std err"]
df["P>|t|"] = 2 * norm.sf(np.abs(df.tvalues))
f = norm.ppf(1 - alpha / 2)
df["[%.3f" % (alpha / 2)] = df.coef - f * df["std err"]
df["%.3f]" % (1 - alpha / 2)] = df.coef + f * df["std err"]
df.index = self.model.data.param_names
summ = summary2.Summary()
if title is None:
title = "Gaussian process regression results"
summ.add_title(title)
summ.add_df(df)
return summ
Last update:
Jan 20, 2025