# Source code for regpyhdfe.regpyhdfe

import pandas as pd
import pyhdfe
import statsmodels.api as sm
import numpy as np
from .utils import add_intercept, get_np_columns
from patsy import dmatrices

class Regpyhdfe:
    def __init__(self, df, target, predictors, absorb_ids=[], cluster_ids=[],
                 drop_singletons=True, intercept=False):
        """Regression wrapper for PyHDFE.

        Args:
            df (pandas.DataFrame): dataframe containing the referenced data,
                including the target, predictor, absorb and cluster variables.
            target (string): name of the target variable - the y in y = X*b + e.
            predictors (string or list of strings): names of the predictors -
                the X in y = X*b + e.
            absorb_ids (string or list of strings): names of the variables to
                absorb as fixed effects.
            cluster_ids (string or list of strings): names of the variables to
                cluster on.
            drop_singletons (bool): whether to drop singleton groups. Defaults
                to True, matching Stata. Setting this to False is equivalent
                to passing keepsingletons to reghdfe.
            intercept (bool): whether to include an intercept term. Only used
                when no fixed effects are absorbed; with fixed effects the
                intercept is redundant and always dropped. Defaults to False.
        """
        self.df = df
        # in case the user has not wrapped singular strings in a list
        if isinstance(predictors, str):
            predictors = [predictors]
        if isinstance(absorb_ids, str):
            absorb_ids = [absorb_ids]
        if isinstance(cluster_ids, str):
            cluster_ids = [cluster_ids]
        self.target = target
        self.predictors = predictors
        self.absorb_ids = absorb_ids
        self.cluster_ids = cluster_ids
        self.drop_singletons = drop_singletons

        # names of all features involved in the regression
        self.all_names = [target] + predictors

        # Construct a patsy formula to feed into OLS. This makes the output
        # prettier by giving each coefficient a meaningful name (otherwise the
        # regression coefficients are named x1, x2, etc.).
        self.formula = target + '~' + '+'.join(predictors)

        # if there's anything to be absorbed
        if absorb_ids:
            # an intercept term is redundant with fixed effects
            self.formula = self.formula + '-1'
            self.algo = pyhdfe.create(ids=get_np_columns(df, absorb_ids),
                                      cluster_ids=get_np_columns(df, cluster_ids),
                                      drop_singletons=drop_singletons,
                                      degrees_method='pairwise')
            # self.data contains the features adjusted for fixed effects
            # (i.e. means subtracted, singleton groups dropped, etc.)
            self.data = self.algo.residualize(get_np_columns(df, [target] + predictors))
        else:
            # otherwise just take the numpy columns as they are
            self.data = get_np_columns(df, self.all_names)
            if not intercept:
                self.formula = self.formula + '-1'

        df_data = pd.DataFrame()
        for i, name in enumerate(self.all_names):
            df_data[name] = self.data[:, i]
        y, X = dmatrices(self.formula, data=df_data, return_type='dataframe')
        self.model = sm.OLS(y, X)

        # Prepare the cluster groups if they exist. We do this here rather
        # than later in fit() since it can be convenient to have access to
        # these groups as early as possible.
        if self.cluster_ids:
            # numpy array of cluster group data
            self.groups_np = get_np_columns(self.df, self.cluster_ids)
            if self.absorb_ids:
                # remove the singleton groups that pyhdfe already identified
                # (only relevant when fixed effects were absorbed)
                self.groups_np = self.groups_np[~self.algo._singleton_indices]
            # the residual degrees of freedom are driven by the cluster
            # dimension with the fewest distinct groups
            min_cluster_count = np.unique(self.groups_np[:, 0]).shape[0]
            for i in range(1, self.groups_np.shape[1]):
                current_count = np.unique(self.groups_np[:, i]).shape[0]
                if current_count < min_cluster_count:
                    min_cluster_count = current_count
            self.min_cluster_count = min_cluster_count
            self.model.df_resid = min_cluster_count - len(self.predictors) + 1
            self.model._df_resid = self.model.df_resid
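
    # A minimal construction sketch (illustrative only, not part of the
    # library). The dataframe and column names mirror the NLS Work dataset
    # referenced in the comments of summary() below, but any dataframe with
    # the referenced columns works:
    #
    #     df = pd.read_stata('nlswork.dta')
    #     reg = Regpyhdfe(df,
    #                     target='ttl_exp',
    #                     predictors=['wks_ue', 'tenure'],
    #                     absorb_ids=['idcode', 'year'],  # fixed effects
    #                     cluster_ids=['idcode'])         # clustered SEs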
    def fit(self):
        """Generate linear regression coefficients for the given data.

        The regression clusters on the variables provided during
        initialization.

        Returns:
            statsmodels.regression.linear_model.RegressionResults
        """
        if self.cluster_ids:
            self.res = self.model.fit(cov_type='cluster',
                                      use_t=False,
                                      cov_kwds={'df_correction': True,
                                                'groups': self.groups_np})
            # Bind the module-level summary() below in place of the
            # statsmodels default so that the manually adjusted residual
            # degrees of freedom flow through to the report.
            self.res.summary = lambda yname=None, xname=None, title=None, alpha=.05: \
                summary(self.res._results, self, yname=yname, xname=xname,
                        title=title, alpha=alpha)
            return self.res
        else:
            # if anything has been absorbed, adjust the residual degrees of freedom
            if self.absorb_ids and self.drop_singletons:
                self.model.df_resid = (np.sum(~self.algo._singleton_indices)
                                       - len(self.predictors) - self.algo.degrees)
            return self.model.fit()
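
# A hedged usage sketch, assuming `reg` was constructed as in the example
# above (the variable names are hypothetical):
#
#     results = reg.fit()        # RegressionResults with adjusted df_resid
#     print(results.summary())   # uses the custom summary() defined below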
def summary(self, regpyhdfe, yname=None, xname=None, title=None, alpha=.05):
    """
    Summarize the Regression Results.

    This function is grafted onto the results object by Regpyhdfe.fit(), so
    `self` is a statsmodels results instance while `regpyhdfe` is the wrapper
    holding the residualized data and cluster groups.

    Parameters
    ----------
    yname : str, optional
        Name of endogenous (response) variable. Default is `y`.
    xname : list[str], optional
        Names for the exogenous variables. Default is `var_##` for ## in the
        number of regressors. Must match the number of parameters in the
        model.
    title : str, optional
        Title for the top table. If not None, then this replaces the default
        title.
    alpha : float
        The significance level for the confidence intervals.

    Returns
    -------
    Summary
        Instance holding the summary tables and text, which can be printed or
        converted to various output formats.

    See Also
    --------
    statsmodels.iolib.summary.Summary : A class that holds summary results.
    """
    # Cluster-robust standard errors computed by hand, following:
    # https://apithymaxim.wordpress.com/2020/03/16/clustering-standard-errors-by-hand-using-python/
    # http://cameron.econ.ucdavis.edu/research/Cameron_Miller_JHR_2015_February.pdf
    X = regpyhdfe.data[:, 1:]
    y = np.expand_dims(regpyhdfe.data[:, 0], 1)

    # calculate (X'X)^-1 and the vector of coefficients, beta
    XX_inv = np.linalg.inv(X.T.dot(X))
    beta = XX_inv.dot(X.T.dot(y))
    resid = y - X.dot(beta)

    ID = np.squeeze(regpyhdfe.groups_np)
    c_list = np.unique(ID)  # unique list of clusters
    N, k, Nclusts = X.shape[0], X.shape[1], int(c_list.shape[0])

    # "meat" of the sandwich estimator: sum over clusters of X_c' u_c u_c' X_c
    sum_XuuTX = 0
    for c in range(Nclusts):
        in_cluster = (ID == c_list[c])  # indicator for the given cluster value
        resid_c = resid[in_cluster]
        uuT = resid_c.dot(resid_c.T)
        Xc = X[in_cluster]
        sum_XuuTX += Xc.T.dot(uuT).dot(Xc)

    # Degrees-of-freedom correction from https://www.stata.com/manuals13/u20.pdf p. 54;
    # without absorbed effects it would be (G/(G-1)) * ((N-1)/(N-k)).
    # TODO: actually check whether the fixed effects are nested within the
    # clusters; for now one nested absorbed dimension is assumed.
    df_a_nested = 1
    adj = ((N - 1) / (N - df_a_nested - k)) * (Nclusts / (Nclusts - 1))
    V_beta = adj * (XX_inv.dot(sum_XuuTX).dot(XX_inv))
    se_beta = np.sqrt(np.diag(V_beta))

    # data dump for cross-checking the results against Stata
    for_stata = pd.DataFrame(X)
    for_stata.columns = ["X" + str(i) for i in range(k)]
    for_stata['ID'] = ID
    for_stata['y'] = y
    # for_stata.to_stata("resid_test.dta")

    print('B', beta, '\n SE: \n', se_beta)
    beta = np.squeeze(beta)
    t_values = beta / se_beta
    print('T values', t_values)

    from scipy.stats import t
    p_values = 2 * t.cdf(-np.abs(t_values), regpyhdfe.model.df_resid)
    # confidence interval bounds in units of the standard error
    t_interval = np.asarray(t.interval(1 - alpha, df=regpyhdfe.model.df_resid))
    print("t_interval", t_interval)
    intervals = np.empty(shape=(beta.shape[0], 2))
    # for each variable
    for i in range(intervals.shape[0]):
        intervals[i] = t_interval * se_beta[i] + beta[i]
    print('intervals', intervals)

    # Wald F-statistic: beta' V^-1 beta / k
    tmp1 = np.linalg.solve(V_beta, beta[:, None])
    fvalue = beta.dot(tmp1)[0] / k
    print('fvalue', fvalue)

    # from statsmodels.stats.stattools import (
    #     jarque_bera, omni_normtest, durbin_watson)
    # jb, jbpv, skew, kurtosis = jarque_bera(self.wresid)
    # omni, omnipv = omni_normtest(self.wresid)
    # eigvals = self.eigenvals
    # condno = self.condition_number
    # TODO: Avoid adding attributes in non-__init__
    # self.diagn = dict(jb=jb, jbpv=jbpv, skew=skew, kurtosis=kurtosis,
    #                   omni=omni, omnipv=omnipv, condno=condno,
    #                   mineigval=eigvals[-1])

    # TODO not used yet
    # diagn_left_header = ['Models stats']
    # diagn_right_header = ['Residual stats']

    # TODO: requiring list/iterable is a bit annoying
    #   need more control over formatting
    # TODO: default does not work if it's not identically spelled
    top_left = [('Dep. Variable:', None),
                ('Model:', None),
                ('Method:', ['Least Squares']),
                ('Date:', None),
                ('Time:', None),
                ('No. Observations:', None),
                ('Df Residuals:', None),
                ('Df Model:', None),
                ]

    if hasattr(self, 'cov_type'):
        top_left.append(('Covariance Type:', [self.cov_type]))

    rsquared_type = '' if self.k_constant else ' (uncentered)'
    top_right = [('R-squared' + rsquared_type + ':',
                  ["%#8.3f" % self.rsquared]),
                 ('Adj. R-squared' + rsquared_type + ':',
                  ["%#8.3f" % self.rsquared_adj]),
                 ('F-statistic:', ["%#8.4g" % self.fvalue]),
                 ('Prob (F-statistic):', ["%#6.3g" % self.f_pvalue]),
                 ]

    # diagn_left = [('Omnibus:', ["%#6.3f" % omni]),
    #               ('Prob(Omnibus):', ["%#6.3f" % omnipv]),
    #               ('Skew:', ["%#6.3f" % skew]),
    #               ('Kurtosis:', ["%#6.3f" % kurtosis])
    #               ]
    #
    # diagn_right = [('Durbin-Watson:',
    #                 ["%#8.3f" % durbin_watson(self.wresid)]
    #                 ),
    #                ('Jarque-Bera (JB):', ["%#8.3f" % jb]),
    #                ('Prob(JB):', ["%#8.3g" % jbpv]),
    #                ]

    if title is None:
        title = self.model.__class__.__name__ + ' ' + "Regression Results"

    # create summary table instance
    from statsmodels.iolib.summary import Summary
    smry = Summary()
    smry.add_table_2cols(self, gleft=top_left, gright=top_right,
                         yname=yname, xname=xname, title=title)
    smry.add_table_params(self, yname=yname, xname=xname, alpha=alpha,
                          use_t=self.use_t)
    # smry.add_table_2cols(self, gleft=diagn_left, gright=diagn_right,
    #                      yname=yname, xname=xname,
    #                      title="")

    # add warnings/notes; these are added to the text format only
    etext = []
    if not self.k_constant:
        etext.append(
            "R² is computed without centering (uncentered) since the "
            "model does not contain a constant."
        )
    if hasattr(self, 'cov_type'):
        etext.append(self.cov_kwds['description'])
    if self.model.exog.shape[0] < self.model.exog.shape[1]:
        wstr = "The input rank is higher than the number of observations."
        etext.append(wstr)

    # if eigvals[-1] < 1e-10:
    #     wstr = "The smallest eigenvalue is %6.3g. This might indicate "
    #     wstr += "that there are\n"
    #     wstr += "strong multicollinearity problems or that the design "
    #     wstr += "matrix is singular."
    #     wstr = wstr % eigvals[-1]
    #     etext.append(wstr)
    # elif condno > 1000:  # TODO: what is recommended?
    #     wstr = "The condition number is large, %6.3g. This might "
    #     wstr += "indicate that there are\n"
    #     wstr += "strong multicollinearity or other numerical "
    #     wstr += "problems."
    #     wstr = wstr % condno
    #     etext.append(wstr)

    if etext:
        etext = ["[{0}] {1}".format(i + 1, text)
                 for i, text in enumerate(etext)]
        etext.insert(0, "Notes:")
        smry.add_extra_txt(etext)

    return smry
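

# A self-contained sketch (an illustration, not part of the original library)
# of the cluster-robust sandwich estimator that summary() computes above, run
# on synthetic data: V = adj * (X'X)^-1 [sum_c X_c' u_c u_c' X_c] (X'X)^-1
# with the small-sample correction adj = (G/(G-1)) * ((N-1)/(N-k)). All names
# below (X_demo, y_demo, ids, ...) are hypothetical.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    N, k, G = 500, 3, 25
    # design matrix with a constant column, cluster assignments, and a
    # cluster-level error component so clustering actually matters
    X_demo = np.hstack((rng.normal(size=(N, k - 1)), np.ones((N, 1))))
    ids = rng.integers(0, G, size=N)
    u = rng.normal(size=(N, 1)) + 0.5 * rng.normal(size=(G, 1))[ids]
    y_demo = X_demo @ np.array([[1.0], [-2.0], [0.5]]) + u

    # OLS coefficients and residuals
    XX_inv_demo = np.linalg.inv(X_demo.T @ X_demo)
    beta_demo = XX_inv_demo @ X_demo.T @ y_demo
    resid_demo = y_demo - X_demo @ beta_demo

    # sum the per-cluster "meat" terms X_c' u_c u_c' X_c
    meat = np.zeros((k, k))
    for g in np.unique(ids):
        Xg = X_demo[ids == g]
        ug = resid_demo[ids == g]
        meat += Xg.T @ ug @ ug.T @ Xg

    G_obs = np.unique(ids).size
    adj = (G_obs / (G_obs - 1)) * ((N - 1) / (N - k))
    V = adj * XX_inv_demo @ meat @ XX_inv_demo
    print('coefficients:', beta_demo.ravel())
    print('clustered standard errors:', np.sqrt(np.diag(V)))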