Source code for statsmodels.base.distributed_estimation

from statsmodels.base.elastic_net import RegularizedResults
from statsmodels.stats.regularized_covariance import _calc_nodewise_row, \
    _calc_nodewise_weight, _calc_approx_inv_cov
from statsmodels.base.model import LikelihoodModelResults
from statsmodels.regression.linear_model import OLS
import numpy as np

Distributed estimation routines. Currently, we support several
methods of distribution:

- sequential, has no extra dependencies
- parallel
    - with joblib
        A variety of backends are supported through joblib
        This allows for different types of clusters besides
        standard local clusters.  Some examples of
        backends supported by joblib are
          - dask.distributed
          - yarn
          - ipyparallel

The framework is very general and allows for a variety of
estimation methods.  Currently, these include

- debiased regularized estimation
- simple coefficient averaging (naive)
    - regularized
    - unregularized

Currently, the default is regularized estimation with debiasing
which follows the methods outlined in

Jason D. Lee, Qiang Liu, Yuekai Sun and Jonathan E. Taylor.
"Communication-Efficient Sparse Regression: A One-Shot Approach."
arXiv:1503.04337. 2015.

There are several variables that are taken from the source paper
for which the interpretation may not be directly clear from the
code; these are mostly used to help form the estimate of the
approximate inverse covariance matrix as part of the
debiasing procedure.


    wexog

    A weighted design matrix used to perform the node-wise
    regression procedure.


    nodewise_row

    nodewise_row is produced as part of the node-wise regression
    procedure used to produce the approximate inverse covariance
    matrix.  One is produced for each variable using the
    LASSO.

    nodewise_weight

    nodewise_weight is produced using the gamma_hat values for
    each p to produce weights to reweight the gamma_hat values which
    are ultimately used to form approx_inv_cov.


    approx_inv_cov

    This is the estimate of the approximate inverse covariance
    matrix.  This is used to debias the coefficient average
    along with the average gradient.  For the OLS case,
    approx_inv_cov is an approximation for

        n * (X^T X)^{-1}

    formed by node-wise regression.

def _est_regularized_naive(mod, pnum, partitions, fit_kwds=None):
    """Estimate the regularized fitted parameters for one partition.

    Parameters
    ----------
    mod : statsmodels model class instance
        The model for the current partition.
    pnum : scalar
        Index of current partition.  Not used by this estimator; it is
        accepted so that all estimation methods share one signature.
    partitions : scalar
        Total number of partitions.  Not used by this estimator.
    fit_kwds : dict-like or None
        Keyword arguments to be given to fit_regularized.

    Returns
    -------
    An array of the parameters for the regularized fit

    Raises
    ------
    ValueError
        If `fit_kwds` is None.
    """

    if fit_kwds is None:
        raise ValueError("_est_regularized_naive currently " +
                         "requires that fit_kwds not be None.")

    return mod.fit_regularized(**fit_kwds).params

def _est_unregularized_naive(mod, pnum, partitions, fit_kwds=None):
    """Estimate the unregularized fitted parameters for one partition.

    Parameters
    ----------
    mod : statsmodels model class instance
        The model for the current partition.
    pnum : scalar
        Index of current partition.  Not used by this estimator; it is
        accepted so that all estimation methods share one signature.
    partitions : scalar
        Total number of partitions.  Not used by this estimator.
    fit_kwds : dict-like or None
        Keyword arguments to be given to fit.

    Returns
    -------
    An array of the parameters for the fit

    Raises
    ------
    ValueError
        If `fit_kwds` is None.
    """

    if fit_kwds is None:
        raise ValueError("_est_unregularized_naive currently " +
                         "requires that fit_kwds not be None.")

    # Mirrors _est_regularized_naive, but uses the plain ``fit`` method.
    return mod.fit(**fit_kwds).params


def _join_naive(params_l, threshold=0):
    """Join the results from each run of _est_<type>_naive and return
    the mean estimate of the coefficients.

    Parameters
    ----------
    params_l : list
        A list of arrays of coefficients, one per partition.
    threshold : scalar
        The threshold at which the coefficients will be cut.  Averaged
        coefficients whose absolute value is strictly below `threshold`
        are set to zero.

    Returns
    -------
    An array of length p holding the thresholded coefficient average,
    where p is the length of the first element of `params_l`.
    """

    p = len(params_l[0])
    partitions = len(params_l)

    # Accumulate into a fresh array so the per-partition inputs are
    # never modified.
    params_mn = np.zeros(p)
    for params in params_l:
        params_mn += params
    params_mn /= partitions

    params_mn[np.abs(params_mn) < threshold] = 0

    return params_mn

