"""
Classes and functions related to computing fairness evaluations.
:author: Anastassia Loukina (aloukina@ets.org)
:author: Jeremy Biggs (jbiggs@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)
:organization: ETS
"""
import pickle
import warnings
from os.path import join
from typing import Dict, Optional, Tuple
import pandas as pd
import statsmodels.formula.api as smf
from statsmodels.regression.linear_model import RegressionResults
from statsmodels.stats.anova import anova_lm
from rsmtool.container import DataContainer, DatasetDict
from rsmtool.writer import DataWriter
def convert_to_ordered_category(
    group_values: pd.Series, base_group: Optional[str] = None
) -> pd.Series:
    """
    Convert given series to an ordered category.

    The levels are ordered by category size (largest first). If multiple
    categories have the same size, the order among them is determined
    by sorting the group names in ascending order.

    Parameters
    ----------
    group_values : pandas.Series
        A series indicating group membership.
    base_group : Optional[str]
        The group to use as the first category which overrides the default ordering.
        Defaults to ``None``.

    Returns
    -------
    group_ordered_category : pandas.Series
        The input values as an ordered categorical series.

    Raises
    ------
    ValueError
        If ``base_group`` is specified but does not exist in the data.
    """
    # Build a two-column frame (group name, count) so that we can sort on
    # both columns; this keeps the order reproducible across runs when
    # several groups have the same number of occurrences.
    # The index and the counts are named explicitly rather than relying on
    # the names pandas assigns to the output of ``value_counts()``: those
    # differ between pandas versions and can collide with each other
    # (e.g., when the input series itself is called "count").
    df_groups_by_size = (
        group_values.value_counts().rename_axis("group_name").reset_index(name="count")
    )
    df_groups_by_size_sorted = df_groups_by_size.sort_values(
        ["count", "group_name"], ascending=[False, True]
    )
    groups_by_size = df_groups_by_size_sorted["group_name"].tolist()

    if base_group is not None:
        # if we have user-supplied base group, check that it's actually in the data
        if base_group not in group_values.values:
            raise ValueError(
                f"The reference group {base_group} must be one of the "
                f"existing values for this group"
            )

        # move the supplied reference group to the beginning of the list
        base_index = groups_by_size.index(base_group)
        groups_by_size.insert(0, groups_by_size.pop(base_index))

    # convert to category and reorder
    group_category = group_values.astype("category")
    return group_category.cat.reorder_categories(groups_by_size, ordered=True)
def _extract_level_name(term: str) -> str:
    """Return the level embedded in a coded term name such as ``group[T.level]``."""
    # Everything after the FIRST dot belongs to the level name, so levels
    # that themselves contain dots (e.g. "U.S." or "1.5") are kept intact.
    _, dot, level = term.partition(".")
    if not dot:
        # not in the expected "<name>[<coding>.<level>]" shape; leave unchanged
        return term
    # drop only the single closing bracket that ends the term name
    return level[:-1] if level.endswith("]") else level


def get_coefficients(fit: RegressionResults, base_category: str) -> pd.DataFrame:
    """
    Extract estimates, significance, and confidence intervals for a given group.

    Only the intercept and the predictors whose name contains ``"group"``
    are kept. The names of the predictors are processed to remove the
    prefix added by ``statsmodels``. The name of the base category
    is added in parentheses to the Intercept.

    Parameters
    ----------
    fit: statsmodels.regression.linear_model.RegressionResults
        Linear regression object fitted using ``statsmodels``.
    base_category: str
        Name of the group used as reference category when fitting the model.

    Returns
    -------
    df_results: pandas.DataFrame
        A dataframe with rows for each category and the following columns:
        - "estimate"
        - "P>[t]"
        - "[0.025" (lower bound returned by ``fit.conf_int()``)
        - "0.975]" (upper bound returned by ``fit.conf_int()``)
    """
    # extract the data we need
    df_results = pd.concat([fit.params, fit.pvalues, fit.conf_int()], axis=1)
    df_results.columns = ["estimate", "P>[t]", "[0.025", "0.975]"]

    # identify the rows we are interested in
    groups = ["Intercept"] + [v for v in df_results.index if "group" in v]
    df_results = df_results.loc[groups]

    # rename the rows
    df_results.index = [
        f"Intercept ({base_category})" if v == "Intercept" else _extract_level_name(v)
        for v in df_results.index
    ]
    return df_results
def write_fairness_results(
    fit_dictionary: Dict[str, RegressionResults],
    fairness_container: DataContainer,
    group: str,
    output_dir: str,
    experiment_id: str,
    file_format: str,
) -> None:
    """
    Write the fitted fairness models and the associated data frames to disk.

    For every entry in ``fit_dictionary`` two files are created in
    ``output_dir``: a pickled copy of the fitted model
    (``<experiment_id>_<model>_by_<group>.ols``) and a plain-text copy of
    its summary (``<experiment_id>_<model>_by_<group>_ols_summary.txt``).
    The datasets in ``fairness_container`` are then written out via
    ``DataWriter``.

    Parameters
    ----------
    fit_dictionary: Dict[str, RegressionResults]
        A dictionary of fitted models generated by ``get_fairness_analyses()``.
    fairness_container: DataContainer
        A data container with the results of fairness analysis generated by
        ``get_fairness_analyses()``.
    group: str
        The subgroup considered in this analysis.
    output_dir: str
        The directory where the results will be saved.
    experiment_id: str
        The experiment ID.
    file_format: str
        File format to use for data files.
    """
    # one pickled model + one text summary per fitted model
    for metric, fitted_model in fit_dictionary.items():
        model_path = join(output_dir, f"{experiment_id}_{metric}_by_{group}.ols")
        summary_path = join(output_dir, f"{experiment_id}_{metric}_by_{group}_ols_summary.txt")
        with open(model_path, "wb") as model_handle, open(summary_path, "w") as summary_handle:
            pickle.dump(fitted_model, model_handle)
            summary_handle.write(str(fitted_model.summary()))

    # the frames stored in the container are handled by the experiment writer
    DataWriter(experiment_id).write_experiment_output(
        output_dir, fairness_container, file_format=file_format, index=True
    )
