Source code for statsmodels.stats.anova

from statsmodels.compat.python import lrange

import numpy as np
import pandas as pd
from pandas import DataFrame, Index
from scipy import stats

from statsmodels.formula._manager import FormulaManager
from statsmodels.iolib import summary2
from statsmodels.regression.linear_model import OLS


def _get_covariance(model, robust):
    if robust is None:
        return model.cov_params()
    elif robust == "hc0":
        return model.cov_HC0
    elif robust == "hc1":
        return model.cov_HC1
    elif robust == "hc2":
        return model.cov_HC2
    elif robust == "hc3":
        return model.cov_HC3
    else:  # pragma: no cover
        raise ValueError("robust options %s not understood" % robust)


# NOTE: these need to take into account weights !

def anova_single(model, **kwargs):
    """
    Anova table for one fitted linear model.

    Parameters
    ----------
    model : fitted linear model results instance
        A fitted linear model
    typ : int or str {1,2,3} or {"I","II","III"}
        Type of sum of squares to use.

    **kwargs**

    scale : float
        Estimate of variance, If None, will be estimated from the largest
    model. Default is None.
        test : str {"F", "Chisq", "Cp"} or None
        Test statistics to provide. Default is "F".

    Notes
    -----
    Use of this function is discouraged. Use anova_lm instead.
    """
    test = kwargs.get("test", "F")
    typ = kwargs.get("typ", 1)
    robust = kwargs.get("robust", None)
    if robust:
        robust = robust.lower()

    endog = model.model.endog
    exog = model.model.exog
    nobs = exog.shape[0]

    model_spec = model.model.data.model_spec
    # +1 for resids
    mgr = FormulaManager()
    n_rows = (len(model_spec.terms) - mgr.has_intercept(model_spec) + 1)

    pr_test = "PR(>%s)" % test
    names = ['df', 'sum_sq', 'mean_sq', test, pr_test]

    table = DataFrame(np.zeros((n_rows, 5)), columns=names)

    if typ in [1, "I"]:
        return anova1_lm_single(model, endog, exog, nobs, model_spec, table,
                                n_rows, test, pr_test, robust)
    elif typ in [2, "II"]:
        return anova2_lm_single(model, model_spec, n_rows, test, pr_test,
                                robust)
    elif typ in [3, "III"]:
        return anova3_lm_single(model, model_spec, n_rows, test, pr_test,
                                robust)
    elif typ in [4, "IV"]:
        raise NotImplementedError("Type IV not yet implemented")
    else:  # pragma: no cover
        raise ValueError("Type %s not understood" % str(typ))


def anova1_lm_single(model, endog, exog, nobs, model_spec, table, n_rows, test,
                     pr_test, robust):
    """
    Anova table for one fitted linear model.

    Parameters
    ----------
    model : fitted linear model results instance
        A fitted linear model

    **kwargs**

    scale : float
        Estimate of variance, If None, will be estimated from the largest
    model. Default is None.
        test : str {"F", "Chisq", "Cp"} or None
        Test statistics to provide. Default is "F".

    Notes
    -----
    Use of this function is discouraged. Use anova_lm instead.
    """
    #maybe we should rethink using pinv > qr in OLS/linear models?
    mgr = FormulaManager()
    effects = getattr(model, 'effects', None)
    if effects is None:
        q,r = np.linalg.qr(exog)
        effects = np.dot(q.T, endog)

    arr = np.zeros((len(model_spec.terms), len(model_spec.column_names)))
    slices = [
        mgr.get_slice(model_spec, name) for name in mgr.get_term_names(model_spec)
    ]
    for i, slice_ in enumerate(slices):
        arr[i, slice_] = 1

    sum_sq = np.dot(arr, effects**2)
    #NOTE: assumes intercept is first column
    mgr = FormulaManager()
    idx = mgr.intercept_idx(model_spec)
    sum_sq = sum_sq[~idx]
    term_names = np.array(mgr.get_term_names(model_spec)) # want boolean indexing
    term_names = term_names[~idx]

    index = term_names.tolist()
    table.index = Index(index + ['Residual'])
    table.loc[index, ['df', 'sum_sq']] = np.c_[arr[~idx].sum(1), sum_sq]
    # fill in residual
    table.loc['Residual', ['sum_sq','df']] = model.ssr, model.df_resid
    if test == 'F':
        table[test] = ((table['sum_sq'] / table['df']) /
                       (model.ssr / model.df_resid))
        table[pr_test] = stats.f.sf(table["F"], table["df"],
                                    model.df_resid)
        table.loc['Residual', [test, pr_test]] = np.nan, np.nan
    table['mean_sq'] = table['sum_sq'] / table['df']
    return table

