"""
Utility functions for computing various RSMTool metrics.
:author: Jeremy Biggs (jbiggs@ets.org)
:author: Anastassia Loukina (aloukina@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)
:organization: ETS
"""
import warnings
from typing import List, Optional
import numpy as np
import pandas as pd
from skll.data import FeatureSet
from skll.learner import Learner
def compute_expected_scores_from_model(
    model: Learner, featureset: FeatureSet, min_score: float, max_score: float
) -> np.ndarray:
    """
    Compute expected scores using probability distributions over labels.

    This function only works with SKLL models.

    Parameters
    ----------
    model : skll.learner.Learner
        The SKLL learner object to use for computing the expected scores.
    featureset : skll.data.FeatureSet
        The SKLL featureset object for which predictions are to be made.
    min_score : float
        Minimum score level to be used for computing expected scores.
    max_score : float
        Maximum score level to be used for computing expected scores.

    Returns
    -------
    expected_scores : numpy.ndarray
        A numpy array containing the expected scores.

    Raises
    ------
    ValueError
        If the given model cannot predict probability distributions.
    ValueError
        If the score range specified by ``min_score`` and ``max_score``
        does not match what the model predicts in its probability
        distribution.
    """
    # score points are treated as integer class labels
    max_score = int(max_score)
    min_score = int(min_score)
    if hasattr(model.model, "predict_proba"):
        # Tell the model we want probabilities as output. This is likely already
        # set to True but it might not be, e.g., when using rsmpredict.
        model.probability = True
        probability_distributions = model.predict(featureset, class_labels=False)

        # check to make sure that the number of labels in the probability
        # distributions matches the number of score points we have
        num_score_points_specified = max_score - min_score + 1
        num_score_points_in_learner = probability_distributions.shape[1]
        if num_score_points_specified != num_score_points_in_learner:
            raise ValueError(
                f"The specified number of score points ({num_score_points_specified}) "
                f"does not match that from the learner ({num_score_points_in_learner})."
            )

        # the expected score is the probability-weighted sum of the score points
        expected_scores = probability_distributions.dot(range(min_score, max_score + 1))
    else:
        if model.model_type.__name__ == "SVC":
            raise ValueError(
                "Expected scores cannot be computed since the SVC model was "
                "not originally trained to predict probabilities."
            )
        else:
            raise ValueError(
                f"Expected scores cannot be computed since "
                f"{model.model_type.__name__} is not a probabilistic classifier."
            )
    return expected_scores
def covariance_to_correlation(cov: np.ndarray) -> np.ndarray:
    """
    Implement the R ``cov2cor`` function in Python.

    Parameters
    ----------
    cov : numpy.ndarray
        The covariance matrix.

    Returns
    -------
    retval : numpy.ndarray
        The cross-correlation matrix.

    Raises
    ------
    ValueError
        If the input matrix is not square.
    """
    # a covariance matrix must be square by definition
    nrows, ncols = cov.shape
    if nrows != ncols:
        raise ValueError("Input matrix must be square")

    # divide each entry (i, j) by sd_i * sd_j, i.e., multiply by the
    # outer product of the reciprocal standard deviations taken from
    # the diagonal of the covariance matrix
    reciprocal_sds = np.sqrt(1 / np.diag(cov))
    retval = cov * np.outer(reciprocal_sds, reciprocal_sds)

    # force the diagonal to exactly 1 to avoid floating-point noise
    np.fill_diagonal(retval, 1.0)
    return retval
