#!/usr/bin/env python
"""
Generate predictions on new data from rsmtool models.
:author: Jeremy Biggs (jbiggs@ets.org)
:author: Anastassia Loukina (aloukina@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)
:organization: ETS
"""
import glob
import logging
import os
import sys
from os.path import abspath, basename, dirname, exists, join, normpath, split, splitext
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from wandb.sdk.lib import RunDisabled
from wandb.wandb_run import Run
from .configuration_parser import Configuration, configure
from .modeler import Modeler
from .preprocessor import FeaturePreprocessor
from .reader import DataReader
from .utils.commandline import CmdOption, ConfigurationGenerator, setup_rsmcmd_parser
from .utils.constants import VALID_PARSER_SUBCOMMANDS
from .utils.logging import LogFormatter
from .utils.wandb import init_wandb_run, log_configuration_to_wandb
from .writer import DataWriter
def fast_predict(
    input_features: Dict[str, float],
    modeler: Modeler,
    df_feature_info: Optional[pd.DataFrame] = None,
    trim: bool = False,
    trim_min: Optional[float] = None,
    trim_max: Optional[float] = None,
    trim_tolerance: Optional[float] = None,
    scale: bool = False,
    train_predictions_mean: Optional[float] = None,
    train_predictions_sd: Optional[float] = None,
    h1_mean: Optional[float] = None,
    h1_sd: Optional[float] = None,
    logger: Optional[logging.Logger] = None,
) -> Dict[str, float]:
    """
    Compute predictions for a single instance against given model.

    The main difference between this function and the ``compute_and_save_predictions()``
    function is that the former is meant for batch prediction and reads
    all its inputs from disk and writes its outputs to disk. This function,
    however, is meant for real-time inference rather than batch. To this end,
    it operates *entirely* in memory. Note that there is still a bit of overlap
    between the two computation paths since we want to use the RSMTool API
    as much as possible.

    This function should only be used when the goal is to generate predictions
    using RSMTool models in production. The user should read everything from disk
    in a separate thread/function and pass the inputs to this function.

    Note that this function only computes regular predictions, not expected
    scores.

    Parameters
    ----------
    input_features : Dict[str, float]
        A dictionary containing the features for the instance for which to
        generate the model predictions. The keys should be names of the
        features on which the model was trained and the values should
        be the *raw* feature values.
    modeler : Modeler
        The RSMTool ``Modeler`` object from which the predictions are to be
        generated. This object should be created from the already existing
        ``.model`` file in the "output" directory of the previously run
        RSMTool experiment.
    df_feature_info : Optional[pandas.DataFrame]
        If ``None``, this function will try to extract this information
        from ``modeler``.
        A DataFrame containing the information regarding the model features.
        The index of the dataframe should be the names of the features and
        the columns should be:

        - "sign" : 1 or -1. Indicates whether the feature value needs to
          be multiplied by -1.
        - "transform" : :ref:`transformation <select_transformations_rsmtool>`
          that needs to be applied to this feature.
        - "train_mean", "train_sd" : mean and standard deviation for outlier
          truncation.
        - "train_transformed_mean", "train_transformed_sd" : mean and standard
          deviation for computing z-scores.

        This dataframe should be read from the "feature.csv" file under the
        "output" directory of the previously run RSMTool experiment.
        Defaults to ``None``.
    trim : bool
        Whether to trim the predictions. If ``True``, ``trim_min`` and
        ``trim_max`` must be specified or be available as attributes of
        the ``modeler``.
        Defaults to ``False``.
    trim_min : Optional[float]
        The lowest possible score that the machine should predict.
        If ``None``, this function will try to extract this value from
        ``modeler``. If ``None``, no such attribute exists, and
        ``trim=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    trim_max : Optional[float]
        The highest possible score that the machine should predict.
        If ``None``, this function will try to extract this value from
        ``modeler``. If ``None``, no such attribute exists, and
        ``trim=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    trim_tolerance : Optional[float]
        The single numeric value that will be used to pad the trimming range
        specified in ``trim_min`` and ``trim_max``. If ``None``, this function
        will try to extract this value from ``modeler``. If no such attribute
        can be found, the value will default to ``0.4998``.
        Defaults to ``None``.
    scale : bool
        Whether to scale predictions. If ``True``, all of
        ``train_predictions_mean``, ``train_predictions_sd``, ``h1_mean``,
        and ``h1_sd`` must be specified or be available as attributes of
        ``modeler``.
        Defaults to ``False``.
    train_predictions_mean : Optional[float]
        The mean of the predictions on the training set used to re-scale the
        predictions. May be read from the "postprocessing_params.csv" file
        under the "output" directory of the RSMTool experiment used to train
        the model. If ``None``, this function will try to extract this value
        from ``modeler``. If ``None``, no such attribute exists, and
        ``scale=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    train_predictions_sd : Optional[float]
        The standard deviation of the predictions on the training set used to
        re-scale the predictions. May be read from the "postprocessing_params.csv"
        file under the "output" directory of the RSMTool experiment used to train
        the model. If ``None``, this function will try to extract this value from
        ``modeler``. If ``None``, no such attribute exists, and
        ``scale=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    h1_mean : Optional[float]
        The mean of the human scores in the training set also used to re-scale
        the predictions. May be read from the "postprocessing_params.csv" file
        under the "output" directory of the RSMTool experiment used to train
        the model. If ``None``, this function will try to extract this value from
        ``modeler``. If ``None``, no such attribute exists, and
        ``scale=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    h1_sd : Optional[float]
        The standard deviation of the human scores in the training set used to
        re-scale the predictions. May be read from the "postprocessing_params.csv"
        file under the "output" directory of the RSMTool experiment used to train
        the model. If ``None``, this function will try to extract this value from
        ``modeler``. If ``None``, no such attribute exists, and
        ``scale=True``, a ``ValueError`` will be raised.
        Defaults to ``None``.
    logger : Optional[logging.Logger]
        A Logger object. If ``None`` is passed, get logger from ``__name__``.
        Defaults to ``None``.

    Returns
    -------
    Dict[str, float]
        A dictionary containing the raw, scaled, trimmed, and rounded
        predictions for the input features. It always contains the
        "raw" key and may contain the following additional keys depending
        on the availability of the various optional arguments:
        "raw_trim", "raw_trim_round", "scale", "scale_trim",
        and "scale_trim_round".

    Raises
    ------
    ValueError
        If ``input_features`` contains any non-numeric features
    ValueError
        If trimming/scaling is turned on but related parameters are either
        not specified or cannot be found as attributes in ``modeler``/have
        a value of ``None``
    ValueError
        If trimming/scaling-related parameters are specified but
        trimming/scaling is turned off
    ValueError
        If feature information is either not specified or cannot be found
        as an attribute in ``modeler``/has a value of ``None``
    """
    # initialize a logger if none provided
    logger = logger if logger else logging.getLogger(__name__)

    # instantiate a feature preprocessor
    preprocessor = FeaturePreprocessor(logger=logger)

    # convert the given features to a data frame and add the "spkitemid" column
    # since the preprocessing/prediction pipeline expects one
    df_input_features = pd.DataFrame([input_features])
    df_input_features["spkitemid"] = "RESPONSE"

    feature_info_error_message = (
        "'df_feature_info' must be specified if it not found as an attribute in the "
        "modeler object with a value that is not ``None``"
    )
    # fall back to the feature information stored on the modeler, if any
    if df_feature_info is None:
        try:
            stored_feature_info = modeler.feature_info
            assert stored_feature_info is not None
        except (AttributeError, AssertionError):
            raise ValueError(feature_info_error_message) from None
        else:
            df_feature_info = stored_feature_info

    # preprocess the input features so that they match what the model expects
    try:
        df_processed_features, _ = preprocessor.preprocess_new_data(
            df_input_features, df_feature_info
        )
    except ValueError:
        raise ValueError("Input features must not contain non-numeric values.") from None

    # now compute the raw prediction for the given features
    df_predictions = modeler.predict(df_processed_features)

    # compute scaled predictions if requested
    if scale:
        scale_args_error_message = (
            "When 'scale' is set to True and no explicit values are provided, the "
            "'train_predictions_mean', 'train_predictions_sd', 'h1_mean', and 'h1_sd' "
            "modeler attributes must be present and not ``None``."
        )
        # any scaling parameter not passed in explicitly must come from the
        # modeler; a missing attribute or a ``None`` value is an error
        try:
            if train_predictions_mean is None:
                train_predictions_mean = modeler.train_predictions_mean
            if train_predictions_sd is None:
                train_predictions_sd = modeler.train_predictions_sd
            if h1_mean is None:
                h1_mean = modeler.h1_mean
            if h1_sd is None:
                h1_sd = modeler.h1_sd
            if any(
                arg is None
                for arg in [train_predictions_mean, train_predictions_sd, h1_mean, h1_sd]
            ):
                raise ValueError(scale_args_error_message) from None
        except AttributeError:
            raise ValueError(scale_args_error_message) from None

        # z-score the raw prediction and map it onto the human-score scale
        df_predictions["scale"] = (
            (df_predictions["raw"] - train_predictions_mean) / train_predictions_sd
        ) * h1_sd + h1_mean
    elif any(
        arg is not None for arg in [train_predictions_mean, train_predictions_sd, h1_mean, h1_sd]
    ):
        raise ValueError(
            "train_predictions_mean/train_predictions_sd/h1_mean/h1_sd cannot be "
            "specified when scale=False"
        ) from None

    # drop the spkitemid column since it's not needed from this point onwards
    df_predictions.drop("spkitemid", axis="columns", inplace=True)

    # trim both raw and scaled predictions if requested
    if trim:
        trim_args_error_message = (
            "When 'trim' is set to ``True`` and no explicit values are provided, the "
            "'trim_min' and 'trim_max' modeler attributes must be present and not "
            "``None``."
        )
        # get the value of the trim tolerance if not provided
        default_trim_tolerance = 0.4998
        if trim_tolerance is None:
            try:
                trim_tolerance = modeler.trim_tolerance
            # if the attribute is not present, use default value
            except AttributeError:
                trim_tolerance = default_trim_tolerance
            # if it is present but has a value of None, also use default value
            else:
                if trim_tolerance is None:
                    trim_tolerance = default_trim_tolerance
        # get the values of the trim min and max if not provided
        try:
            if trim_min is None:
                trim_min = modeler.trim_min
            if trim_max is None:
                trim_max = modeler.trim_max
        # raise an error if the attributes are not present
        except AttributeError:
            raise ValueError(trim_args_error_message) from None
        # if they are present but have a value of None, also raise an error
        else:
            if trim_min is None or trim_max is None:
                raise ValueError(trim_args_error_message) from None

        # add "<col>_trim" and integer "<col>_trim_round" variants for every
        # prediction column computed so far ("raw" and, possibly, "scale")
        for column in df_predictions.columns:
            df_predictions[f"{column}_trim"] = preprocessor.trim(
                df_predictions[column], trim_min, trim_max, trim_tolerance
            )
            df_predictions[f"{column}_trim_round"] = np.rint(
                df_predictions[f"{column}_trim"]
            ).astype("int64")
    elif any(arg is not None for arg in [trim_tolerance, trim_min, trim_max]):
        raise ValueError(
            "trim_tolerance/trim_min/trim_max cannot be specified when trim=False"
        ) from None

    # return the predictions as a dictionary; there is exactly one row since
    # we only ever process a single instance
    return df_predictions.to_dict(orient="records")[0]