#NOTE: the below is not agnostic about formula...
def anova2_lm_single(model, model_spec, n_rows, test, pr_test, robust):
    """
    Anova type II table for one fitted linear model.

    Parameters
    ----------
    model : fitted linear model results instance
        A fitted linear model

    **kwargs**

    scale : float
        Estimate of variance, If None, will be estimated from the largest
    model. Default is None.
        test : str {"F", "Chisq", "Cp"} or None
        Test statistics to provide. Default is "F".

    Notes
    -----
    Use of this function is discouraged. Use anova_lm instead.

    Type II
    Sum of Squares compares marginal contribution of terms. Thus, it is
    not particularly useful for models with significant interaction terms.
    """
    mgr = FormulaManager()
    terms_info = model_spec.terms[:] # copy
    terms_info = mgr.remove_intercept(terms_info)

    names = ['sum_sq', 'df', test, pr_test]

    table = DataFrame(np.zeros((n_rows, 4)), columns = names)
    robust_cov = _get_covariance(model, robust)
    col_order = []
    index = []
    for i, term in enumerate(terms_info):
        # grab all variables except interaction effects that contain term
        # need two hypotheses matrices L1 is most restrictive, ie., term==0
        # L2 is everything except term==0
        cols = mgr.get_slice(model_spec, term)
        L1 = lrange(cols.start, cols.stop)
        L2 = []
        term_set = set(term.factors)
        for t in terms_info: # for the term you have
            other_set = set(t.factors)
            if term_set.issubset(other_set) and not term_set == other_set:
                col = mgr.get_slice(model_spec, t)
                # on a higher order term containing current `term`
                L1.extend(lrange(col.start, col.stop))
                L2.extend(lrange(col.start, col.stop))

        L1 = np.eye(model.model.exog.shape[1])[L1]
        L2 = np.eye(model.model.exog.shape[1])[L2]

        if L2.size:
            LVL = np.dot(np.dot(L1,robust_cov),L2.T)
            from scipy import linalg
            orth_compl,_ = linalg.qr(LVL)
            r = L1.shape[0] - L2.shape[0]
            # L1|2
            # use the non-unique orthogonal completion since L12 is rank r
            L12 = np.dot(orth_compl[:,-r:].T, L1)
        else:
            L12 = L1
            r = L1.shape[0]
        #from IPython.core.debugger import Pdb; Pdb().set_trace()
        if test == 'F':
            f = model.f_test(L12, cov_p=robust_cov)
            table.loc[table.index[i], test] = f.fvalue
            table.loc[table.index[i], pr_test] = f.pvalue

        # need to back out SSR from f_test
        table.loc[table.index[i], 'df'] = r
        col_order.append(cols.start)
        index.append(mgr.get_term_name(term))

    table.index = Index(index + ['Residual'])
    table = table.iloc[np.argsort(col_order + [model.model.exog.shape[1]+1])]
    # back out sum of squares from f_test
    ssr = table[test] * table['df'] * model.ssr/model.df_resid
    table['sum_sq'] = ssr
    # fill in residual
    table.loc['Residual', ['sum_sq','df', test, pr_test]] = (model.ssr,
                                                            model.df_resid,
                                                            np.nan, np.nan)

    return table