def partial_correlations(df: pd.DataFrame) -> pd.DataFrame:
    """
    Implement the R ``pcor`` function from ``ppcor`` package in Python.

    This computes partial correlations of each pair of variables
    in the given data frame ``df``, excluding all other variables.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing the feature values.

    Returns
    -------
    df_pcor : pandas.DataFrame
        Data frame containing the partial correlations of each
        pair of variables in the given data frame ``df``,
        excluding all other variables.
    """
    numrows, numcols = df.shape
    df_cov = df.cov()
    columns = df_cov.columns

    # pre-create a matrix of NaNs to use whenever the inverse of the
    # variance-covariance matrix cannot be computed; this also covers
    # the case where ncols > nrows since it is not meaningful to
    # compute partial correlations there (when ncols == nrows we get
    # the degenerate matrix with 1 only)
    empty_array = np.full((len(columns), len(columns)), np.nan)
    if numcols > numrows:
        icvx = empty_array
    else:
        # if the determinant is at or very close to zero (below the
        # 32-bit float epsilon), use the Moore-Penrose pseudo-inverse;
        # otherwise, use the regular inverse; if a linear algebra error
        # occurs, fall back to the matrix of NaNs. Note: an explicit
        # `if` is used instead of a caught `assert` so that the check
        # is not stripped out when running under `python -O`.
        try:
            if np.linalg.det(df_cov) > np.finfo(np.float32).eps:
                icvx = np.linalg.inv(df_cov)
            else:
                icvx = np.linalg.pinv(df_cov)
                warnings.warn(
                    "When computing partial correlations "
                    "the inverse of the variance-covariance matrix "
                    "was calculated using the Moore-Penrose generalized "
                    "matrix inversion, due to its determinant being at "
                    "or very close to zero."
                )
        except np.linalg.LinAlgError:
            icvx = empty_array

    # the partial correlations are the negated entries of the inverse
    # covariance matrix converted to a correlation matrix; the diagonal
    # is 1 by definition
    pcor = -1 * covariance_to_correlation(icvx)
    np.fill_diagonal(pcor, 1.0)
    df_pcor = pd.DataFrame(pcor, columns=columns, index=columns)
    return df_pcor
def agreement(score1: List[int], score2: List[int], tolerance: int = 0) -> float:
    """
    Compute the agreement between two raters, under given tolerance.

    Parameters
    ----------
    score1 : List[int]
        List of rater 1 scores.
    score2 : List[int]
        List of rater 2 scores.
    tolerance : int
        Difference in scores that is acceptable.
        Defaults to 0.

    Returns
    -------
    agreement_value : float
        The percentage agreement between the two scores.

    Raises
    ------
    ValueError
        If the two lists of scores do not contain the same number of items.
    """
    # make sure the two sets of scores are for the same number of items;
    # raise an explicit exception rather than using `assert` so that the
    # check is not stripped out when running under `python -O`
    if len(score1) != len(score2):
        raise ValueError("The two lists of scores must have the same length.")

    # a response counts as an agreement if the two scores differ by no
    # more than the given tolerance
    num_agreements = sum(int(abs(s1 - s2) <= tolerance) for s1, s2 in zip(score1, score2))
    agreement_value = (num_agreements / len(score1)) * 100
    return agreement_value
def standardized_mean_difference(
    y_true_observed: np.ndarray,
    y_pred: np.ndarray,
    population_y_true_observed_sd: Optional[float] = None,
    population_y_pred_sd: Optional[float] = None,
    method: str = "unpooled",
    ddof: int = 1,
) -> float:
    """
    Compute the standardized mean difference between system and human scores.

    The numerator is calculated as mean(y_pred) - mean(y_true_observed)
    for all of the available methods.

    Parameters
    ----------
    y_true_observed : numpy.ndarray
        The observed scores for the group or subgroup.
    y_pred : numpy.ndarray
        The predicted scores for the group or subgroup.
    population_y_true_observed_sd : Optional[float]
        The population true score standard deviation. When the SMD is being
        calculated for a subgroup, this should be the standard deviation for
        the whole population.
        Defaults to ``None``.
    population_y_pred_sd : Optional[float]
        The predicted score standard deviation. When the SMD is being calculated
        for a subgroup, this should be the standard deviation for the whole population.
        Defaults to ``None``.
    method : str
        The SMD method to use. Possible options are:

        - ``"williamson"``: Denominator is the pooled population standard deviation
          of ``y_true_observed`` and ``y_pred`` computed using
          ``population_y_true_observed_sd`` and ``population_y_pred_sd``.
        - ``"johnson"``: Denominator is ``population_y_true_observed_sd``.
        - ``"pooled"``: Denominator is the pooled standard deviation of
          ``y_true_observed`` and ``y_pred`` for this group.
        - ``"unpooled"``: Denominator is the standard deviation of
          ``y_true_observed`` for this group.

        Defaults to ``"unpooled"``.
    ddof : int
        The delta degrees of freedom. The divisor used in calculations is N - ddof,
        where N represents the number of elements.
        Defaults to 1.

    Returns
    -------
    smd : float
        The SMD for the given group or subgroup; ``numpy.nan`` if the
        denominator of the calculation is zero.

    Raises
    ------
    ValueError
        If method is "williamson" and either ``population_y_true_observed_sd``
        or ``population_y_pred_sd`` is ``None``.
    ValueError
        If method is "johnson" and ``population_y_true_observed_sd`` is ``None``.
    ValueError
        If method is not one of {"unpooled", "pooled", "williamson", "johnson"}.

    Note
    ----
    - The "williamson" implementation was recommended by Williamson,
      et al. (2012).
    - The metric is only applicable when both sets of scores are on
      the same scale.
    """
    # the numerator is the same for all methods; only the denominator differs
    numerator = np.mean(y_pred) - np.mean(y_true_observed)

    # make the method name case-insensitive
    method = method.lower()
    if method == "unpooled":
        denominator = np.std(y_true_observed, ddof=ddof)
    elif method == "pooled":
        denominator = np.sqrt(
            (np.std(y_true_observed, ddof=ddof) ** 2 + np.std(y_pred, ddof=ddof) ** 2) / 2
        )
    elif method == "johnson":
        if population_y_true_observed_sd is None:
            raise ValueError(
                "If `method='johnson'`, then `population_y_true_observed_sd` " "must be provided."
            )
        denominator = population_y_true_observed_sd
    elif method == "williamson":
        if population_y_true_observed_sd is None or population_y_pred_sd is None:
            raise ValueError(
                "If `method='williamson'`, both `population_y_true_observed_sd` "
                "and `population_y_pred_sd` must be provided."
            )
        denominator = np.sqrt((population_y_true_observed_sd**2 + population_y_pred_sd**2) / 2)
    else:
        # use a fixed ordering of the method names so that the error
        # message is deterministic (a set would make the order vary)
        possible_methods = ("'unpooled'", "'pooled'", "'johnson'", "'williamson'")
        raise ValueError(
            f"The available methods are {{{', '.join(possible_methods)}}}; you selected {method}."
        )

    # if the denominator is zero, then return NaN as the SMD
    smd = np.nan if denominator == 0 else numerator / denominator
    return smd
