#!/usr/bin/env python
"""
Run an rsmtool experiment.
:author: Jeremy Biggs (jbiggs@ets.org)
:author: Anastassia Loukina (aloukina@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)
:organization: ETS
"""
import logging
import sys
from os import listdir, makedirs
from os.path import abspath, exists, join
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from wandb.sdk.lib import RunDisabled
from wandb.wandb_run import Run
from rsmtool.container import DatasetDict
from .analyzer import Analyzer
from .configuration_parser import Configuration, configure
from .modeler import Modeler
from .preprocessor import FeaturePreprocessor
from .reader import DataReader
from .reporter import Reporter
from .utils.commandline import ConfigurationGenerator, setup_rsmcmd_parser
from .utils.constants import VALID_PARSER_SUBCOMMANDS
from .utils.logging import LogFormatter
from .utils.wandb import init_wandb_run, log_configuration_to_wandb
from .writer import DataWriter
def run_experiment(
    config_file_or_obj_or_dict: Union[str, Configuration, Dict[str, Any], Path],
    output_dir: str,
    overwrite_output: bool = False,
    logger: Optional[logging.Logger] = None,
    wandb_run: Union[Run, RunDisabled, None] = None,
) -> None:
    """
    Run an rsmtool experiment using the given configuration.

    Run rsmtool experiment using the given configuration file, object, or
    dictionary. All outputs are generated under ``output_dir``. If
    ``overwrite_output`` is ``True``, any existing output in ``output_dir``
    is overwritten.

    Parameters
    ----------
    config_file_or_obj_or_dict : Union[str, Configuration, Dict[str, Any], Path]
        Path to the experiment configuration file as either a string
        or as a ``pathlib.Path`` object. Users can also pass a
        ``Configuration`` object that is in memory or a Python dictionary
        with keys corresponding to fields in the configuration file. Given a
        configuration file, any relative paths in the configuration file
        will be interpreted relative to the location of the file. Given a
        ``Configuration`` object, relative paths will be interpreted
        relative to the ``configdir`` attribute, that _must_ be set. Given
        a dictionary, the reference path is set to the current directory.
    output_dir : str
        Path to the experiment output directory.
    overwrite_output : bool
        If ``True``, overwrite any existing output under ``output_dir``.
        Defaults to ``False``.
    logger : Optional[logging.Logger]
        A Logger object. If ``None`` is passed, get logger from ``__name__``.
        Defaults to ``None``.
    wandb_run : Union[wandb.wandb_run.Run, wandb.sdk.lib.RunDisabled, None]
        A wandb run object that will be used to log artifacts and tables.
        If ``None`` is passed, a new wandb run will be initialized if
        wandb is enabled in the configuration.
        Defaults to ``None``.

    Raises
    ------
    FileNotFoundError
        If any of the files contained in ``config_file_or_obj_or_dict`` cannot
        be located.
    IOError
        If ``output_dir`` already contains the output of a previous experiment
        and ``overwrite_output`` is ``False``.
    ValueError
        If the current configuration specifies a non-linear model but
        ``output_dir`` already contains the output of a previous
        experiment that used a linear model with the same experiment ID.
    """
    logger = logger if logger else logging.getLogger(__name__)

    # create the 'output' and the 'figure' sub-directories
    # where all the experiment output such as the CSV files
    # and the box plots will be saved

    # Get absolute paths to output directories
    csvdir = abspath(join(output_dir, "output"))
    figdir = abspath(join(output_dir, "figure"))
    reportdir = abspath(join(output_dir, "report"))
    featuredir = abspath(join(output_dir, "feature"))

    # Make directories, if necessary
    # NOTE(review): ``featuredir`` is deliberately not created here;
    # presumably ``DataWriter.write_feature_csv()`` (called below) creates
    # it on demand — confirm.
    makedirs(csvdir, exist_ok=True)
    makedirs(figdir, exist_ok=True)
    makedirs(reportdir, exist_ok=True)

    # Raise an error if the specified output directory
    # already contains a non-empty `output` directory, unless
    # `overwrite_output` was specified, in which case we assume
    # that the user knows what she is doing and simply
    # output a warning saying that the report might
    # not be correct.
    non_empty_csvdir = exists(csvdir) and listdir(csvdir)
    if non_empty_csvdir:
        if not overwrite_output:
            raise IOError(f"'{output_dir}' already contains a non-empty 'output' directory.")
        else:
            logger.warning(
                f"{output_dir} already contains a non-empty 'output' directory. "
                f"The generated report might contain unexpected information from "
                f"a previous experiment."
            )

    # load the experiment configuration from whichever of the three
    # accepted forms (path, Configuration, dict) was passed in
    configuration = configure("rsmtool", config_file_or_obj_or_dict)

    logger.info("Saving configuration file.")
    configuration.save(output_dir)

    # If wandb logging is enabled, and wandb_run is not provided,
    # start a wandb run and log configuration
    if wandb_run is None:
        wandb_run = init_wandb_run(configuration)
    log_configuration_to_wandb(wandb_run, configuration)

    # Get output format (defaults to CSV)
    file_format = configuration.get("file_format", "csv")

    # Get DataWriter object
    writer = DataWriter(configuration["experiment_id"], configuration.context, wandb_run)

    # Get the paths and names for the DataReader
    (file_names, file_paths_org) = configuration.get_names_and_paths(
        ["train_file", "test_file", "features", "feature_subset_file"],
        ["train", "test", "feature_specs", "feature_subset_specs"],
    )
    # resolve relative paths against the configuration's reference directory
    file_paths = DataReader.locate_files(file_paths_org, configuration.configdir)

    # if there are any missing files after trying to locate
    # all expected files, raise an error
    if None in file_paths:
        missing_file_paths = [
            file_paths_org[idx] for idx, path in enumerate(file_paths) if path is None
        ]
        raise FileNotFoundError(f"The following files were not found: {repr(missing_file_paths)}")

    # Use the default converter for both train and test
    converters = {
        "train": configuration.get_default_converter(),
        "test": configuration.get_default_converter(),
    }

    logger.info("Reading in all data from files.")

    # Initialize the reader
    reader = DataReader(file_paths, file_names, converters)
    data_container = reader.read()

    logger.info("Preprocessing all features.")

    # Initialize the processor
    processor = FeaturePreprocessor(logger=logger)
    (processed_config, processed_container) = processor.process_data(configuration, data_container)

    # Rename certain frames with more descriptive names
    # for writing out experiment files
    rename_dict = {
        "train_excluded": "train_excluded_responses",
        "test_excluded": "test_excluded_responses",
        "train_length": "train_response_lengths",
        "train_flagged": "train_responses_with_excluded_flags",
        "test_flagged": "test_responses_with_excluded_flags",
    }

    logger.info("Saving training and test set data to disk.")

    # Write out the raw, preprocessed, and excluded/flagged frames
    writer.write_experiment_output(
        csvdir,
        processed_container,
        [
            "train_features",
            "test_features",
            "train_metadata",
            "test_metadata",
            "train_other_columns",
            "test_other_columns",
            "train_preprocessed_features",
            "test_preprocessed_features",
            "train_excluded",
            "test_excluded",
            "train_length",
            "test_human_scores",
            "train_flagged",
            "test_flagged",
        ],
        rename_dict,
        file_format=file_format,
    )

    # Initialize the analyzer
    analyzer = Analyzer(logger=logger)
    (_, analyzed_container) = analyzer.run_data_composition_analyses_for_rsmtool(
        processed_container, processed_config
    )

    # Write out files
    writer.write_experiment_output(csvdir, analyzed_container, file_format=file_format)

    logger.info(f"Training {processed_config['model_name']} model.")

    # Initialize modeler
    modeler = Modeler(logger=logger)
    modeler.train(processed_config, processed_container, csvdir, file_format)

    # Identify the features used by the model
    selected_features = modeler.get_feature_names()
    assert selected_features is not None

    # Add selected features to processed configuration
    processed_config["selected_features"] = selected_features

    # Write out files
    writer.write_feature_csv(
        featuredir, processed_container, selected_features, file_format=file_format
    )

    features_data_container = processed_container.copy()

    # Get selected feature info, and write out to file
    df_feature_info = features_data_container["feature_info"].copy()
    df_selected_feature_info = df_feature_info[df_feature_info["feature"].isin(selected_features)]
    selected_feature_dataset_dict = DatasetDict(
        {
            "name": "selected_feature_info",
            "frame": df_selected_feature_info,
        }
    )
    features_data_container.add_dataset(selected_feature_dataset_dict, update=True)

    writer.write_experiment_output(
        csvdir,
        features_data_container,
        dataframe_names=["selected_feature_info"],
        new_names_dict={"selected_feature_info": "feature"},
        file_format=file_format,
    )

    logger.info("Running analyses on training set.")

    (_, train_analyzed_container) = analyzer.run_training_analyses(
        processed_container, processed_config
    )

    # Write out files
    writer.write_experiment_output(
        csvdir, train_analyzed_container, reset_index=True, file_format=file_format
    )

    # Use only selected features for predictions
    columns_for_prediction = (
        ["spkitemid", "sc1"] + selected_features if selected_features else ["spkitemid", "sc1"]
    )
    train_for_prediction = processed_container["train_preprocessed_features"][
        columns_for_prediction
    ]
    test_for_prediction = processed_container["test_preprocessed_features"][columns_for_prediction]

    logged_str = "Generating training and test set predictions"
    logged_str += " (expected scores)." if configuration["predict_expected_scores"] else "."
    logger.info(logged_str)

    (pred_config, pred_data_container) = modeler.predict_train_and_test(
        train_for_prediction, test_for_prediction, processed_config
    )

    # Save modeler instance: attach the feature info and trim settings
    # so they are persisted alongside the trained model
    modeler.feature_info = processed_container["feature_info"].copy()
    modeler.feature_info.set_index("feature", inplace=True)

    spec_trim_min, spec_trim_max, spec_trim_tolerance = configuration.get_trim_min_max_tolerance()
    # NOTE(review): these truthiness checks skip values equal to 0 (or 0.0);
    # presumably intentional for trim bounds — confirm whether
    # ``is not None`` was meant instead.
    if spec_trim_min:
        modeler.trim_min = spec_trim_min
    if spec_trim_max:
        modeler.trim_max = spec_trim_max
    if spec_trim_tolerance:
        modeler.trim_tolerance = spec_trim_tolerance

    # copy summary statistics from the prediction configuration onto
    # corresponding modeler attributes before saving the model
    pred_config_dict = pred_config.to_dict()
    for key, attr_name in [
        ("train_predictions_mean", "train_predictions_mean"),
        ("train_predictions_sd", "train_predictions_sd"),
        ("human_labels_mean", "h1_mean"),
        ("human_labels_sd", "h1_sd"),
    ]:
        setattr(modeler, attr_name, pred_config_dict[key])

    logger.info("Saving model.")
    modeler.save(join(csvdir, f"{configuration['experiment_id']}.model"))

    # Write out files
    writer.write_experiment_output(
        csvdir,
        pred_data_container,
        new_names_dict={"pred_test": "pred_processed"},
        file_format=file_format,
    )

    original_coef_file = join(csvdir, f"{pred_config['experiment_id']}_coefficients.{file_format}")

    # If coefficients file exists, then try to generate the scaled
    # coefficients and save them to a file
    if exists(original_coef_file):
        logger.info("Scaling the coefficients and saving them to disk")
        try:
            # scale coefficients, and return DataContainer w/ scaled coefficients
            scaled_data_container = modeler.scale_coefficients(pred_config)
        # raise an error if the coefficient file exists but the
        # coefficients are not available for the current model
        # which can happen if the user is re-running the same experiment
        # with the same ID but with a non-linear model whereas the previous
        # run of the same ID was with a linear model and the user has not
        # cleared the directory
        except RuntimeError:
            raise ValueError(
                "It appears you previously ran an experiment with the "
                "same ID using a linear model and saved its output to "
                "the same directory. That output is interfering with "
                "the current experiment. Either clear the contents "
                "of the output directory or re-run the current "
                "experiment using a different experiment ID."
            )
        else:
            # Write out scaled coefficients to disk
            writer.write_experiment_output(csvdir, scaled_data_container, file_format=file_format)

    # Add processed data_container frames to pred_data_container
    new_pred_data_container = pred_data_container + processed_container

    logger.info("Running prediction analyses.")
    (
        pred_analysis_config,
        pred_analysis_data_container,
    ) = analyzer.run_prediction_analyses(new_pred_data_container, pred_config, wandb_run)

    # Write out files
    writer.write_experiment_output(
        csvdir, pred_analysis_data_container, reset_index=True, file_format=file_format
    )

    # Initialize reporter
    reporter = Reporter(logger=logger, wandb_run=wandb_run)

    # generate the report
    logger.info("Starting report generation.")
    reporter.create_report(pred_analysis_config, csvdir, figdir)