def anova3_lm_single(model, model_spec, n_rows, test, pr_test, robust):
    mgr = FormulaManager()
    n_rows += mgr.has_intercept(model_spec)
    terms_info = model_spec.terms

    names = ['sum_sq', 'df', test, pr_test]

    table = DataFrame(np.zeros((n_rows, 4)), columns = names)
    cov = _get_covariance(model, robust)
    index = []
    for i, term in enumerate(terms_info):
        # grab term, hypothesis is that term == 0
        cols = mgr.get_slice(model_spec, term)
        L1 = np.eye(model.model.exog.shape[1])[cols]
        L12 = L1
        r = L1.shape[0]

        if test == 'F':
            f = model.f_test(L12, cov_p=cov)
            table.loc[table.index[i], test] = f.fvalue
            table.loc[table.index[i], pr_test] = f.pvalue

        # need to back out SSR from f_test
        table.loc[table.index[i], 'df'] = r
        #col_order.append(cols.start)
        index.append(mgr.get_term_name(term))

    table.index = Index(index + ['Residual'])
    #NOTE: Do not need to sort because terms are an ordered dict now
    #table = table.iloc[np.argsort(col_order + [model.model.exog.shape[1]+1])]
    # back out sum of squares from f_test
    ssr = table[test] * table['df'] * model.ssr/model.df_resid
    table['sum_sq'] = ssr
    # fill in residual
    table.loc['Residual', ['sum_sq','df', test, pr_test]] = (model.ssr,
                                                            model.df_resid,
                                                            np.nan, np.nan)
    return table

