Source code for rsmtool.rsmtool

#!/usr/bin/env python

"""
Run an rsmtool experiment.

:author: Jeremy Biggs (jbiggs@ets.org)
:author: Anastassia Loukina (aloukina@ets.org)
:author: Nitin Madnani (nmadnani@ets.org)

:organization: ETS
"""

import logging
import sys
from os import listdir, makedirs
from os.path import abspath, exists, join

from .analyzer import Analyzer
from .configuration_parser import configure
from .modeler import Modeler
from .preprocessor import FeaturePreprocessor
from .reader import DataReader
from .reporter import Reporter
from .utils.commandline import ConfigurationGenerator, setup_rsmcmd_parser
from .utils.constants import VALID_PARSER_SUBCOMMANDS
from .utils.logging import LogFormatter
from .utils.wandb import init_wandb_run, log_configuration_to_wandb
from .writer import DataWriter


def run_experiment(
    config_file_or_obj_or_dict, output_dir, overwrite_output=False, logger=None, wandb_run=None
):
    """
    Run an rsmtool experiment using the given configuration.

    Run an rsmtool experiment using the given configuration file, object, or
    dictionary. All outputs are generated under ``output_dir``. If
    ``overwrite_output`` is ``True``, any existing output in ``output_dir``
    is overwritten.

    Parameters
    ----------
    config_file_or_obj_or_dict : str or pathlib.Path or dict or Configuration
        Path to the experiment configuration file, either as a string
        or as a ``pathlib.Path`` object. Users can also pass a
        ``Configuration`` object that is in memory or a Python dictionary
        with keys corresponding to fields in the configuration file. Given a
        configuration file, any relative paths in the configuration file
        will be interpreted relative to the location of the file. Given a
        ``Configuration`` object, relative paths will be interpreted
        relative to the ``configdir`` attribute, which *must* be set. Given
        a dictionary, the reference path is set to the current directory.
    output_dir : str
        Path to the experiment output directory.
    overwrite_output : bool, optional
        If ``True``, overwrite any existing output under ``output_dir``.
        Defaults to ``False``.
    logger : logging object, optional
        A logging object. If ``None`` is passed, get logger from ``__name__``.
        Defaults to ``None``.
    wandb_run : wandb.Run, optional
        A wandb run object that will be used to log artifacts and tables.
        If ``None`` is passed, a new wandb run will be initialized if
        wandb is enabled in the configuration.
        Defaults to ``None``.

    Raises
    ------
    FileNotFoundError
        If any of the files contained in ``config_file_or_obj_or_dict``
        cannot be located.
    IOError
        If ``output_dir`` already contains the output of a previous
        experiment and ``overwrite_output`` is ``False``.
    ValueError
        If the current configuration specifies a non-linear model but
        ``output_dir`` already contains the output of a previous experiment
        that used a linear model with the same experiment ID.
    """
    logger = logger if logger else logging.getLogger(__name__)

    # create the 'output' and the 'figure' sub-directories
    # where all the experiment output such as the CSV files
    # and the box plots will be saved

    # Get absolute paths to output directories
    csvdir = abspath(join(output_dir, "output"))
    figdir = abspath(join(output_dir, "figure"))
    reportdir = abspath(join(output_dir, "report"))
    featuredir = abspath(join(output_dir, "feature"))

    # Make directories, if necessary
    makedirs(csvdir, exist_ok=True)
    makedirs(figdir, exist_ok=True)
    makedirs(reportdir, exist_ok=True)

    # Raise an error if the specified output directory
    # already contains a non-empty `output` directory, unless
    # `overwrite_output` was specified, in which case we assume
    # that the user knows what she is doing and simply
    # output a warning saying that the report might
    # not be correct.
    non_empty_csvdir = exists(csvdir) and listdir(csvdir)
    if non_empty_csvdir:
        if not overwrite_output:
            raise IOError(f"'{output_dir}' already contains a non-empty 'output' directory.")
        else:
            logger.warning(
                f"{output_dir} already contains a non-empty 'output' directory. "
                f"The generated report might contain unexpected information from "
                f"a previous experiment."
            )

    configuration = configure("rsmtool", config_file_or_obj_or_dict)

    logger.info("Saving configuration file.")
    configuration.save(output_dir)

    # If wandb logging is enabled, and wandb_run is not provided,
    # start a wandb run and log configuration
    if wandb_run is None:
        wandb_run = init_wandb_run(configuration)
    log_configuration_to_wandb(wandb_run, configuration)

    # Get output format
    file_format = configuration.get("file_format", "csv")

    # Get DataWriter object
    writer = DataWriter(configuration["experiment_id"], configuration.context, wandb_run)

    # Get the paths and names for the DataReader
    (file_names, file_paths_org) = configuration.get_names_and_paths(
        ["train_file", "test_file", "features", "feature_subset_file"],
        ["train", "test", "feature_specs", "feature_subset_specs"],
    )

    file_paths = DataReader.locate_files(file_paths_org, configuration.configdir)

    # if there are any missing files after trying to locate
    # all expected files, raise an error
    if None in file_paths:
        missing_file_paths = [
            file_paths_org[idx] for idx, path in enumerate(file_paths) if path is None
        ]
        raise FileNotFoundError(f"The following files were not found: {repr(missing_file_paths)}")

    # Use the default converter for both train and test
    converters = {
        "train": configuration.get_default_converter(),
        "test": configuration.get_default_converter(),
    }

    logger.info("Reading in all data from files.")

    # Initialize the reader
    reader = DataReader(file_paths, file_names, converters)
    data_container = reader.read()

    logger.info("Preprocessing all features.")

    # Initialize the processor
    processor = FeaturePreprocessor(logger=logger)

    (processed_config, processed_container) = processor.process_data(configuration, data_container)

    # Rename certain frames with more descriptive names
    # for writing out experiment files
    rename_dict = {
        "train_excluded": "train_excluded_responses",
        "test_excluded": "test_excluded_responses",
        "train_length": "train_response_lengths",
        "train_flagged": "train_responses_with_excluded_flags",
        "test_flagged": "test_responses_with_excluded_flags",
    }

    logger.info("Saving training and test set data to disk.")

    # Write out files
    writer.write_experiment_output(
        csvdir,
        processed_container,
        [
            "train_features",
            "test_features",
            "train_metadata",
            "test_metadata",
            "train_other_columns",
            "test_other_columns",
            "train_preprocessed_features",
            "test_preprocessed_features",
            "train_excluded",
            "test_excluded",
            "train_length",
            "test_human_scores",
            "train_flagged",
            "test_flagged",
        ],
        rename_dict,
        file_format=file_format,
    )

    # Initialize the analyzer
    analyzer = Analyzer(logger=logger)

    (_, analyzed_container) = analyzer.run_data_composition_analyses_for_rsmtool(
        processed_container, processed_config
    )

    # Write out files
    writer.write_experiment_output(csvdir, analyzed_container, file_format=file_format)

    logger.info(f"Training {processed_config['model_name']} model.")

    # Initialize modeler
    modeler = Modeler(logger=logger)

    modeler.train(processed_config, processed_container, csvdir, figdir, file_format)

    # Identify the features used by the model
    selected_features = modeler.get_feature_names()

    # Add selected features to processed configuration
    processed_config["selected_features"] = selected_features

    # Write out files
    writer.write_feature_csv(
        featuredir, processed_container, selected_features, file_format=file_format
    )

    features_data_container = processed_container.copy()

    # Get selected feature info, and write out to file
    df_feature_info = features_data_container.feature_info.copy()
    df_selected_feature_info = df_feature_info[df_feature_info["feature"].isin(selected_features)]
    selected_feature_dataset_dict = {
        "name": "selected_feature_info",
        "frame": df_selected_feature_info,
    }

    features_data_container.add_dataset(selected_feature_dataset_dict, update=True)

    writer.write_experiment_output(
        csvdir,
        features_data_container,
        dataframe_names=["selected_feature_info"],
        new_names_dict={"selected_feature_info": "feature"},
        file_format=file_format,
    )

    logger.info("Running analyses on training set.")

    (_, train_analyzed_container) = analyzer.run_training_analyses(
        processed_container, processed_config
    )

    # Write out files
    writer.write_experiment_output(
        csvdir, train_analyzed_container, reset_index=True, file_format=file_format
    )

    # Use only selected features for predictions
    columns_for_prediction = ["spkitemid", "sc1"] + selected_features
    train_for_prediction = processed_container.train_preprocessed_features[columns_for_prediction]
    test_for_prediction = processed_container.test_preprocessed_features[columns_for_prediction]

    logged_str = "Generating training and test set predictions"
    logged_str += " (expected scores)." if configuration["predict_expected_scores"] else "."
    logger.info(logged_str)

    (pred_config, pred_data_container) = modeler.predict_train_and_test(
        train_for_prediction, test_for_prediction, processed_config
    )

    # Save modeler instance
    modeler.feature_info = processed_container.feature_info.copy()
    modeler.feature_info.set_index("feature", inplace=True)

    (
        modeler.trim_min,
        modeler.trim_max,
        modeler.trim_tolerance,
    ) = configuration.get_trim_min_max_tolerance()

    pred_config_dict = pred_config.to_dict()
    for key, attr_name in [
        ("train_predictions_mean", "train_predictions_mean"),
        ("train_predictions_sd", "train_predictions_sd"),
        ("human_labels_mean", "h1_mean"),
        ("human_labels_sd", "h1_sd"),
    ]:
        setattr(modeler, attr_name, pred_config_dict[key])

    logger.info("Saving model.")
    modeler.save(join(csvdir, f"{configuration['experiment_id']}.model"))

    # Write out files
    writer.write_experiment_output(
        csvdir,
        pred_data_container,
        new_names_dict={"pred_test": "pred_processed"},
        file_format=file_format,
    )

    original_coef_file = join(csvdir, f"{pred_config['experiment_id']}_coefficients.{file_format}")

    # If coefficients file exists, then try to generate the scaled
    # coefficients and save them to a file
    if exists(original_coef_file):
        logger.info("Scaling the coefficients and saving them to disk")
        try:
            # scale coefficients, and return DataContainer w/ scaled coefficients
            scaled_data_container = modeler.scale_coefficients(pred_config)
        # raise an error if the coefficient file exists but the
        # coefficients are not available for the current model
        # which can happen if the user is re-running the same experiment
        # with the same ID but with a non-linear model whereas the previous
        # run of the same ID was with a linear model and the user has not
        # cleared the directory
        except RuntimeError:
            raise ValueError(
                "It appears you previously ran an experiment with the "
                "same ID using a linear model and saved its output to "
                "the same directory. That output is interfering with "
                "the current experiment. Either clear the contents "
                "of the output directory or re-run the current "
                "experiment using a different experiment ID."
            )
        else:
            # Write out scaled coefficients to disk
            writer.write_experiment_output(csvdir, scaled_data_container, file_format=file_format)

    # Add processed data_container frames to pred_data_container
    new_pred_data_container = pred_data_container + processed_container

    logger.info("Running prediction analyses.")
    (
        pred_analysis_config,
        pred_analysis_data_container,
    ) = analyzer.run_prediction_analyses(new_pred_data_container, pred_config, wandb_run)

    # Write out files
    writer.write_experiment_output(
        csvdir, pred_analysis_data_container, reset_index=True, file_format=file_format
    )

    # Initialize reporter
    reporter = Reporter(logger=logger, wandb_run=wandb_run)

    # generate the report
    logger.info("Starting report generation.")
    reporter.create_report(pred_analysis_config, csvdir, figdir)
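
# A hedged usage sketch (not part of the module): ``run_experiment`` can also be
# called programmatically with a Python dictionary instead of a configuration
# file. The experiment ID, model name, and file paths below are hypothetical
# placeholders chosen for illustration.
#
#     from rsmtool.rsmtool import run_experiment
#
#     config = {
#         "experiment_id": "toy_experiment",   # hypothetical experiment ID
#         "model": "LinearRegression",         # one of the supported model names
#         "train_file": "train.csv",           # hypothetical input paths
#         "test_file": "test.csv",
#     }
#
#     # all output is written under "toy_output"; overwrite_output=True
#     # allows re-running into the same directory
#     run_experiment(config, "toy_output", overwrite_output=True)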

def main(argv=None):  # noqa: D103
    # if no arguments are passed, then use sys.argv
    if argv is None:
        argv = sys.argv[1:]

    # set up the basic logging configuration
    formatter = LogFormatter()

    # we need two handlers, one that prints to stdout
    # for the "run" command and one that prints to stderr
    # for the "generate" command; the latter is important
    # because we do not want the warning to show up in the
    # generated configuration file
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(formatter)

    logging.root.setLevel(logging.INFO)
    logger = logging.getLogger(__name__)

    # set up an argument parser via our helper function
    parser = setup_rsmcmd_parser(
        "rsmtool",
        uses_output_directory=True,
        allows_overwriting=True,
        uses_subgroups=True,
    )

    # if we have no arguments at all then just show the help message
    if len(argv) < 1:
        argv.append("-h")

    # if the first argument is not one of the valid sub-commands
    # or one of the valid optional arguments, then assume that they
    # are arguments for the "run" sub-command. This allows the
    # old style command-line invocations to work without modification.
    if argv[0] not in VALID_PARSER_SUBCOMMANDS + [
        "-h",
        "--help",
        "-V",
        "--version",
    ]:
        args_to_pass = ["run"] + argv
    else:
        args_to_pass = argv
    args = parser.parse_args(args=args_to_pass)

    # call the appropriate function based on which sub-command was run
    if args.subcommand == "run":
        # when running, log to stdout
        logging.root.addHandler(stdout_handler)

        # run the experiment
        logger.info(f"Output directory: {args.output_dir}")
        run_experiment(
            abspath(args.config_file),
            abspath(args.output_dir),
            overwrite_output=args.force_write,
        )
    else:
        # when generating, log to stderr
        logging.root.addHandler(stderr_handler)

        # auto-generate an example configuration and print it to STDOUT
        generator = ConfigurationGenerator(
            "rsmtool",
            as_string=True,
            suppress_warnings=args.quiet,
            use_subgroups=args.subgroups,
        )
        configuration = (
            generator.interact(
                output_file_name=args.output_file.name if args.output_file else None
            )
            if args.interactive
            else generator.generate()
        )
        print(configuration, file=args.output_file)


if __name__ == "__main__":
    main()
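
# Illustrative command-line invocations (a sketch based on the parser set up in
# ``main`` above; optional flag names are defined by ``setup_rsmcmd_parser`` and
# are therefore omitted here):
#
#     rsmtool run config.json /path/to/output    # explicit "run" sub-command
#     rsmtool config.json /path/to/output        # old-style call, rewritten to "run"
#     rsmtool generate                           # print an example configuration to STDOUT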