def compute_and_save_predictions(
    config_file_or_obj_or_dict: Union[str, Configuration, Dict[str, Any], Path],
    output_file: str,
    feats_file: Optional[str] = None,
    logger: Optional[logging.Logger] = None,
    wandb_run: Union[Run, RunDisabled, None] = None,
) -> None:
    """
    Run rsmpredict using the given configuration.

    Generate predictions using given configuration file, object, or
    dictionary. Predictions are saved in ``output_file``. Optionally,
    pre-processed feature values are saved in ``feats_file``,
    if specified.

    Parameters
    ----------
    config_file_or_obj_or_dict : Union[str, Configuration, Dict[str, Any], Path]
        Path to the experiment configuration file either as a string
        or as a ``pathlib.Path`` object. Users can also pass a
        ``Configuration`` object that is in memory or a Python dictionary
        with keys corresponding to fields in the configuration file. Given a
        configuration file, any relative paths in the configuration file
        will be interpreted relative to the location of the file. Given a
        ``Configuration`` object, relative paths will be interpreted
        relative to the ``configdir`` attribute, that _must_ be set. Given
        a dictionary, the reference path is set to the current directory.
    output_file : str
        The path to the output file.
    feats_file : Optional[str]
        Path to the output file for saving preprocessed feature values.
    logger : Optional[logging.Logger]
        A Logger object. If ``None`` is passed, get logger from ``__name__``.
        Defaults to ``None``.
    wandb_run : Union[wandb.wandb_run.Run, wandb.sdk.lib.RunDisabled, None]
        A wandb run object that will be used to log artifacts and tables.
        If ``None`` is passed, a new wandb run will be initialized if
        wandb is enabled in the configuration.
        Defaults to ``None``.

    Raises
    ------
    FileNotFoundError
        If any of the files contained in ``config_file_or_obj_or_dict``
        cannot be located.
    FileNotFoundError
        If ``experiment_dir`` does not exist.
    FileNotFoundError
        If ``experiment_dir`` does not contain the required output
        needed from an rsmtool experiment.
    RuntimeError
        If the name of the output file does not end in
        ".csv", ".tsv", or ".xlsx".
    """
    logger = logger if logger else logging.getLogger(__name__)

    configuration = configure("rsmpredict", config_file_or_obj_or_dict)

    # get the experiment ID
    experiment_id = configuration["experiment_id"]

    # get the output format
    file_format = configuration.get("file_format", "csv")

    # if wandb logging is enabled, and wandb_run is not provided,
    # start a wandb run and log configuration
    if wandb_run is None:
        wandb_run = init_wandb_run(configuration)
    log_configuration_to_wandb(wandb_run, configuration)

    # get DataWriter object
    writer = DataWriter(experiment_id, configuration.context, wandb_run)

    # get the input file containing the feature values
    # for which we want to generate the predictions
    input_features_file = DataReader.locate_files(
        configuration["input_features_file"], configuration.configdir
    )[0]
    if not input_features_file:
        raise FileNotFoundError(f"Input file {configuration['input_features_file']} does not exist")

    experiment_dir = DataReader.locate_files(
        configuration["experiment_dir"], configuration.configdir
    )[0]
    if not experiment_dir:
        raise FileNotFoundError(f"The directory {configuration['experiment_dir']} does not exist.")
    else:
        experiment_output_dir = normpath(join(experiment_dir, "output"))
        if not exists(experiment_output_dir):
            raise FileNotFoundError(
                f"The directory {experiment_dir} does not contain "
                f"the output of an rsmtool experiment."
            )

    # find all the .model files in the experiment output directory
    model_files = glob.glob(join(experiment_output_dir, "*.model"))
    if not model_files:
        raise FileNotFoundError(
            f"The directory {experiment_output_dir} does not contain any rsmtool models."
        )

    experiment_ids = [splitext(basename(mf))[0] for mf in model_files]
    if experiment_id not in experiment_ids:
        raise FileNotFoundError(
            f"{experiment_output_dir} does not contain a model "
            f'for the experiment "{experiment_id}". The following '
            f"experiments are contained in this directory: {experiment_ids}"
        )

    # check that the directory contains other required files
    required_file_types = ["feature", "postprocessing_params"]
    for file_type in required_file_types:
        expected_file_name = f"{experiment_id}_{file_type}.csv"
        if not exists(join(experiment_output_dir, expected_file_name)):
            raise FileNotFoundError(
                f"{experiment_output_dir} does not contain the "
                f"required file {expected_file_name} that was "
                f"generated during the original model training."
            )

    logger.info("Reading input files.")

    feature_info = join(experiment_output_dir, f"{experiment_id}_feature.csv")
    post_processing = join(experiment_output_dir, f"{experiment_id}_postprocessing_params.csv")

    file_paths = [input_features_file, feature_info, post_processing]
    file_names = ["input_features", "feature_info", "postprocessing_params"]

    converters = {"input_features": configuration.get_default_converter()}

    # initialize the reader; "feature_info" uses its first column as the index
    reader = DataReader(file_paths, file_names, converters)
    data_container = reader.read(kwargs_dict={"feature_info": {"index_col": 0}})

    # load the Modeler to generate the predictions
    model = Modeler.load_from_file(join(experiment_output_dir, f"{experiment_id}.model"))

    # add the model to the configuration object
    configuration["model"] = model

    # initialize the processor
    processor = FeaturePreprocessor(logger=logger)

    (_, processed_container) = processor.process_data(
        configuration, data_container, context="rsmpredict"
    )

    # save the pre-processed features to disk if we were asked to
    if feats_file is not None:
        logger.info(f"Saving pre-processed feature values to {feats_file}")

        feats_dir = dirname(feats_file)

        # create any directories needed for the output file
        os.makedirs(feats_dir, exist_ok=True)

        _, feats_filename = split(feats_file)
        feats_filename, _ = splitext(feats_filename)

        # write out files
        writer.write_experiment_output(
            feats_dir,
            processed_container,
            include_experiment_id=False,
            dataframe_names=["features_processed"],
            new_names_dict={"features_processed": feats_filename},
            file_format=file_format,
        )

    # if the output file has a recognized extension, write a file with that
    # base name; otherwise treat ``output_file`` as a directory and use a
    # default file name
    if output_file.lower().endswith(".csv") or output_file.lower().endswith(".xlsx"):
        output_dir = dirname(output_file)
        _, filename = split(output_file)
        filename, _ = splitext(filename)
    else:
        output_dir = output_file
        filename = "predictions_with_metadata"

    # create any directories needed for the output file
    os.makedirs(output_dir, exist_ok=True)

    # save the predictions to disk
    logger.info("Saving predictions.")

    # write out files
    writer.write_experiment_output(
        output_dir,
        processed_container,
        include_experiment_id=False,
        dataframe_names=["predictions_with_metadata"],
        new_names_dict={"predictions_with_metadata": filename},
        file_format=file_format,
    )

    # save excluded responses to disk, if there are any
    if not processed_container["excluded"].empty:
        # log the actual output path, including the configured file format
        logger.info(
            f"Saving excluded responses to "
            f"{join(output_dir, f'{experiment_id}_excluded_responses.{file_format}')}"
        )

        # write out files
        writer.write_experiment_output(
            output_dir,
            processed_container,
            include_experiment_id=False,
            dataframe_names=["excluded"],
            new_names_dict={"excluded": f"{experiment_id}_excluded_responses"},
            file_format=file_format,
        )