[docs] def anova_lm(*args, **kwargs): """ Anova table for one or more fitted linear models. Parameters ---------- args : fitted linear model results instance One or more fitted linear models scale : float Estimate of variance, If None, will be estimated from the largest model. Default is None. test : str {"F", "Chisq", "Cp"} or None Test statistics to provide. Default is "F". typ : str or int {"I","II","III"} or {1,2,3} The type of Anova test to perform. See notes. robust : {None, "hc0", "hc1", "hc2", "hc3"} Use heteroscedasticity-corrected coefficient covariance matrix. If robust covariance is desired, it is recommended to use `hc3`. Returns ------- anova : DataFrame When args is a single model, return is DataFrame with columns: sum_sq : float64 Sum of squares for model terms. df : float64 Degrees of freedom for model terms. F : float64 F statistic value for significance of adding model terms. PR(>F) : float64 P-value for significance of adding model terms. When args is multiple models, return is DataFrame with columns: df_resid : float64 Degrees of freedom of residuals in models. ssr : float64 Sum of squares of residuals in models. df_diff : float64 Degrees of freedom difference from previous model in args ss_dff : float64 Difference in ssr from previous model in args F : float64 F statistic comparing to previous model in args PR(>F): float64 P-value for significance comparing to previous model in args Notes ----- Model statistics are given in the order of args. Models must have been fit using the formula api. See Also -------- model_results.compare_f_test, model_results.compare_lm_test Examples -------- >>> import statsmodels.api as sm >>> from statsmodels.formula.api import ols >>> moore = sm.datasets.get_rdataset("Moore", "carData", cache=True) # load >>> data = moore.data >>> data = data.rename(columns={"partner.status" : ... "partner_status"}) # make name pythonic >>> moore_lm = ols('conformity ~ C(fcategory, Sum)*C(partner_status, Sum)', ... data=data).fit() >>> table = sm.stats.anova_lm(moore_lm, typ=2) # Type 2 Anova DataFrame >>> print(table) """ typ = kwargs.get('typ', 1) ### Farm Out Single model Anova Type I, II, III, and IV ### if len(args) == 1: model = args[0] return anova_single(model, **kwargs) if typ not in [1, "I"]: raise ValueError("Multiple models only supported for type I. " "Got type %s" % str(typ)) test = kwargs.get("test", "F") scale = kwargs.get("scale", None) n_models = len(args) pr_test = "Pr(>%s)" % test names = ['df_resid', 'ssr', 'df_diff', 'ss_diff', test, pr_test] table = DataFrame(np.zeros((n_models, 6)), columns=names) if not scale: # assume biggest model is last scale = args[-1].scale table["ssr"] = [mdl.ssr for mdl in args] table["df_resid"] = [mdl.df_resid for mdl in args] table.loc[table.index[1:], "df_diff"] = -np.diff(table["df_resid"].values) table["ss_diff"] = -table["ssr"].diff() if test == "F": table["F"] = table["ss_diff"] / table["df_diff"] / scale table[pr_test] = stats.f.sf(table["F"], table["df_diff"], table["df_resid"]) # for earlier scipy - stats.f.sf(np.nan, 10, 2) -> 0 not nan table.loc[table['F'].isnull(), pr_test] = np.nan return table
def _not_slice(slices, slices_to_exclude, n): ind = np.array([True]*n) for term in slices_to_exclude: s = slices[term] ind[s] = False return ind def _ssr_reduced_model(y, x, term_slices, params, keys): """ Residual sum of squares of OLS model excluding factors in `keys` Assumes x matrix is orthogonal Parameters ---------- y : array_like dependent variable x : array_like independent variables term_slices : a dict of slices term_slices[key] is a boolean array specifies the parameters associated with the factor `key` params : ndarray OLS solution of y = x * params keys : keys for term_slices factors to be excluded Returns ------- rss : float residual sum of squares df : int degrees of freedom """ ind = _not_slice(term_slices, keys, x.shape[1]) params1 = params[ind] ssr = np.subtract(y, x[:, ind].dot(params1)) ssr = ssr.T.dot(ssr) df_resid = len(y) - len(params1) return ssr, df_resid
[docs] class AnovaRM: """ Repeated measures Anova using least squares regression The full model regression residual sum of squares is used to compare with the reduced model for calculating the within-subject effect sum of squares [1]. Currently, only fully balanced within-subject designs are supported. Calculation of between-subject effects and corrections for violation of sphericity are not yet implemented. Parameters ---------- data : DataFrame depvar : str The dependent variable in `data` subject : str Specify the subject id within : list[str] The within-subject factors between : list[str] The between-subject factors, this is not yet implemented aggregate_func : {None, 'mean', callable} If the data set contains more than a single observation per subject and cell of the specified model, this function will be used to aggregate the data before running the Anova. `None` (the default) will not perform any aggregation; 'mean' is s shortcut to `numpy.mean`. An exception will be raised if aggregation is required, but no aggregation function was specified. Returns ------- results : AnovaResults instance Raises ------ ValueError If the data need to be aggregated, but `aggregate_func` was not specified. Notes ----- This implementation currently only supports fully balanced designs. If the data contain more than one observation per subject and cell of the design, these observations need to be aggregated into a single observation before the Anova is calculated, either manually or by passing an aggregation function via the `aggregate_func` keyword argument. Note that if the input data set was not balanced before performing the aggregation, the implied heteroscedasticity of the data is ignored. References ---------- .. [*] Rutherford, Andrew. Anova and ANCOVA: a GLM approach. John Wiley & Sons, 2011. """ def __init__(self, data, depvar, subject, within=None, between=None, aggregate_func=None): self.data = data self.depvar = depvar self.within = within if 'C' in within: raise ValueError("Factor name cannot be 'C'! This is in conflict " "with patsy's contrast function name.") self.between = between if between is not None: raise NotImplementedError('Between subject effect not ' 'yet supported!') self.subject = subject if aggregate_func == 'mean': self.aggregate_func = pd.Series.mean else: self.aggregate_func = aggregate_func if not data.equals(data.drop_duplicates(subset=[subject] + within)): if self.aggregate_func is not None: self._aggregate() else: msg = ('The data set contains more than one observation per ' 'subject and cell. Either aggregate the data manually, ' 'or pass the `aggregate_func` parameter.') raise ValueError(msg) self._check_data_balanced() def _aggregate(self): self.data = (self.data .groupby([self.subject] + self.within, as_index=False)[self.depvar] .agg(self.aggregate_func)) def _check_data_balanced(self): """raise if data is not balanced This raises a ValueError if the data is not balanced, and returns None if it is balance Return might change """ factor_levels = 1 for wi in self.within: factor_levels *= len(self.data[wi].unique()) cell_count = {} for index in range(self.data.shape[0]): key = [] for col in self.within: key.append(self.data[col].iloc[index]) key = tuple(key) if key in cell_count: cell_count[key] = cell_count[key] + 1 else: cell_count[key] = 1 error_message = "Data is unbalanced." if len(cell_count) != factor_levels: raise ValueError(error_message) count = cell_count[key] for key in cell_count: if count != cell_count[key]: raise ValueError(error_message) if self.data.shape[0] > count * factor_levels: raise ValueError('There are more than 1 element in a cell! Missing' ' factors?')
[docs] def fit(self): """estimate the model and compute the Anova table Returns ------- AnovaResults instance """ y = self.data[self.depvar].values # Construct OLS endog and exog from string using patsy within = ['C(%s, Sum)' % i for i in self.within] subject = 'C(%s, Sum)' % self.subject factors = within + [subject] mgr = FormulaManager() x = mgr.get_matrices('*'.join(factors), data=self.data, pandas=False) term_slices = mgr.get_term_name_slices(x) for key in term_slices: ind = np.array([False]*x.shape[1]) ind[term_slices[key]] = True term_slices[key] = np.array(ind) term_exclude = [':'.join(factors)] ind = _not_slice(term_slices, term_exclude, x.shape[1]) x = x[:, ind] # Fit OLS model = OLS(y, x) results = model.fit() if model.rank < x.shape[1]: raise ValueError('Independent variables are collinear.') for i in term_exclude: term_slices.pop(i) for key in term_slices: term_slices[key] = term_slices[key][ind] params = results.params df_resid = results.df_resid ssr = results.ssr columns = ['F Value', 'Num DF', 'Den DF', 'Pr > F'] anova_table = pd.DataFrame(np.zeros((0, 4)), columns=columns) for key in term_slices: if self.subject not in str(key) and str(key) not in ('Intercept', "1"): # Independent variables are orthogonal ssr1, df_resid1 = _ssr_reduced_model( y, x, term_slices, params, [key]) df1 = df_resid1 - df_resid msm = (ssr1 - ssr) / df1 if (str(key) == ':'.join(factors[:-1]) or (str(key) + ':' + subject not in term_slices)): mse = ssr / df_resid df2 = df_resid else: ssr1, df_resid1 = _ssr_reduced_model( y, x, term_slices, params, [str(key) + ':' + subject]) df2 = df_resid1 - df_resid mse = (ssr1 - ssr) / df2 F = msm / mse p = stats.f.sf(F, df1, df2) term = str(key).replace('C(', '').replace(', Sum)', '') anova_table.loc[term, 'F Value'] = F anova_table.loc[term, 'Num DF'] = df1 anova_table.loc[term, 'Den DF'] = df2 anova_table.loc[term, 'Pr > F'] = p return AnovaResults(anova_table)
class AnovaResults: """ Anova results class Attributes ---------- anova_table : DataFrame """ def __init__(self, anova_table): self.anova_table = anova_table def __str__(self): return self.summary().__str__() def summary(self): """create summary results Returns ------- summary : summary2.Summary instance """ summ = summary2.Summary() summ.add_title('Anova') summ.add_df(self.anova_table) return summ if __name__ == "__main__": import pandas from statsmodels.formula.api import ols # in R #library(car) #write.csv(Moore, "moore.csv", row.names=FALSE) moore = pandas.read_csv('moore.csv', skiprows=1, names=['partner_status','conformity', 'fcategory','fscore']) moore_lm = ols('conformity ~ C(fcategory, Sum)*C(partner_status, Sum)', data=moore).fit() mooreB = ols('conformity ~ C(partner_status, Sum)', data=moore).fit() # for each term you just want to test vs the model without its # higher-order terms # using Monette-Fox slides and Marden class notes for linear algebra / # orthogonal complement # https://netfiles.uiuc.edu/jimarden/www/Classes/STAT324/ table = anova_lm(moore_lm, typ=2)

Last update: Jan 20, 2025