def _calc_grad(mod, params, alpha, L1_wt, score_kwds):
    """Calculate the log-likelihood gradient used for the debiasing.

    Parameters
    ----------
    mod : statsmodels model class instance
        The model for the current partition.
    params : array_like
        The estimated coefficients for the current partition.
    alpha : scalar or array_like
        The penalty weight.  If a scalar, the same penalty weight
        applies to all variables in the model.  If a vector, it
        must have the same length as `params`, and contains a
        penalty weight for each coefficient.
    L1_wt : scalar
        The fraction of the penalty given to the L1 penalty term.
        Must be between 0 and 1 (inclusive).  If 0, the fit is
        a ridge fit, if 1 it is a lasso fit.
    score_kwds : dict-like or None
        Keyword arguments for the score function.  None is treated as
        no keyword arguments.

    Returns
    -------
    An array-like object of the same dimension as params

    Notes
    -----
    The value returned is the negated score of the model at `params`
    plus ``alpha * (1 - L1_wt)``.

    In general:

    gradient l_k(params)

    where k corresponds to the index of the partition

    For OLS:

    X^T(y - X params)
    """

    # The docstring allows None, but ``**None`` would raise a TypeError.
    score_kwds = {} if score_kwds is None else score_kwds

    grad = -mod.score(np.asarray(params), **score_kwds)
    # Build a new array rather than updating in place, so an integer
    # score combined with a float penalty cannot fail on casting.
    return grad + alpha * (1 - L1_wt)

def _calc_wdesign_mat(mod, params, hess_kwds):
    """Calculate the weighted design matrix necessary to generate
    the approximate inverse covariance matrix.

    Parameters
    ----------
    mod : statsmodels model class instance
        The model for the current partition.
    params : array_like
        The estimated coefficients for the current partition.
    hess_kwds : dict-like or None
        Keyword arguments for the hessian function.  None is treated as
        no keyword arguments.

    Returns
    -------
    An array-like object, updated design matrix, same dimension
    as mod.exog

    Notes
    -----
    Each row of ``mod.exog`` is scaled by the square root of the
    corresponding entry of ``mod.hessian_factor`` evaluated at `params`.
    """

    # The docstring allows None, but ``**None`` would raise a TypeError.
    hess_kwds = {} if hess_kwds is None else hess_kwds

    rhess = np.sqrt(mod.hessian_factor(np.asarray(params), **hess_kwds))
    # Broadcast the per-observation factor across the columns of exog.
    return rhess[:, None] * mod.exog

def _est_regularized_debiased(mod, mnum, partitions, fit_kwds=None,
                              score_kwds=None, hess_kwds=None):
    """Estimate the regularized fitted parameters and the quantities
    needed for debiasing; this is the default estimation_method for
    class DistributedModel.

    Parameters
    ----------
    mod : statsmodels model class instance
        The model for the current partition.
    mnum : scalar
        Index of current partition.
    partitions : scalar
        Total number of partitions.
    fit_kwds : dict-like or None
        Keyword arguments to be given to fit_regularized.  Must contain
        the key "alpha"; "L1_wt" is optional and defaults to 1.
    score_kwds : dict-like or None
        Keyword arguments for the score function.
    hess_kwds : dict-like or None
        Keyword arguments for the Hessian function.

    Returns
    -------
    A tuple of parameters for regularized fit
        An array-like object of the fitted parameters, params
        An array-like object for the gradient
        A list of array like objects for nodewise_row
        A list of array like objects for nodewise_weight

    Raises
    ------
    ValueError
        If `fit_kwds` is None.

    Notes
    -----
    The p variables are split into blocks of ``ceil(p / partitions)``
    consecutive indices.  This partition only computes the node-wise
    rows and weights for the block selected by `mnum`, so the two
    returned lists cover a subset of the variables.
    """

    score_kwds = {} if score_kwds is None else score_kwds
    hess_kwds = {} if hess_kwds is None else hess_kwds

    if fit_kwds is None:
        raise ValueError("_est_regularized_debiased currently " +
                         "requires that fit_kwds not be None.")

    alpha = fit_kwds["alpha"]
    # Only fall back to a pure L1 penalty when the caller gave no L1_wt.
    L1_wt = fit_kwds["L1_wt"] if "L1_wt" in fit_kwds else 1

    nobs, p = mod.exog.shape
    p_part = int(np.ceil((1. * p) / partitions))

    params = mod.fit_regularized(**fit_kwds).params
    grad = _calc_grad(mod, params, alpha, L1_wt, score_kwds) / nobs

    wexog = _calc_wdesign_mat(mod, params, hess_kwds)

    nodewise_row_l = []
    nodewise_weight_l = []
    # The last block may be shorter than p_part, hence the min().
    for idx in range(mnum * p_part, min((mnum + 1) * p_part, p)):

        nodewise_row = _calc_nodewise_row(wexog, idx, alpha)
        nodewise_row_l.append(nodewise_row)

        nodewise_weight = _calc_nodewise_weight(wexog, nodewise_row, idx,
                                                alpha)
        nodewise_weight_l.append(nodewise_weight)

    return params, grad, nodewise_row_l, nodewise_weight_l