def main(argv: Optional[List[str]] = None) -> None:
    """
    Entry point for the ``rsmtool`` command-line interface.

    Parameters
    ----------
    argv : Optional[List[str]]
        List of arguments to use instead of ``sys.argv``.
        Defaults to ``None``.
    """
    # fall back to the process arguments when none were given explicitly
    if argv is None:
        argv = sys.argv[1:]

    # configure logging: one shared formatter feeding two handlers — a
    # stdout handler for the "run" sub-command and a stderr handler for
    # "generate", so that warnings never end up inside the generated
    # configuration file
    log_formatter = LogFormatter()
    handler_for_stdout = logging.StreamHandler(sys.stdout)
    handler_for_stdout.setFormatter(log_formatter)
    handler_for_stderr = logging.StreamHandler(sys.stderr)
    handler_for_stderr.setFormatter(log_formatter)
    logging.root.setLevel(logging.INFO)
    logger = logging.getLogger(__name__)

    # build the rsmtool argument parser via the shared helper
    parser = setup_rsmcmd_parser(
        "rsmtool",
        uses_output_directory=True,
        allows_overwriting=True,
        uses_subgroups=True,
    )

    # with no arguments at all, fall back to showing the help message
    if not argv:
        argv.append("-h")

    # a leading token that is neither a known sub-command nor a known
    # option means an old-style invocation: route it to "run" so legacy
    # command lines keep working unchanged
    recognized_first_tokens = VALID_PARSER_SUBCOMMANDS + [
        "-h",
        "--help",
        "-V",
        "--version",
    ]
    if argv[0] in recognized_first_tokens:
        args_to_pass = argv
    else:
        args_to_pass = ["run"] + argv

    args = parser.parse_args(args=args_to_pass)

    # dispatch on the chosen sub-command
    if args.subcommand == "run":
        # "run" logs to stdout
        logging.root.addHandler(handler_for_stdout)
        logger.info(f"Output directory: {args.output_dir}")
        run_experiment(
            abspath(args.config_file),
            abspath(args.output_dir),
            overwrite_output=args.force_write,
        )
    else:
        # "generate" logs to stderr and prints the example
        # configuration on stdout (or to the requested output file)
        logging.root.addHandler(handler_for_stderr)
        generator = ConfigurationGenerator(
            "rsmtool",
            as_string=True,
            suppress_warnings=args.quiet,
            use_subgroups=args.subgroups,
        )
        if args.interactive:
            configuration = generator.interact(
                output_file_name=args.output_file.name if args.output_file else None
            )
        else:
            configuration = generator.generate()
        print(configuration, file=args.output_file)
# Invoke the CLI entry point when this module is executed as a script.
if __name__ == "__main__":
    main()