def main(argv: Optional[List[str]] = None) -> None:
    """
    Entry point for the ``rsmpredict`` command-line tool.

    Parameters
    ----------
    argv : Optional[List[str]]
        List of arguments to use instead of ``sys.argv``.
        Defaults to ``None``.
    """
    # fall back to the process arguments when none were given
    argv = sys.argv[1:] if argv is None else argv

    # set up two handlers sharing a single formatter: "run" should log to
    # stdout whereas "generate" should log to stderr, so that warnings do
    # not end up inside a generated configuration file
    log_formatter = LogFormatter()

    handler_out = logging.StreamHandler(sys.stdout)
    handler_out.setFormatter(log_formatter)

    handler_err = logging.StreamHandler(sys.stderr)
    handler_err.setFormatter(log_formatter)

    # declare the options specific to rsmpredict via `CmdOption` namedtuples
    rsmpredict_options = [
        CmdOption(dest="output_file", help="output file where predictions will be saved."),
        CmdOption(
            dest="preproc_feats_file",
            help="if specified, the preprocessed features " "will be saved in this file",
            longname="features",
            required=False,
        ),
    ]

    # build the argument parser with the helper function
    parser = setup_rsmcmd_parser(
        "rsmpredict",
        uses_output_directory=False,
        extra_run_options=rsmpredict_options,
    )

    # with no arguments at all, just show the help message
    if not argv:
        argv.append("-h")

    # a first argument that is neither a known sub-command nor a help/version
    # flag is assumed to belong to the "run" sub-command; this keeps old-style
    # command-line invocations working without modification
    recognized_leading_args = VALID_PARSER_SUBCOMMANDS + [
        "-h",
        "--help",
        "-V",
        "--version",
    ]
    if argv[0] in recognized_leading_args:
        args_to_pass = argv
    else:
        args_to_pass = ["run"] + argv

    args = parser.parse_args(args=args_to_pass)

    if args.subcommand == "run":
        # log to stdout while running the experiment
        logging.root.addHandler(handler_out)

        preproc_feats_file = (
            abspath(args.preproc_feats_file) if args.preproc_feats_file else None
        )
        compute_and_save_predictions(
            abspath(args.config_file),
            abspath(args.output_file),
            feats_file=preproc_feats_file,
        )
    else:
        # log to stderr while generating an example configuration
        logging.root.addHandler(handler_err)

        # auto-generate an example configuration and print it to STDOUT
        generator = ConfigurationGenerator(
            "rsmpredict", as_string=True, suppress_warnings=args.quiet
        )
        if args.interactive:
            configuration = generator.interact(
                output_file_name=args.output_file.name if args.output_file else None
            )
        else:
            configuration = generator.generate()
        print(configuration, file=args.output_file)
# run the command-line entry point when this module is executed directly
if __name__ == "__main__":
    main()