def _fit_group_model(
    formula: str, data: pd.DataFrame, metric_name: str
) -> Tuple[RegressionResults, pd.Series]:
    """Fit an OLS model for ``formula`` and summarize it as adjusted R^2 and F-test p value."""
    fit = smf.ols(formula=formula, data=data).fit()
    fit_summary = pd.Series({"R2": fit.rsquared_adj, "sig": fit.f_pvalue}, name=metric_name)
    return fit, fit_summary


def get_fairness_analyses(
    df: pd.DataFrame,
    group: str,
    system_score_column: str,
    human_score_column: str = "sc1",
    base_group: Optional[str] = None,
) -> Tuple[Dict[str, RegressionResults], DataContainer]:
    """
    Compute analyses from `Loukina et al. 2019 <https://aclanthology.org/W19-4401/>`_.

    The function computes how much variance group membership explains in
    overall score accuracy (osa), overall score difference (osd),
    and conditional score difference (csd). See the paper for more
    details.

    The input data frame is not modified: all intermediate columns
    are added to an internal copy.

    Parameters
    ----------
    df: pandas.DataFrame
        A dataframe containing columns with numeric human scores,
        columns with numeric system scores and a column with
        group membership.
    group: str
        Name of the column containing group membership.
    system_score_column: str
        Name of the column containing system scores.
    human_score_column: str
        Name of the column containing human scores.
        Defaults to ``"sc1"``.
    base_group: Optional[str]
        Name of the group to use as the reference category. If ``None``, the
        group with the largest number of cases will be used as the reference
        category. Ties are broken alphabetically.
        Defaults to ``None``.

    Returns
    -------
    model_dict: Dict[str, RegressionResults]
        A dictionary with different proposed metrics as keys and fitted models
        as values.
    fairness_container: DataContainer
        A datacontainer with the following datasets:
        - ``"estimates_<METRIC>_by_<GROUP>"`` where ``<GROUP>`` corresponds
          to the given group and ``<METRIC>`` can be ``osa``, ``osd`` and ``csd``:
          estimates for each group computed by the respective models.
        - ``"fairness_metrics_by_<GROUP>"`` - a summary of model fits (R^2 and
          p values).
    """
    # Work on a copy so that the helper columns added below ("error", "SE",
    # "group", "sc1_cat") neither leak into nor overwrite columns of the
    # data frame owned by the caller.
    df = df.copy()

    # compute error and squared error
    df["error"] = df[system_score_column] - df[human_score_column]
    df["SE"] = df["error"] ** 2

    # convert group values to category and reorder them so that the
    # reference category (largest group or ``base_group``) comes first
    df["group"] = convert_to_ordered_category(df[group], base_group=base_group)
    base_category = df["group"].cat.categories[0]
    df["sc1_cat"] = convert_to_ordered_category(df[human_score_column])

    # Overall score accuracy (OSA)
    # variance in squared error explained by group membership
    osa_fit, osa_results = _fit_group_model("SE ~ group", df, "Overall score accuracy")
    df_coefficients_osa = get_coefficients(osa_fit, base_category)

    # Overall score difference (OSD)
    # variance in signed residuals (raw error) explained by group membership
    osd_fit, osd_results = _fit_group_model("error ~ group", df, "Overall score difference")
    df_coefficients_osd = get_coefficients(osd_fit, base_category)

    # Conditional score difference (CSD)
    # variance in score difference explained by group membership
    # once the human score is taken into account

    # fit "null" model with human score only
    csd_null_fit = smf.ols(formula="error ~ sc1_cat", data=df).fit()

    # fit model with both human score and group
    csd_fit = smf.ols(formula="error ~ group + sc1_cat", data=df).fit()

    # compare the two models using anova_lm
    # we filter warnings for this function because we get
    # runtime warning due to NaNs in the data.
    # these seem to be by design: https://groups.google.com/forum/#!topic/pystatsmodels/-flY0cNnb3k
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        anova_results = anova_lm(csd_null_fit, csd_fit)

    # collect the results. Note that R2 in this case is a difference
    # in R2 between the two models and significance is obtained from anova:
    # the second row of the table belongs to the second model passed to
    # ``anova_lm`` and the last column of that row is used as the p value.
    csd_dict = {
        "R2": csd_fit.rsquared_adj - csd_null_fit.rsquared_adj,
        "sig": anova_results.iloc[1, -1],
    }
    csd_results = pd.Series(csd_dict, name="Conditional score difference")
    df_coefficients_csd = get_coefficients(csd_fit, base_category)

    # create a summary table
    df_r2_all = pd.concat([osa_results, osd_results, csd_results], axis=1, sort=True)
    df_r2_all["base_category"] = base_category

    # assemble all datasets into a DataContainer
    datasets = [
        DatasetDict({"name": f"estimates_osa_by_{group}", "frame": df_coefficients_osa}),
        DatasetDict({"name": f"estimates_osd_by_{group}", "frame": df_coefficients_osd}),
        DatasetDict({"name": f"estimates_csd_by_{group}", "frame": df_coefficients_csd}),
        DatasetDict({"name": f"fairness_metrics_by_{group}", "frame": df_r2_all}),
    ]

    # assemble all models into a dictionary
    model_dict = {"osa": osa_fit, "osd": osd_fit, "csd": csd_fit}
    return model_dict, DataContainer(datasets=datasets)