def _join_debiased(results_l, threshold=0):
    """Join the results from each run of _est_regularized_debiased
    and return the debiased estimate of the coefficients.

    Parameters
    ----------
    results_l : list
        A list of tuples each one containing the params, grad,
        nodewise_row and nodewise_weight values for each partition.
    threshold : scalar
        The threshold at which the coefficients will be cut.  Debiased
        coefficients whose absolute value is strictly below `threshold`
        are set to zero.

    Returns
    -------
    An array of length p holding the thresholded debiased coefficients,
    where p is the length of the params entry of the first result.

    Notes
    -----
    The node-wise rows and weights are concatenated in the order the
    partitions appear in `results_l`.
    NOTE(review): presumably `results_l` must be in partition-index
    order so the rows line up with the variables; confirm with callers.
    """

    p = len(results_l[0][0])
    partitions = len(results_l)

    params_mn = np.zeros(p)
    grad_mn = np.zeros(p)

    nodewise_row_l = []
    nodewise_weight_l = []

    for r in results_l:

        params_mn += r[0]
        grad_mn += r[1]

        # Each partition contributes only its own block of variables.
        nodewise_row_l.extend(r[2])
        nodewise_weight_l.extend(r[3])

    nodewise_row_l = np.array(nodewise_row_l)
    nodewise_weight_l = np.array(nodewise_weight_l)

    params_mn /= partitions
    # Average the gradients and flip the sign in a single step.
    grad_mn *= -1. / partitions

    approx_inv_cov = _calc_approx_inv_cov(nodewise_row_l, nodewise_weight_l)

    # Correct the averaged coefficients with the averaged gradient,
    # premultiplied by the approximate inverse covariance matrix.
    debiased_params = params_mn + approx_inv_cov.dot(grad_mn)

    debiased_params[np.abs(debiased_params) < threshold] = 0

    return debiased_params

def _helper_fit_partition(self, pnum, endog, exog, fit_kwds,
                          init_kwds_e=None):
    """Handle the model fitting for each machine. NOTE: this
    is primarily handled outside of DistributedModel because
    joblib cannot handle class methods.

    Parameters
    ----------
    self : DistributedModel class instance
        An instance of DistributedModel.
    pnum : scalar
        index of current partition.
    endog : array_like
        endogenous data for current partition.
    exog : array_like
        exogenous data for current partition.
    fit_kwds : dict-like
        Keywords needed for the model fitting.
    init_kwds_e : dict-like or None
        Additional init_kwds to add for each partition.  These take
        precedence over ``self.init_kwds`` when a key appears in both.

    Returns
    -------
    estimation_method result.  For the default,
    _est_regularized_debiased, a tuple.
    """

    # Work on a copy so partition-specific keywords never leak into
    # the init_kwds shared by every partition.
    temp_init_kwds = self.init_kwds.copy()
    if init_kwds_e is not None:
        temp_init_kwds.update(init_kwds_e)

    model = self.model_class(endog, exog, **temp_init_kwds)
    results = self.estimation_method(model, pnum, self.partitions,
                                     fit_kwds=fit_kwds,
                                     **self.estimation_kwds)
    return results

