Source code for rsmtool.rsmtool

#!/usr/bin/env python

"""
Run an rsmtool experiment.

:author: Jeremy Biggs (jbiggs@ets.org)
:author: Anastassia Loukina (aloukina@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)

:organization: ETS
"""

import logging
import sys
from os import listdir, makedirs
from os.path import abspath, exists, join
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from wandb.sdk.lib import RunDisabled
from wandb.wandb_run import Run

from rsmtool.container import DatasetDict

from .analyzer import Analyzer
from .configuration_parser import Configuration, configure
from .modeler import Modeler
from .preprocessor import FeaturePreprocessor
from .reader import DataReader
from .reporter import Reporter
from .utils.commandline import ConfigurationGenerator, setup_rsmcmd_parser
from .utils.constants import VALID_PARSER_SUBCOMMANDS
from .utils.logging import LogFormatter
from .utils.wandb import init_wandb_run, log_configuration_to_wandb
from .writer import DataWriter


[docs] def run_experiment( config_file_or_obj_or_dict: Union[str, Configuration, Dict[str, Any], Path], output_dir: str, overwrite_output: bool = False, logger: Optional[logging.Logger] = None, wandb_run: Union[Run, RunDisabled, None] = None, ) -> None: """ Run an rsmtool experiment using the given configuration. Run rsmtool experiment using the given configuration file, object, or dictionary. All outputs are generated under ``output_dir``. If ``overwrite_output`` is ``True``, any existing output in ``output_dir`` is overwritten. Parameters ---------- config_file_or_obj_or_dict : Union[str, Configuration, Dict[str, Any], Path] Path to the experiment configuration file as either a string or as a ``pathlib.Path`` object. Users can also pass a ``Configuration`` object that is in memory or a Python dictionary with keys corresponding to fields in the configuration file. Given a configuration file, any relative paths in the configuration file will be interpreted relative to the location of the file. Given a ``Configuration`` object, relative paths will be interpreted relative to the ``configdir`` attribute, that _must_ be set. Given a dictionary, the reference path is set to the current directory. output_dir : str Path to the experiment output directory. overwrite_output : bool If ``True``, overwrite any existing output under ``output_dir``. Defaults to ``False``. logger : Optional[logging.Logger] A Logger object. If ``None`` is passed, get logger from ``__name__``. Defaults to ``None``. wandb_run : Union[wandb.wandb_run.Run, wandb.sdk.lib.RunDisabled, None] A wandb run object that will be used to log artifacts and tables. If ``None`` is passed, a new wandb run will be initialized if wandb is enabled in the configuration. Defaults to ``None``. Raises ------ FileNotFoundError If any of the files contained in ``config_file_or_obj_or_dict`` cannot be located. IOError If ``output_dir`` already contains the output of a previous experiment and ``overwrite_output`` is ``False``. ValueError If the current configuration specifies a non-linear model but ``output_dir`` already contains the output of a previous experiment that used a linear model with the same experiment ID. """ logger = logger if logger else logging.getLogger(__name__) # create the 'output' and the 'figure' sub-directories # where all the experiment output such as the CSV files # and the box plots will be saved # Get absolute paths to output directories csvdir = abspath(join(output_dir, "output")) figdir = abspath(join(output_dir, "figure")) reportdir = abspath(join(output_dir, "report")) featuredir = abspath(join(output_dir, "feature")) # Make directories, if necessary makedirs(csvdir, exist_ok=True) makedirs(figdir, exist_ok=True) makedirs(reportdir, exist_ok=True) # Raise an error if the specified output directory # already contains a non-empty `output` directory, unless # `overwrite_output` was specified, in which case we assume # that the user knows what she is doing and simply # output a warning saying that the report might # not be correct. non_empty_csvdir = exists(csvdir) and listdir(csvdir) if non_empty_csvdir: if not overwrite_output: raise IOError(f"'{output_dir}' already contains a non-empty 'output' directory.") else: logger.warning( f"{output_dir} already contains a non-empty 'output' directory. " f"The generated report might contain unexpected information from " f"a previous experiment." ) configuration = configure("rsmtool", config_file_or_obj_or_dict) logger.info("Saving configuration file.") configuration.save(output_dir) # If wandb logging is enabled, and wandb_run is not provided, # start a wandb run and log configuration if wandb_run is None: wandb_run = init_wandb_run(configuration) log_configuration_to_wandb(wandb_run, configuration) # Get output format file_format = configuration.get("file_format", "csv") # Get DataWriter object writer = DataWriter(configuration["experiment_id"], configuration.context, wandb_run) # Get the paths and names for the DataReader (file_names, file_paths_org) = configuration.get_names_and_paths( ["train_file", "test_file", "features", "feature_subset_file"], ["train", "test", "feature_specs", "feature_subset_specs"], ) file_paths = DataReader.locate_files(file_paths_org, configuration.configdir) # if there are any missing files after trying to locate # all expected files, raise an error if None in file_paths: missing_file_paths = [ file_paths_org[idx] for idx, path in enumerate(file_paths) if path is None ] raise FileNotFoundError(f"The following files were not found: {repr(missing_file_paths)}") # Use the default converter for both train and test converters = { "train": configuration.get_default_converter(), "test": configuration.get_default_converter(), } logger.info("Reading in all data from files.") # Initialize the reader reader = DataReader(file_paths, file_names, converters) data_container = reader.read() logger.info("Preprocessing all features.") # Initialize the processor processor = FeaturePreprocessor(logger=logger) (processed_config, processed_container) = processor.process_data(configuration, data_container) # Rename certain frames with more descriptive names # for writing out experiment files rename_dict = { "train_excluded": "train_excluded_responses", "test_excluded": "test_excluded_responses", "train_length": "train_response_lengths", "train_flagged": "train_responses_with_excluded_flags", "test_flagged": "test_responses_with_excluded_flags", } logger.info("Saving training and test set data to disk.") # Write out files writer.write_experiment_output( csvdir, processed_container, [ "train_features", "test_features", "train_metadata", "test_metadata", "train_other_columns", "test_other_columns", "train_preprocessed_features", "test_preprocessed_features", "train_excluded", "test_excluded", "train_length", "test_human_scores", "train_flagged", "test_flagged", ], rename_dict, file_format=file_format, ) # Initialize the analyzer analyzer = Analyzer(logger=logger) (_, analyzed_container) = analyzer.run_data_composition_analyses_for_rsmtool( processed_container, processed_config ) # Write out files writer.write_experiment_output(csvdir, analyzed_container, file_format=file_format) logger.info(f"Training {processed_config['model_name']} model.") # Initialize modeler modeler = Modeler(logger=logger) modeler.train(processed_config, processed_container, csvdir, file_format) # Identify the features used by the model selected_features = modeler.get_feature_names() assert selected_features is not None # Add selected features to processed configuration processed_config["selected_features"] = selected_features # Write out files writer.write_feature_csv( featuredir, processed_container, selected_features, file_format=file_format ) features_data_container = processed_container.copy() # Get selected feature info, and write out to file df_feature_info = features_data_container["feature_info"].copy() df_selected_feature_info = df_feature_info[df_feature_info["feature"].isin(selected_features)] selected_feature_dataset_dict = DatasetDict( { "name": "selected_feature_info", "frame": df_selected_feature_info, } ) features_data_container.add_dataset(selected_feature_dataset_dict, update=True) writer.write_experiment_output( csvdir, features_data_container, dataframe_names=["selected_feature_info"], new_names_dict={"selected_feature_info": "feature"}, file_format=file_format, ) logger.info("Running analyses on training set.") (_, train_analyzed_container) = analyzer.run_training_analyses( processed_container, processed_config ) # Write out files writer.write_experiment_output( csvdir, train_analyzed_container, reset_index=True, file_format=file_format ) # Use only selected features for predictions columns_for_prediction = ( ["spkitemid", "sc1"] + selected_features if selected_features else ["spkitemid", "sc1"] ) train_for_prediction = processed_container["train_preprocessed_features"][ columns_for_prediction ] test_for_prediction = processed_container["test_preprocessed_features"][columns_for_prediction] logged_str = "Generating training and test set predictions" logged_str += " (expected scores)." if configuration["predict_expected_scores"] else "." logger.info(logged_str) (pred_config, pred_data_container) = modeler.predict_train_and_test( train_for_prediction, test_for_prediction, processed_config ) # Save modeler instance modeler.feature_info = processed_container["feature_info"].copy() modeler.feature_info.set_index("feature", inplace=True) spec_trim_min, spec_trim_max, spec_trim_tolerance = configuration.get_trim_min_max_tolerance() if spec_trim_min: modeler.trim_min = spec_trim_min if spec_trim_max: modeler.trim_max = spec_trim_max if spec_trim_tolerance: modeler.trim_tolerance = spec_trim_tolerance pred_config_dict = pred_config.to_dict() for key, attr_name in [ ("train_predictions_mean", "train_predictions_mean"), ("train_predictions_sd", "train_predictions_sd"), ("human_labels_mean", "h1_mean"), ("human_labels_sd", "h1_sd"), ]: setattr(modeler, attr_name, pred_config_dict[key]) logger.info("Saving model.") modeler.save(join(csvdir, f"{configuration['experiment_id']}.model")) # Write out files writer.write_experiment_output( csvdir, pred_data_container, new_names_dict={"pred_test": "pred_processed"}, file_format=file_format, ) original_coef_file = join(csvdir, f"{pred_config['experiment_id']}_coefficients.{file_format}") # If coefficients file exists, then try to generate the scaled # coefficients and save them to a file if exists(original_coef_file): logger.info("Scaling the coefficients and saving them to disk") try: # scale coefficients, and return DataContainer w/ scaled coefficients scaled_data_container = modeler.scale_coefficients(pred_config) # raise an error if the coefficient file exists but the # coefficients are not available for the current model # which can happen if the user is re-running the same experiment # with the same ID but with a non-linear model whereas the previous # run of the same ID was with a linear model and the user has not # cleared the directory except RuntimeError: raise ValueError( "It appears you previously ran an experiment with the " "same ID using a linear model and saved its output to " "the same directory. That output is interfering with " "the current experiment. Either clear the contents " "of the output directory or re-run the current " "experiment using a different experiment ID." ) else: # Write out scaled coefficients to disk writer.write_experiment_output(csvdir, scaled_data_container, file_format=file_format) # Add processed data_container frames to pred_data_container new_pred_data_container = pred_data_container + processed_container logger.info("Running prediction analyses.") ( pred_analysis_config, pred_analysis_data_container, ) = analyzer.run_prediction_analyses(new_pred_data_container, pred_config, wandb_run) # Write out files writer.write_experiment_output( csvdir, pred_analysis_data_container, reset_index=True, file_format=file_format ) # Initialize reporter reporter = Reporter(logger=logger, wandb_run=wandb_run) # generate the report logger.info("Starting report generation.") reporter.create_report(pred_analysis_config, csvdir, figdir)
def main(argv: Optional[List[str]] = None) -> None: """ Entry point for the ``rsmtool`` command-line interface. Parameters ---------- argv : Optional[List[str]] List of arguments to use instead of ``sys.argv``. Defaults to ``None``. """ # if no arguments are passed, then use sys.argv if argv is None: argv = sys.argv[1:] # set up the basic logging configuration formatter = LogFormatter() # we need two handlers, one that prints to stdout # for the "run" command and one that prints to stderr # from the "generate" command; the latter is important # because do not want the warning to show up in the # generated configuration file stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(formatter) stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setFormatter(formatter) logging.root.setLevel(logging.INFO) logger = logging.getLogger(__name__) # set up an argument parser via our helper function parser = setup_rsmcmd_parser( "rsmtool", uses_output_directory=True, allows_overwriting=True, uses_subgroups=True, ) # if we have no arguments at all then just show the help message if len(argv) < 1: argv.append("-h") # if the first argument is not one of the valid sub-commands # or one of the valid optional arguments, then assume that they # are arguments for the "run" sub-command. This allows the # old style command-line invocations to work without modification. if argv[0] not in VALID_PARSER_SUBCOMMANDS + [ "-h", "--help", "-V", "--version", ]: args_to_pass = ["run"] + argv else: args_to_pass = argv args = parser.parse_args(args=args_to_pass) # call the appropriate function based on which sub-command was run if args.subcommand == "run": # when running, log to stdout logging.root.addHandler(stdout_handler) # run the experiment logger.info(f"Output directory: {args.output_dir}") run_experiment( abspath(args.config_file), abspath(args.output_dir), overwrite_output=args.force_write, ) else: # when generating, log to stderr logging.root.addHandler(stderr_handler) # auto-generate an example configuration and print it to STDOUT generator = ConfigurationGenerator( "rsmtool", as_string=True, suppress_warnings=args.quiet, use_subgroups=args.subgroups, ) configuration = ( generator.interact(output_file_name=args.output_file.name if args.output_file else None) if args.interactive else generator.generate() ) print(configuration, file=args.output_file) if __name__ == "__main__": main()