Source code for statsmodels.robust.scale
"""
Support and standalone functions for Robust Linear Models
References
----------
PJ Huber. 'Robust Statistics' John Wiley and Sons, Inc., New York, 1981.
R Venables, B Ripley. 'Modern Applied Statistics in S'
Springer, New York, 2002.
C Croux, PJ Rousseeuw, 'Time-efficient algorithms for two highly robust
estimators of scale' Computational statistics. Physica, Heidelberg, 1992.
"""
import numpy as np
from scipy import stats
from scipy.stats import norm as Gaussian
from statsmodels.tools import tools
from statsmodels.tools.validation import array_like, float_like
from . import norms
from ._qn import _qn
class Holder():
def __init__(self, **kwds):
self.__dict__.update(kwds)
[docs]
def mad(a, c=Gaussian.ppf(3 / 4.0), axis=0, center=np.median):
# c \approx .6745
"""
The Median Absolute Deviation along given axis of an array
Parameters
----------
a : array_like
Input array.
c : float, optional
The normalization constant. Defined as scipy.stats.norm.ppf(3/4.),
which is approximately 0.6745.
axis : int, optional
The default is 0. Can also be None.
center : callable or float
If a callable is provided, such as the default `np.median` then it
is expected to be called center(a). The axis argument will be applied
via np.apply_over_axes. Otherwise, provide a float.
Returns
-------
mad : float
`mad` = median(abs(`a` - center))/`c`
"""
a = array_like(a, "a", ndim=None)
c = float_like(c, "c")
if not a.size:
center_val = 0.0
elif callable(center):
if axis is not None:
center_val = np.apply_over_axes(center, a, axis)
else:
center_val = center(a.ravel())
else:
center_val = float_like(center, "center")
err = (np.abs(a - center_val)) / c
if not err.size:
if axis is None or err.ndim == 1:
return np.nan
else:
shape = list(err.shape)
shape.pop(axis)
return np.empty(shape)
return np.median(err, axis=axis)
[docs]
def iqr(a, c=Gaussian.ppf(3 / 4) - Gaussian.ppf(1 / 4), axis=0):
"""
The normalized interquartile range along given axis of an array
Parameters
----------
a : array_like
Input array.
c : float, optional
The normalization constant, used to get consistent estimates of the
standard deviation at the normal distribution. Defined as
scipy.stats.norm.ppf(3/4.) - scipy.stats.norm.ppf(1/4.), which is
approximately 1.349.
axis : int, optional
The default is 0. Can also be None.
Returns
-------
The normalized interquartile range
"""
a = array_like(a, "a", ndim=None)
c = float_like(c, "c")
if a.ndim == 0:
raise ValueError("a should have at least one dimension")
elif a.size == 0:
return np.nan
else:
quantiles = np.quantile(a, [0.25, 0.75], axis=axis)
return np.squeeze(np.diff(quantiles, axis=0) / c)
[docs]
def qn_scale(a, c=1 / (np.sqrt(2) * Gaussian.ppf(5 / 8)), axis=0):
"""
Computes the Qn robust estimator of scale
The Qn scale estimator is a more efficient alternative to the MAD.
The Qn scale estimator of an array a of length n is defined as
c * {abs(a[i] - a[j]): i<j}_(k), for k equal to [n/2] + 1 choose 2. Thus,
the Qn estimator is the k-th order statistic of the absolute differences
of the array. The optional constant is used to normalize the estimate
as explained below. The implementation follows the algorithm described
in Croux and Rousseeuw (1992).
Parameters
----------
a : array_like
Input array.
c : float, optional
The normalization constant. The default value is used to get consistent
estimates of the standard deviation at the normal distribution.
axis : int, optional
The default is 0.
Returns
-------
{float, ndarray}
The Qn robust estimator of scale
"""
a = array_like(
a, "a", ndim=None, dtype=np.float64, contiguous=True, order="C"
)
c = float_like(c, "c")
if a.ndim == 0:
raise ValueError("a should have at least one dimension")
elif a.size == 0:
return np.nan
else:
out = np.apply_along_axis(_qn, axis=axis, arr=a, c=c)
if out.ndim == 0:
return float(out)
return out
def _qn_naive(a, c=1 / (np.sqrt(2) * Gaussian.ppf(5 / 8))):
"""
A naive implementation of the Qn robust estimator of scale, used solely
to test the faster, more involved one
Parameters
----------
a : array_like
Input array.
c : float, optional
The normalization constant, used to get consistent estimates of the
standard deviation at the normal distribution. Defined as
1/(np.sqrt(2) * scipy.stats.norm.ppf(5/8)), which is 2.219144.
Returns
-------
The Qn robust estimator of scale
"""
a = np.squeeze(a)
n = a.shape[0]
if a.size == 0:
return np.nan
else:
h = int(n // 2 + 1)
k = int(h * (h - 1) / 2)
idx = np.triu_indices(n, k=1)
diffs = np.abs(a[idx[0]] - a[idx[1]])
output = np.partition(diffs, kth=k - 1)[k - 1]
output = c * output
return output
[docs]
class Huber:
"""
Huber's proposal 2 for estimating location and scale jointly.
Parameters
----------
c : float, optional
Threshold used in threshold for chi=psi**2. Default value is 1.5.
tol : float, optional
Tolerance for convergence. Default value is 1e-08.
maxiter : int, optional0
Maximum number of iterations. Default value is 30.
norm : statsmodels.robust.norms.RobustNorm, optional
A robust norm used in M estimator of location. If None,
the location estimator defaults to a one-step
fixed point version of the M-estimator using Huber's T.
call
Return joint estimates of Huber's scale and location.
Examples
--------
>>> import numpy as np
>>> import statsmodels.api as sm
>>> chem_data = np.array([2.20, 2.20, 2.4, 2.4, 2.5, 2.7, 2.8, 2.9, 3.03,
... 3.03, 3.10, 3.37, 3.4, 3.4, 3.4, 3.5, 3.6, 3.7, 3.7, 3.7, 3.7,
... 3.77, 5.28, 28.95])
>>> sm.robust.scale.huber(chem_data)
(array(3.2054980819923693), array(0.67365260010478967))
"""
def __init__(self, c=1.5, tol=1.0e-08, maxiter=30, norm=None):
self.c = c
self.maxiter = maxiter
self.tol = tol
self.norm = norm
tmp = 2 * Gaussian.cdf(c) - 1
self.gamma = tmp + c ** 2 * (1 - tmp) - 2 * c * Gaussian.pdf(c)
def __call__(self, a, mu=None, initscale=None, axis=0):
"""
Compute Huber's proposal 2 estimate of scale, using an optional
initial value of scale and an optional estimate of mu. If mu
is supplied, it is not reestimated.
Parameters
----------
a : ndarray
1d array
mu : float or None, optional
If the location mu is supplied then it is not reestimated.
Default is None, which means that it is estimated.
initscale : float or None, optional
A first guess on scale. If initscale is None then the standardized
median absolute deviation of a is used.
Notes
-----
`Huber` minimizes the function
sum(psi((a[i]-mu)/scale)**2)
as a function of (mu, scale), where
psi(x) = np.clip(x, -self.c, self.c)
"""
a = np.asarray(a)
if mu is None:
n = a.shape[axis] - 1
mu = np.median(a, axis=axis)
est_mu = True
else:
n = a.shape[axis]
mu = mu
est_mu = False
if initscale is None:
scale = mad(a, axis=axis)
else:
scale = initscale
scale = tools.unsqueeze(scale, axis, a.shape)
mu = tools.unsqueeze(mu, axis, a.shape)
return self._estimate_both(a, scale, mu, axis, est_mu, n)
def _estimate_both(self, a, scale, mu, axis, est_mu, n):
"""
Estimate scale and location simultaneously with the following
pseudo_loop:
while not_converged:
mu, scale = estimate_location(a, scale, mu), estimate_scale(a, scale, mu)
where estimate_location is an M-estimator and estimate_scale implements
the check used in Section 5.5 of Venables & Ripley
""" # noqa:E501
for _ in range(self.maxiter):
# Estimate the mean along a given axis
if est_mu:
if self.norm is None:
# This is a one-step fixed-point estimator
# if self.norm == norms.HuberT
# It should be faster than using norms.HuberT
nmu = (
np.clip(
a, mu - self.c * scale, mu + self.c * scale
).sum(axis)
/ a.shape[axis]
)
else:
nmu = norms.estimate_location(
a, scale, self.norm, axis, mu, self.maxiter, self.tol
)
else:
# Effectively, do nothing
nmu = mu.squeeze()
nmu = tools.unsqueeze(nmu, axis, a.shape)
subset = np.less_equal(np.abs((a - mu) / scale), self.c)
scale_num = np.sum(subset * (a - nmu) ** 2 +
(1 - subset) * (scale * self.c)**2, axis)
scale_denom = n * self.gamma
nscale = np.sqrt(scale_num / scale_denom)
nscale = tools.unsqueeze(nscale, axis, a.shape)
test1 = np.all(
np.less_equal(np.abs(scale - nscale), nscale * self.tol)
)
test2 = np.all(
np.less_equal(np.abs(mu - nmu), nscale * self.tol)
)
if not (test1 and test2):
mu = nmu
scale = nscale
else:
return nmu.squeeze(), nscale.squeeze()
raise ValueError(
"joint estimation of location and scale failed "
"to converge in %d iterations" % self.maxiter
)
huber = Huber()
[docs]
class HuberScale:
r"""
Huber's scaling for fitting robust linear models.
Huber's scale is intended to be used as the scale estimate in the
IRLS algorithm and is slightly different than the `Huber` class.
Parameters
----------
d : float, optional
d is the tuning constant for Huber's scale. Default is 2.5
tol : float, optional
The convergence tolerance
maxiter : int, optiona
The maximum number of iterations. The default is 30.
Methods
-------
call
Return's Huber's scale computed as below
Notes
-----
Huber's scale is the iterative solution to
scale_(i+1)**2 = 1/(n*h)*sum(chi(r/sigma_i)*sigma_i**2
where the Huber function is
chi(x) = (x**2)/2 for \|x\| < d
chi(x) = (d**2)/2 for \|x\| >= d
and the Huber constant h = (n-p)/n*(d**2 + (1-d**2)*
scipy.stats.norm.cdf(d) - .5 - d*sqrt(2*pi)*exp(-0.5*d**2)
"""
def __init__(self, d=2.5, tol=1e-08, maxiter=30):
self.d = d
self.tol = tol
self.maxiter = maxiter
def __call__(self, df_resid, nobs, resid):
h = (
df_resid
/ nobs
* (
self.d ** 2
+ (1 - self.d ** 2) * Gaussian.cdf(self.d)
- 0.5
- self.d / (np.sqrt(2 * np.pi)) * np.exp(-0.5 * self.d ** 2)
)
)
s = mad(resid)
def subset(x):
return np.less(np.abs(resid / x), self.d)
def chi(s):
return subset(s) * (resid / s) ** 2 / 2 + (1 - subset(s)) * (
self.d ** 2 / 2
)
scalehist = [np.inf, s]
niter = 1
while (
np.abs(scalehist[niter - 1] - scalehist[niter]) > self.tol
and niter < self.maxiter
):
nscale = np.sqrt(
1
/ (nobs * h)
* np.sum(chi(scalehist[-1]))
* scalehist[-1] ** 2
)
scalehist.append(nscale)
niter += 1
# TODO: raise on convergence failure?
return scalehist[-1]
hubers_scale = HuberScale()
class MScale:
"""M-scale estimation.
experimental interface, arguments and options will still change.
Parameters
----------
chi_func : callable
The rho or chi function for the moment condition for estimating scale.
scale_bias : float
Factor in moment condition to obtain fisher consistency of the scale
estimate at the normal distribution.
"""
def __init__(self, chi_func, scale_bias):
self.chi_func = chi_func
self.scale_bias = scale_bias
def __repr__(self):
return repr(self.chi_func)
def __call__(self, data, **kwds):
return self.fit(data, **kwds)
def fit(self, data, start_scale='mad', maxiter=100, rtol=1e-6, atol=1e-8):
"""
Estimate M-scale using iteration.
Parameters
----------
data : array-like
Data, currently assumed to be 1-dimensional.
start_scale : string or float.
Starting value of scale or method to compute the starting value.
Default is using 'mad', no other string options are available.
maxiter : int
Maximum number of iterations.
rtol : float
Relative convergence tolerance.
atol : float
Absolute onvergence tolerance.
Returns
-------
float : Scale estimate. The estimated variance is scale squared.
Todo: switch to Holder instance with more information.
"""
scale = _scale_iter(
data,
scale0=start_scale,
maxiter=maxiter, rtol=rtol, atol=atol,
meef_scale=self.chi_func,
scale_bias=self.scale_bias,
)
return scale
def scale_trimmed(data, alpha, center='median', axis=0, distr=None,
distargs=None):
"""scale estimate based on symmetrically trimmed sample
The scale estimate is robust to a fraction alpha of outliers on each
tail.
The scale is normalized to correspond to a reference distribution, which
is the normal distribution by default.
Parameters
----------
data : array_like
dataset, by default (axis=0) observations are assumed to be in rows
and variables in columns.
alpha : float in interval (0, 1)
Trimming fraction in each tail. The floor(nobs * alpha) smallest
observations are trimmed, and the same number of the largest
observations are trimmed. scale estimate is base on a fraction
(1 - 2 * alpha) of observations.
center : 'median', 'mean', 'tmean' or number
`center` defines how the trimmed sample is centered. 'median' and
'mean' are calculated on the full sample. `tmean` is the trimmed
mean, calculated with the trimmed sample. If `center` is array_like
then it needs to be scalar or correspond to the shape of the data
reduced by axis.
axis : int, default is 0
axis along which scale is estimated.
distr : None, 'raw' or a distribution instance
Default if distr is None is the normal distribution `scipy.stats.norm`.
This is the reference distribution to normalize the scale.
Note: This cannot be a frozen instance, since it does not have an
`expect` method.
If distr is 'raw', then the scale is not normalized.
distargs :
Arguments for the distribution.
Returns
-------
scale : float or array
the estimated scale normalized for the reference distribution.
Examples
--------
for normal distribution
>>> np.random.seed(1)
>>> x = 2 * np.random.randn(100)
>>> scale_trimmed(x, 0.1)
1.7479516739879672
for t distribution
>>> xt = stats.t.rvs(3, size=1000, scale=2)
>>> print scale_trimmed(xt, alpha, distr=stats.t, distargs=(3,))
2.06574778599
compare to standard deviation of sample
>>> xt.std()
3.1457788359130481
"""
if distr is None:
distr = stats.norm
if distargs is None:
distargs = ()
x = np.array(data) # make copy for inplace sort
if axis is None:
x = x.ravel()
axis = 0
# TODO: latest numpy has partial sort
x.sort(axis)
nobs = x.shape[axis]
if distr == 'raw':
c_inv = 1
else:
bound = distr.ppf(1 - alpha, *distargs)
c_inv = distr.expect(lambda x: x*x, lb=-bound, ub=bound, args=distargs)
cut_idx = np.floor(nobs * alpha).astype(int)
sl = [slice(None, None, None)] * x.ndim
sl[axis] = slice(cut_idx, -cut_idx)
# x_trimmed = x[cut_idx:-cut_idx]
# cut in axis
x_trimmed = x[tuple(sl)]
center_type = center
if center in ['med', 'median']:
center = np.median(x, axis=axis)
elif center == 'mean':
center = np.mean(x, axis=axis)
elif center == 'tmean':
center = np.mean(x_trimmed, axis=axis)
else:
# assume number
center_type = 'user'
center_ndim = np.ndim(center)
if (center_ndim > 0) and (center_ndim < x.ndim):
center = np.expand_dims(center, axis)
s_raw = ((x_trimmed - center)**2).sum(axis)
scale = np.sqrt(s_raw / nobs / c_inv)
res = Holder(scale=scale,
center=center,
center_type=center_type,
trim_idx=cut_idx,
nobs=nobs,
distr=distr,
scale_correction=1. / c_inv)
return res
def _weight_mean(x, c):
"""Tukey-biweight, bisquare weights used in tau scale.
Parameters
----------
x : ndarray
Data
c : float
Parameter for bisquare weights
Returns
-------
ndarray : weights
"""
x = np.asarray(x)
w = (1 - (x / c)**2)**2 * (np.abs(x) <= c)
return w
def _winsor(x, c):
"""Winsorized squared data used in tau scale.
Parameters
----------
x : ndarray
Data
c : float
threshold
Returns
-------
winsorized squared data, ``np.minimum(x**2, c**2)``
"""
return np.minimum(x**2, c**2)
def scale_tau(data, cm=4.5, cs=3, weight_mean=_weight_mean,
weight_scale=_winsor, normalize=True, ddof=0):
"""Tau estimator of univariate scale.
Experimental, API will change
Parameters
----------
data : array_like, 1-D or 2-D
If data is 2d, then the location and scale estimates
are calculated for each column
cm : float
constant used in call to weight_mean
cs : float
constant used in call to weight_scale
weight_mean : callable
function to calculate weights for weighted mean
weight_scale : callable
function to calculate scale, "rho" function
normalize : bool
rescale the scale estimate so it is consistent when the data is
normally distributed. The computation assumes winsorized (truncated)
variance.
Returns
-------
mean : nd_array
robust mean
std : nd_array
robust estimate of scale (standard deviation)
Notes
-----
Uses definition of Maronna and Zamar 2002, with weighted mean and
trimmed variance.
The normalization has been added to match R robustbase.
R robustbase uses by default ddof=0, with option to set it to 2.
References
----------
.. [1] Maronna, Ricardo A, and Ruben H Zamar. “Robust Estimates of Location
and Dispersion for High-Dimensional Datasets.” Technometrics 44, no. 4
(November 1, 2002): 307–17. https://doi.org/10.1198/004017002188618509.
"""
x = np.asarray(data)
nobs = x.shape[0]
med_x = np.median(x, 0)
xdm = x - med_x
mad_x = np.median(np.abs(xdm), 0)
wm = weight_mean(xdm / mad_x, cm)
mean = (wm * x).sum(0) / wm.sum(0)
var = (mad_x**2 * weight_scale((x - mean) / mad_x, cs).sum(0) /
(nobs - ddof))
cf = 1
if normalize:
c = cs * stats.norm.ppf(0.75)
cf = 2 * ((1 - c**2) * stats.norm.cdf(c) - c * stats.norm.pdf(c)
+ c**2) - 1
# return Holder(loc=mean, scale=np.sqrt(var / cf))
return mean, np.sqrt(var / cf)
debug = 0
def _scale_iter(data, scale0='mad', maxiter=100, rtol=1e-6, atol=1e-8,
meef_scale=None, scale_bias=None, iter_method="rho", ddof=0):
"""iterative scale estimate base on "rho" function
"""
x = np.asarray(data)
nobs = x.shape[0]
if scale0 == 'mad':
scale0 = mad(x, center=0)
for i in range(maxiter):
x_scaled = x / scale0
if iter_method == "rho":
scale = scale0 * np.sqrt(
np.sum(meef_scale(x / scale0)) / scale_bias / (nobs - ddof))
else:
weights_scale = meef_scale(x_scaled) / (1e-50 + x_scaled**2)
scale2 = (weights_scale * x**2).sum() / (nobs - ddof)
scale2 /= scale_bias
scale = np.sqrt(scale2)
if debug:
print(scale)
if np.allclose(scale, scale0, atol=atol, rtol=rtol):
break
scale0 = scale
return scale
Last update:
Dec 23, 2024