def difference_of_standardized_means(
    y_true_observed: np.ndarray,
    y_pred: np.ndarray,
    population_y_true_observed_mn: Optional[float] = None,
    population_y_pred_mn: Optional[float] = None,
    population_y_true_observed_sd: Optional[float] = None,
    population_y_pred_sd: Optional[float] = None,
    ddof: int = 1,
) -> Optional[float]:
    """
    Calculate the difference between standardized means.

    First, standardize both observed and predicted scores to z-scores using
    mean and standard deviation for the whole population. Then
    calculate differences between standardized means for each subgroup.

    Parameters
    ----------
    y_true_observed : numpy.ndarray
        The observed scores for the group or subgroup.
    y_pred : numpy.ndarray
        The predicted scores for the group or subgroup.
    population_y_true_observed_mn : Optional[float]
        The population true score mean. When the DSM is being calculated for a
        subgroup, this should be the mean for the whole population.
        Defaults to ``None``.
    population_y_pred_mn : Optional[float]
        The predicted score mean. When the DSM is being calculated for a subgroup,
        this should be the mean for the whole population.
        Defaults to ``None``.
    population_y_true_observed_sd : Optional[float]
        The population true score standard deviation. When the DSM is being
        calculated for a subgroup, this should be the standard deviation for the
        whole population.
        Defaults to ``None``.
    population_y_pred_sd : Optional[float]
        The predicted score standard deviation. When the DSM is being calculated
        for a subgroup, this should be the standard deviation for the whole population.
        Defaults to ``None``.
    ddof : int
        The delta degrees of freedom. The divisor used in calculations is N - ddof
        where N represents the number of elements.
        Defaults to 1.

    Returns
    -------
    difference_of_std_means : Optional[float]
        The difference of standardized means; ``None`` if any of the
        standard deviations used for the z-scores is (close to) zero.

    Raises
    ------
    ValueError
        If only one of ``population_y_true_observed_mn`` and
        ``population_y_true_observed_sd`` is not ``None``.
    ValueError
        If only one of ``population_y_pred_mn`` and ``population_y_pred_sd``
        is not ``None``.
    """
    assert len(y_true_observed) == len(y_pred)

    # all of this is just to make sure users aren't passing the population
    # standard deviation and not population mean for either true or predicted
    y_true_observed_population_params = [
        population_y_true_observed_mn,
        population_y_true_observed_sd,
    ]
    y_pred_population_params = [population_y_pred_mn, population_y_pred_sd]

    # exactly one `None` in a (mean, sd) pair means only one of the two
    # population parameters was supplied, which is an error
    if len([param for param in y_true_observed_population_params if param is None]) == 1:
        raise ValueError(
            "You must pass both `population_y_true_observed_mn` and "
            "`population_y_true_observed_sd` or neither."
        )
    if len([param for param in y_pred_population_params if param is None]) == 1:
        raise ValueError(
            "You must pass both `population_y_pred_mn` and " "`population_y_pred_sd` or neither."
        )

    warning_msg = (
        "You did not pass population mean and std. for `{}`; "
        "thus, the calculated z-scores will be zero."
    )

    # if the population means and standard deviations were not provided, calculate from the data
    # We only check for mean since the function requires
    # both of these to be set or both to be None
    if population_y_true_observed_mn is None:
        warnings.warn(warning_msg.format("y_true_observed"))
        (population_y_true_observed_sd, population_y_true_observed_mn) = (
            np.std(y_true_observed, ddof=ddof),
            np.mean(y_true_observed),
        )
    if population_y_pred_mn is None:
        warnings.warn(warning_msg.format("y_pred"))
        (population_y_pred_sd, population_y_pred_mn) = (
            np.std(y_pred, ddof=ddof),
            np.mean(y_pred),
        )

    # if any of the standard deviations equal zero
    # raise a warning and return None.
    # We use np.isclose since sometimes sd for float
    # values is a value very close to 0.
    # We use the same tolerance as used for identifying
    # features with zero standard deviation
    if np.isclose(population_y_pred_sd, 0, atol=1e-07) or np.isclose(
        population_y_true_observed_sd, 0, atol=1e-07
    ):
        warnings.warn(
            "Population standard deviations for the computation of "
            "DSM is zero. No value will be computed."
        )
        return None

    # calculate the z-scores for observed and predicted
    y_true_observed_subgroup_z = (
        y_true_observed - population_y_true_observed_mn
    ) / population_y_true_observed_sd
    y_pred_subgroup_z = (y_pred - population_y_pred_mn) / population_y_pred_sd

    # calculate the DSM, given the z-scores for observed and predicted
    difference_of_std_means = np.mean(y_pred_subgroup_z - y_true_observed_subgroup_z)
    return difference_of_std_means