class DistributedModel(object):
    """
    Distributed model class

    Parameters
    ----------
    partitions : scalar
        The number of partitions that the data will be split into.
    model_class : statsmodels model class
        The model class which will be used for estimation. If None
        this defaults to OLS.
    init_kwds : dict-like or None
        Keywords needed for initializing the model, in addition to
        endog and exog.
    estimation_method : function or None
        The method that performs the estimation for each partition.
        If None this defaults to _est_regularized_debiased.
    estimation_kwds : dict-like or None
        Keywords to be passed to estimation_method.
    join_method : function or None
        The method used to recombine the results from each partition.
        If None this defaults to _join_debiased.
    join_kwds : dict-like or None
        Keywords to be passed to join_method.
    results_class : results class or None
        The class of results that should be returned.  If None this
        defaults to RegularizedResults.
    results_kwds : dict-like or None
        Keywords to be passed to results class.

    Attributes
    ----------
    partitions : scalar
        See Parameters.
    model_class : statsmodels model class
        See Parameters.
    init_kwds : dict-like
        See Parameters.
    estimation_method : function
        See Parameters.
    estimation_kwds : dict-like
        See Parameters.
    join_method : function
        See Parameters.
    join_kwds : dict-like
        See Parameters.
    results_class : results class
        See Parameters.
    results_kwds : dict-like
        See Parameters.

    Notes
    -----
    Keywords that vary by data partition (the current usecase is for
    WLS and GLS) are not given to the constructor; they are supplied
    through the ``init_kwds_generator`` argument of the fit methods.
    """

    def __init__(self, partitions, model_class=None,
                 init_kwds=None, estimation_method=None,
                 estimation_kwds=None, join_method=None, join_kwds=None,
                 results_class=None, results_kwds=None):

        self.partitions = partitions

        # Every optional argument falls back to the debiased-regularized
        # defaults described in the class docstring.
        self.model_class = OLS if model_class is None else model_class
        self.init_kwds = {} if init_kwds is None else init_kwds

        if estimation_method is None:
            self.estimation_method = _est_regularized_debiased
        else:
            self.estimation_method = estimation_method
        self.estimation_kwds = (
            {} if estimation_kwds is None else estimation_kwds)

        self.join_method = (
            _join_debiased if join_method is None else join_method)
        self.join_kwds = {} if join_kwds is None else join_kwds

        if results_class is None:
            self.results_class = RegularizedResults
        else:
            self.results_class = results_class
        self.results_kwds = {} if results_kwds is None else results_kwds

    def fit(self, data_generator, fit_kwds=None, parallel_method="sequential",
            parallel_backend=None, init_kwds_generator=None):
        """Performs the distributed estimation using the corresponding
        DistributedModel

        Parameters
        ----------
        data_generator : generator
            A generator that produces a sequence of tuples where the first
            element in the tuple corresponds to an endog array and the
            second element corresponds to an exog array.
        fit_kwds : dict-like or None
            Keywords needed for the model fitting.
        parallel_method : str
            type of distributed estimation to be used, currently
            "sequential" and "joblib" are supported.  Other backends,
            ex: dask.distributed, are reached through "joblib" together
            with `parallel_backend`.
        parallel_backend : None or joblib parallel_backend object
            used to allow support for more complicated backends,
            ex: dask.distributed
        init_kwds_generator : generator or None
            Additional keyword generator that produces model init_kwds
            that may vary based on data partition.  The current usecase
            is for WLS and GLS

        Returns
        -------
        An instance of results_class, built from a placeholder model
        and the join_method result.  For the default join method,
        _join_debiased, the joined result is a p length array.

        Raises
        ------
        ValueError
            If `parallel_method` is not one of the supported values.
        """

        if fit_kwds is None:
            fit_kwds = {}

        if parallel_method == "sequential":
            results_l = self.fit_sequential(data_generator, fit_kwds,
                                            init_kwds_generator)

        elif parallel_method == "joblib":
            results_l = self.fit_joblib(data_generator, fit_kwds,
                                        parallel_backend,
                                        init_kwds_generator)

        else:
            raise ValueError("parallel_method: %s is currently not supported"
                             % parallel_method)

        params = self.join_method(results_l, **self.join_kwds)

        # NOTE that currently, the dummy result model that is initialized
        # here does not use any init_kwds from the init_kwds_generator even
        # if it is provided.  It is possible to imagine an edge case where
        # this might be a problem but given that the results model instance
        # does not correspond to any data partition this seems reasonable.
        res_mod = self.model_class([0], [0], **self.init_kwds)

        return self.results_class(res_mod, params, **self.results_kwds)

    def fit_sequential(self, data_generator, fit_kwds,
                       init_kwds_generator=None):
        """Sequentially performs the distributed estimation using
        the corresponding DistributedModel

        Parameters
        ----------
        data_generator : generator
            A generator that produces a sequence of tuples where the first
            element in the tuple corresponds to an endog array and the
            second element corresponds to an exog array.
        fit_kwds : dict-like
            Keywords needed for the model fitting.
        init_kwds_generator : generator or None
            Additional keyword generator that produces model init_kwds
            that may vary based on data partition.  The current usecase
            is for WLS and GLS

        Returns
        -------
        A list with one estimation_method result per partition, in the
        order the partitions were generated.  The list has not yet been
        passed through join_method.
        """

        results_l = []

        if init_kwds_generator is None:

            for pnum, (endog, exog) in enumerate(data_generator):

                results = _helper_fit_partition(self, pnum, endog, exog,
                                                fit_kwds)
                results_l.append(results)

        else:

            # zip stops at the shorter of the two generators.
            tup_gen = enumerate(zip(data_generator,
                                    init_kwds_generator))

            for pnum, ((endog, exog), init_kwds_e) in tup_gen:

                results = _helper_fit_partition(self, pnum, endog, exog,
                                                fit_kwds, init_kwds_e)
                results_l.append(results)

        return results_l

    def fit_joblib(self, data_generator, fit_kwds, parallel_backend,
                   init_kwds_generator=None):
        """Performs the distributed estimation in parallel using joblib

        Parameters
        ----------
        data_generator : generator
            A generator that produces a sequence of tuples where the first
            element in the tuple corresponds to an endog array and the
            second element corresponds to an exog array.
        fit_kwds : dict-like
            Keywords needed for the model fitting.
        parallel_backend : None or joblib parallel_backend object
            used to allow support for more complicated backends,
            ex: dask.distributed
        init_kwds_generator : generator or None
            Additional keyword generator that produces model init_kwds
            that may vary based on data partition.  The current usecase
            is for WLS and GLS

        Returns
        -------
        The per-partition estimation_method results as returned by the
        joblib parallel call.  They have not yet been passed through
        join_method.
        """

        # Imported here so that the parallel helper is only needed when
        # the joblib method is actually used.
        from statsmodels.tools.parallel import parallel_func

        par, f, n_jobs = parallel_func(_helper_fit_partition, self.partitions)

        # Build the lazy stream of delayed calls once; whether it runs
        # inside a backend context is decided separately below.
        if init_kwds_generator is None:
            tasks = (f(self, pnum, endog, exog, fit_kwds)
                     for pnum, (endog, exog)
                     in enumerate(data_generator))
        else:
            tup_gen = enumerate(zip(data_generator, init_kwds_generator))
            tasks = (f(self, pnum, endog, exog, fit_kwds, init_kwds)
                     for pnum, ((endog, exog), init_kwds)
                     in tup_gen)

        if parallel_backend is None:
            results_l = par(tasks)
        else:
            with parallel_backend:
                results_l = par(tasks)

        return results_l
class DistributedResults(LikelihoodModelResults):
    """
    Class to contain model results

    Parameters
    ----------
    model : class instance
        Class instance for model used for distributed data,
        this particular instance uses fake data and is really
        only to allow use of methods like predict.
    params : ndarray
        Parameter estimates from the fit model.
    """

    def __init__(self, model, params):
        super(DistributedResults, self).__init__(model, params)

    def predict(self, exog, *args, **kwargs):
        """Calls self.model.predict for the provided exog.  See
        Results.predict.

        Parameters
        ----------
        exog : array_like NOT optional
            The values for which we want to predict, unlike standard
            predict this is NOT optional since the data in self.model
            is fake.
        *args :
            Some models can take additional arguments. See the
            predict method of the model for the details.
        **kwargs :
            Some models can take additional keywords arguments. See the
            predict method of the model for the details.

        Returns
        -------
        prediction : ndarray, pandas.Series or pandas.DataFrame
            See self.model.predict
        """

        # Delegate directly to the model, using the joined parameters.
        return self.model.predict(self.params, exog, *args, **kwargs)