Source code for tempor.benchmarks.evaluation

"""Module with helpers for evaluating the performance of the methods."""

import copy
import warnings
from time import time
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, cast

import numpy as np
import pandas as pd
import pydantic
import scipy.stats
import sklearn.metrics
import sklearn.model_selection
from typing_extensions import Literal, get_args

from tempor.core import plugins, pydantic_utils
from tempor.data import data_typing, dataset, samples
from tempor.log import logger
from tempor.metrics import metric as metric_module
from tempor.models.utils import enable_reproducibility

from . import utils

if TYPE_CHECKING:  # pragma: no cover
    from tempor.methods.prediction.one_off.classification import BaseOneOffClassifier
    from tempor.methods.prediction.one_off.regression import BaseOneOffRegressor
    from tempor.methods.time_to_event import BaseTimeToEventAnalysis

# TODO: Benchmarking workflow for missing cases.

_plugin_loader = plugins.PluginLoader()
builtin_metrics_prediction_oneoff_classification = _plugin_loader.list(plugin_type="metric")["prediction"]["one_off"][
    "classification"
]
builtin_metrics_prediction_oneoff_regression = _plugin_loader.list(plugin_type="metric")["prediction"]["one_off"][
    "regression"
]
builtin_metrics_time_to_event = _plugin_loader.list(plugin_type="metric")["time_to_event"]


OutputMetric = Literal[
    "min",
    "max",
    "mean",
    "stddev",
    "median",
    "iqr",
    "rounds",
    "errors",
    "durations",
]
"""The metric evaluation output statistics / other information about the evaluation cross-validation runs.

Possible values:
    - ``"min"``:
        The mix score of the metric
    - ``"max"``:
        The max score of the metric
    - ``"mean"``:
        The mean score of the metric
    - ``"stddev"``:
        The stddev score of the metric
    - ``"median"``:
        The median score of the metric
    - ``"iqr"``:
        The interquartile range of the metric
    - ``"rounds"``:
        Number of folds
    - ``"errors"``:
        Number of errors encountered
    - ``"durations"``:
        Average duration for the fold evaluation.
"""

output_metrics = get_args(OutputMetric)
"""A tuple of all possible values of :obj:`~tempor.benchmarks.evaluation.OutputMetric`."""


class _InternalScores(pydantic.BaseModel):
    metrics: Dict[str, np.ndarray] = {}  # np.ndarray expected to be 1D, contain floats.
    errors: List[int] = []
    durations: List[float] = []
    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)


@pydantic_utils.validate_arguments(config=pydantic.ConfigDict(arbitrary_types_allowed=True))
def _postprocess_results(results: _InternalScores) -> pd.DataFrame:
    output = pd.DataFrame([], columns=output_metrics)

    for metric in results.metrics:
        values = results.metrics[metric]
        errors = np.sum(results.errors)
        durations = utils.print_score(utils.generate_score(np.asarray(results.durations)))

        score_min = np.min(values)
        score_max = np.max(values)
        score_mean = np.mean(values)
        score_median = np.median(values)
        score_stddev = np.std(values)
        score_iqr = scipy.stats.iqr(values)
        score_rounds = len(values)

        output = pd.concat(
            [
                output,
                pd.DataFrame(
                    [
                        [
                            score_min,
                            score_max,
                            score_mean,
                            score_stddev,
                            score_median,
                            score_iqr,
                            score_rounds,
                            errors,
                            durations,
                        ]
                    ],
                    columns=output_metrics,
                    index=[metric],
                ),
            ],
        )

    return output