def quadratic_weighted_kappa(
    y_true_observed: np.ndarray, y_pred: np.ndarray, ddof: int = 0
) -> float:
    r"""
    Calculate quadratic-weighted kappa for both discrete and continuous values.

    The formula to compute quadratic-weighted kappa for continuous values
    was developed at ETS by Shelby Haberman.
    See `Haberman (2019) <https://eric.ed.gov/?q=Measures+of+Agreement+Versus
    +Measures+of+Prediction+Accuracy&id=EJ1238497>`_
    for the full derivation. The discrete case is simply treated as a
    special case of the continuous one.

    The formula is as follows:

    :math:`QWK=\displaystyle\frac{2*Cov(M,H)}{Var(H)+Var(M)+(\bar{M}-\bar{H})^2}`, where

    - :math:`Cov` - covariance with normalization by :math:`N` (total number of observations)
    - :math:`H` - the human score
    - :math:`M` - the system score
    - :math:`\bar{H}` - mean of :math:`H`
    - :math:`\bar{M}` - mean of :math:`M`
    - :math:`Var(X)` - variance of X

    Parameters
    ----------
    y_true_observed : numpy.ndarray
        The observed scores.
    y_pred : numpy.ndarray
        The predicted scores.
    ddof : int
        Means Delta Degrees of Freedom. The divisor used in
        calculations is N - ddof, where N represents the
        number of elements. When ddof is set to zero, the results
        for discrete case match those from the standard implementations.
        Defaults to 0.

    Returns
    -------
    kappa : float
        The quadratic weighted kappa.

    Raises
    ------
    AssertionError
        If the number of elements in ``y_true_observed`` is not equal
        to the number of elements in ``y_pred``.
    """
    assert len(y_true_observed) == len(y_pred)

    # compute the variances and means of both sets of scores
    y_true_observed_var, y_true_observed_avg = (
        np.var(y_true_observed, ddof=ddof),
        np.mean(y_true_observed),
    )
    y_pred_var, y_pred_avg = (np.var(y_pred, ddof=ddof), np.mean(y_pred))

    # plug the values into Haberman's formula above
    numerator = 2 * np.cov(y_true_observed, y_pred, ddof=ddof)[0][1]
    denominator = y_true_observed_var + y_pred_var + (y_true_observed_avg - y_pred_avg) ** 2
    kappa = numerator / denominator
    return kappa