[docs]@pydantic_utils.validate_arguments(config=pydantic.ConfigDict(arbitrary_types_allowed=True)) def evaluate_prediction_oneoff_classifier( # pylint: disable=unused-argument estimator: Any, data: dataset.PredictiveDataset, n_splits: int = 3, random_state: int = 0, raise_exceptions: bool = False, silence_warnings: bool = False, **kwargs: Any, ) -> pd.DataFrame: """Helper for evaluating classifiers. Args: estimator (Any): Baseline model to evaluate - must be unfitted. data (dataset.PredictiveDataset): The dataset. n_splits (int, optional): Cross-validation folds. Defaults to ``3``. random_state (int, optional): Random state. Defaults to ``0``. raise_exceptions (bool, optional): Whether to raise exceptions during evaluation. If `False`, the exceptions will be swallowed and the evaluation will continue - exception count will be reported in the `"errors"` column of the resultant dataframe. Defaults to `False`. silence_warnings (bool, optional): Whether to silence warnings raised. Defaults to `False`. **kwargs (Any): Currently unused. Returns: pd.DataFrame: DataFrame containing the results. The columns of the dataframe contain details about the cross-validation repeats: one column for each :obj:`~tempor.benchmarks.evaluation.OutputMetric`. The index of the dataframe contains all the metrics registered: >>> from tempor import plugin_loader >>> plugin_loader.list(plugin_type="metric")["prediction"]["one_off"]["classification"] [...] """ # For the sake of import modularity, do not use the global plugin loader here, but create own: _plugin_loader = plugins.PluginLoader() metric_plugin_category = "prediction.one_off.classification" supported_metrics = _plugin_loader.list(plugin_type="metric")["prediction"]["one_off"]["classification"] with warnings.catch_warnings(): if silence_warnings: warnings.simplefilter("ignore") if n_splits < 2 or not isinstance(n_splits, int): raise ValueError("n_splits must be an integer >= 2") estimator_ = cast("BaseOneOffClassifier", estimator) enable_reproducibility(random_state) results = _InternalScores() for metric_name in supported_metrics: results.metrics[metric_name] = np.zeros(n_splits) splitter = sklearn.model_selection.StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state) if data.predictive.targets is None: raise ValueError("The dataset for evaluation needs to contain targets but did not") labels = data.predictive.targets.numpy().squeeze() if len(labels.shape) > 1: raise ValueError("Classifier evaluation expects 1D output") indx = 0 for train_data, test_data in data.split(splitter=splitter, y=labels): model = copy.deepcopy(estimator_) start = time() try: model.fit(train_data) if TYPE_CHECKING: # pragma: no cover assert test_data.predictive.targets is not None # nosec B101 test_labels = test_data.predictive.targets.numpy() preds = model.predict_proba(test_data).numpy() for metric_name in supported_metrics: metric = cast( metric_module.OneOffClassificationMetric, _plugin_loader.get(f"{metric_plugin_category}.{metric_name}", plugin_type="metric"), ) results.metrics[metric_name][indx] = metric.evaluate(test_labels, preds) results.errors.append(0) except BaseException as e: # pylint: disable=broad-except logger.error(f"Evaluation failed: {e}") results.errors.append(1) if raise_exceptions: raise results.durations.append(time() - start) indx += 1 return _postprocess_results(results)
[docs]@pydantic_utils.validate_arguments(config=pydantic.ConfigDict(arbitrary_types_allowed=True)) def evaluate_prediction_oneoff_regressor( # pylint: disable=unused-argument estimator: Any, data: dataset.PredictiveDataset, n_splits: int = 3, random_state: int = 0, raise_exceptions: bool = False, silence_warnings: bool = False, **kwargs: Any, ) -> pd.DataFrame: """Helper for evaluating regression tasks. Args: estimator (Any): Baseline model to evaluate - must be unfitted. data (dataset.PredictiveDataset): The dataset. n_splits (int, optional): Cross-validation folds. Defaults to ``3``. random_state (int, optional): Random state. Defaults to ``0``. raise_exceptions (bool, optional): Whether to raise exceptions during evaluation. If `False`, the exceptions will be swallowed and the evaluation will continue - exception count will be reported in the `"errors"` column of the resultant dataframe. Defaults to `False`. silence_warnings (bool, optional): Whether to silence warnings raised. Defaults to `False`. **kwargs (Any): Currently unused. Returns: pd.DataFrame: DataFrame containing the results. The columns of the dataframe contain details about the cross-validation repeats: one column for each :obj:`~tempor.benchmarks.evaluation.OutputMetric`. The index of the dataframe contains all the metrics registered: >>> from tempor import plugin_loader >>> plugin_loader.list(plugin_type="metric")["prediction"]["one_off"]["regression"] [...] """ # For the sake of import modularity, do not use the global plugin loader here, but create own: _plugin_loader = plugins.PluginLoader() metric_plugin_category = "prediction.one_off.regression" supported_metrics = _plugin_loader.list(plugin_type="metric")["prediction"]["one_off"]["regression"] with warnings.catch_warnings(): if silence_warnings: warnings.simplefilter("ignore") if n_splits < 2 or not isinstance(n_splits, int): raise ValueError("n_splits must be an integer >= 2") estimator_ = cast("BaseOneOffRegressor", estimator) enable_reproducibility(random_state) results = _InternalScores() for metric in supported_metrics: results.metrics[metric] = np.zeros(n_splits) splitter = sklearn.model_selection.KFold(n_splits=n_splits, shuffle=True, random_state=random_state) indx = 0 for train_data, test_data in data.split(splitter=splitter): model = copy.deepcopy(estimator_) start = time() try: model.fit(train_data) if TYPE_CHECKING: # pragma: no cover assert test_data.predictive.targets is not None # nosec B101 targets = test_data.predictive.targets.numpy().squeeze() preds = model.predict(test_data).numpy().squeeze() for metric_name in supported_metrics: metric = cast( metric_module.OneOffRegressionMetric, _plugin_loader.get(f"{metric_plugin_category}.{metric_name}", plugin_type="metric"), ) results.metrics[metric_name][indx] = metric.evaluate(targets, preds) results.errors.append(0) except BaseException as e: # pylint: disable=broad-except logger.error(f"Regression evaluation failed: {e}") results.errors.append(1) if raise_exceptions: raise results.durations.append(time() - start) indx += 1 return _postprocess_results(results)
def _prep_data_for_time_to_event_metric( train_data: dataset.TimeToEventAnalysisDataset, test_data: dataset.TimeToEventAnalysisDataset, horizons: data_typing.TimeIndex, predictions: samples.TimeSeriesSamplesBase, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[float], np.ndarray, np.ndarray]: """Validate the data for time-to-event metric evaluation and prepare the data in the format expected by the metric evaluation call. """ # Validate the data. if not predictions.num_timesteps_equal(): raise ValueError( f"Expected time to event prediction values for horizons {horizons} all to have equal number of time steps " f"({len(horizons)} but different lengths found {predictions.num_timesteps()}" ) if train_data.predictive.targets is None or test_data.predictive.targets is None: raise ValueError("Expected data to have targets but did not") for data, name in zip( (predictions, train_data.predictive.targets, test_data.predictive.targets), ("predictions", "training data targets", "testing data targets"), ): if data.num_features > 1: raise ValueError( "Currently time to event evaluation only supports one event " f"but more than one event features found in {name}" ) try: float(horizons[0]) # pyright: ignore except (TypeError, ValueError) as e: raise ValueError("Currently only int or float time horizons supported.") from e # Prepare the data. horizons = cast(List[float], horizons) predictions_array = predictions.numpy() t_train, y_train = (df.to_numpy().reshape((-1,)) for df in train_data.predictive.targets.split_as_two_dataframes()) t_test, y_test = (df.to_numpy().reshape((-1,)) for df in test_data.predictive.targets.split_as_two_dataframes()) return y_test, t_test, predictions_array, horizons, y_train, t_train
[docs]@pydantic_utils.validate_arguments(config=pydantic.ConfigDict(arbitrary_types_allowed=True)) def evaluate_time_to_event( # pylint: disable=unused-argument estimator: Any, data: dataset.TimeToEventAnalysisDataset, horizons: data_typing.TimeIndex, n_splits: int = 3, random_state: int = 0, raise_exceptions: bool = False, silence_warnings: bool = False, **kwargs: Any, ) -> pd.DataFrame: """Helper for evaluating time-to-event tasks. Args: estimator (Any): Baseline model to evaluate - must be unfitted data (dataset.TimeToEventAnalysisDataset): The dataset. horizons (data_typing.TimeIndex): Time horizons for making predictions at. n_splits (int, optional): Cross-validation folds. Defaults to ``3``. random_state (int, optional): Random state. Defaults to ``0``. raise_exceptions (bool, optional): Whether to raise exceptions during evaluation. If `False`, the exceptions will be swallowed and the evaluation will continue - exception count will be reported in the `"errors"` column of the resultant dataframe. Defaults to `False`. silence_warnings (bool, optional): Whether to silence warnings raised. Defaults to `False`. **kwargs (Any): Currently unused. Returns: pd.DataFrame: DataFrame containing the results. The columns of the dataframe contain details about the cross-validation repeats: one column for each :obj:`~tempor.benchmarks.evaluation.OutputMetric`. The index of the dataframe contains all the metrics registered: >>> from tempor import plugin_loader >>> plugin_loader.list(plugin_type="metric")["time_to_event"] [...] """ # For the sake of import modularity, do not use the global plugin loader here, but create own: _plugin_loader = plugins.PluginLoader() metric_plugin_category = "time_to_event" supported_metrics = _plugin_loader.list(plugin_type="metric")["time_to_event"] with warnings.catch_warnings(): if silence_warnings: warnings.simplefilter("ignore") # NOTE: xbgse is somehow able to circumvent warnings silencing, so will still raise warnings. if n_splits < 2 or not isinstance(n_splits, int): raise ValueError("n_splits must be an integer >= 2") estimator_ = cast("BaseTimeToEventAnalysis", estimator) enable_reproducibility(random_state) results = _InternalScores() for metric in supported_metrics: results.metrics[metric] = np.zeros(n_splits) splitter = sklearn.model_selection.KFold(n_splits=n_splits, shuffle=True, random_state=random_state) indx = 0 for train_data, test_data in data.split(splitter=splitter): model = copy.deepcopy(estimator_) start = time() try: model.fit(train_data) # targets = test_data.predictive.targets.numpy().squeeze() preds = model.predict(test_data, horizons=horizons) y_test, t_test, predictions_array, horizons, y_train, t_train = _prep_data_for_time_to_event_metric( train_data=train_data, test_data=test_data, horizons=horizons, predictions=preds, ) for metric_name in supported_metrics: metric = cast( metric_module.TimeToEventMetric, _plugin_loader.get(f"{metric_plugin_category}.{metric_name}", plugin_type="metric"), ) metric_per_horizon = metric.evaluate( (y_test, t_test), predictions_array, horizons, (y_train, t_train) ) avg_metric = float(np.asarray(metric_per_horizon).mean()) results.metrics[metric_name][indx] = avg_metric results.errors.append(0) except BaseException as e: # pylint: disable=broad-except logger.error(f"Regression evaluation failed: {e}") results.errors.append(1) if raise_exceptions: raise results.durations.append(time() - start) indx += 1 return _postprocess_results(results)