"""A package directory for the pipeline functionality."""
import abc
from typing import Any, Dict, Generator, List, NoReturn, Optional, Tuple, Type
import omegaconf
import rich.pretty
from typing_extensions import Self
from tempor import plugin_loader
from tempor.data import dataset
from tempor.log import logger
from tempor.methods.core.params import Params
from tempor.methods.prediction.one_off.classification import BaseOneOffClassifier
from tempor.methods.prediction.one_off.regression import BaseOneOffRegressor
from tempor.methods.prediction.temporal.classification import BaseTemporalClassifier
from tempor.methods.prediction.temporal.regression import BaseTemporalRegressor
from tempor.methods.time_to_event import BaseTimeToEventAnalysis
from tempor.methods.treatments.one_off import BaseOneOffTreatmentEffects
from tempor.methods.treatments.temporal import BaseTemporalTreatmentEffects
from .generators import (
_generate_constructor,
_generate_fit,
_generate_hyperparameter_space_for_step_impl,
_generate_hyperparameter_space_impl,
_generate_pipeline_seq_impl,
_generate_predict,
_generate_predict_counterfactuals,
_generate_predict_proba,
_generate_sample_hyperparameters_impl,
)
# The base classes of the predictive method families imported above: one-off/temporal prediction
# (classification and regression), time-to-event analysis, and one-off/temporal treatment effects.
# NOTE(review): presumably consulted when working out which base class a generated pipeline should derive from,
# based on its final step. The consumer of this tuple is not visible in this section, so confirm before relying
# on that (and before reordering the entries).
BASE_CLASS_CANDIDATES = (
BaseOneOffClassifier,
BaseOneOffRegressor,
BaseTemporalClassifier,
BaseTemporalRegressor,
BaseTimeToEventAnalysis,
BaseOneOffTreatmentEffects,
BaseTemporalTreatmentEffects,
)
# TODO: Consider allowing transform-only pipelines.
class PipelineBase:
    """Interface shared by pipeline classes: a sequence of transformer steps ending in a predictive step.

    The methods that raise ``NotImplementedError`` here are placeholders that document the expected signatures;
    their working implementations are auto-generated by the :class:`PipelineMeta` metaclass. Only
    ``predictor_category``, ``params`` and the ``repr`` helpers are implemented directly on this class.
    """

    stages: List
    """A list of method plugin instances corresponding to each step in the pipeline."""

    plugin_types: List[Type]
    """A list of types denoting the class of each step in the pipeline."""

    def __init__(self, plugin_params: Optional[Dict[str, Dict]] = None, **kwargs: Any) -> None:  # pragma: no cover
        """Instantiate the pipeline, (optionally) providing initialization parameters for constituent step plugins.

        Note:
            The implementations of the methods on this class (``fit``, ``sample_hyperparameters``, etc.) are
            auto-generated by the :class:`PipelineMeta` metaclass.

        Args:
            plugin_params (Optional[Dict[str, Dict]], optional):
                A dictionary like ``{"plugin_1_name": {"init_param_1": value, ...}, ...}``. Defaults to `None`.
            **kwargs (Any):
                Any keyword arguments.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    @staticmethod
    def pipeline_seq(*args: Any) -> str:  # pragma: no cover
        """Get a string representation of the pipeline, stating each stage plugin, e.g. like:
        ``'preprocessing.imputation.temporal.bfill->...->prediction.one_off.classification.nn_classifier'``

        Args:
            *args (Any): Any positional arguments.

        Returns:
            str: String representation of the pipeline.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    @staticmethod
    def hyperparameter_space(*args: Any, **kwargs: Any) -> Dict[str, List[Params]]:  # pragma: no cover
        """The pipeline version of the estimator static method of the same name. All the hyperparameters of the
        different stages will be returned.

        Args:
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Dict[str, List[Params]]: A dictionary with each stage plugin names as keys and corresponding
            hyperparameter space (``List[Params]``) as values.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    @staticmethod
    def hyperparameter_space_for_step(name: str, *args: Any, **kwargs: Any) -> List[Params]:  # pragma: no cover
        """Return the hyperparameter space (``List[Params]``) for the step of the pipeline as specified by ``name``.

        Args:
            name (str): Name of the pipeline step (i.e. the name of the underlying plugin).
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            List[Params]: the hyperparameter space for the step of the pipeline.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    @classmethod
    def sample_hyperparameters(
        cls, *args: Any, override: Optional[List[Params]] = None, **kwargs: Any
    ) -> Dict[str, Any]:  # pragma: no cover
        """The pipeline version of the estimator method of the same name. Returns a hyperparameter sample.

        Args:
            *args (Any): Any positional arguments.
            override (Optional[List[Params]], optional): A list of hyperparameter samples to override the default
                sampling. Defaults to `None`.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Dict[str, Any]: a dictionary with hyperparameter names as keys and corresponding hyperparameter
            samples as values.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    def fit(self, data: dataset.BaseDataset, *args: Any, **kwargs: Any) -> Self:  # pragma: no cover
        """The pipeline version of the estimator ``fit`` method.

        By analogy to `sklearn`, under the hood, ``fit_transform`` will be called on all the pipeline steps except
        for the last one (the transformer steps of the pipeline), and `fit` will be called on the last step
        (the predictive step of the pipeline).

        Args:
            data (dataset.BaseDataset): Input dataset.
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Self: Returns the fitted pipeline itself.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    def predict(self, data: dataset.PredictiveDataset, *args: Any, **kwargs: Any) -> Any:  # pragma: no cover
        """The pipeline version of the estimator ``predict`` method. Applicable if the final step of the pipeline has
        a ``predict`` method implemented.

        Args:
            data (dataset.PredictiveDataset): Input dataset.
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Any: the same return type as the final step of the pipeline.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    def predict_proba(self, data: dataset.PredictiveDataset, *args: Any, **kwargs: Any) -> Any:  # pragma: no cover
        """The pipeline version of the estimator ``predict_proba`` method. Applicable if the final step of the
        pipeline has a ``predict_proba`` method implemented.

        Args:
            data (dataset.PredictiveDataset): Input dataset.
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Any: the same return type as the final step of the pipeline.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    def predict_counterfactuals(
        self, data: dataset.PredictiveDataset, *args: Any, **kwargs: Any
    ) -> Any:  # pragma: no cover
        """The pipeline version of the estimator ``predict_counterfactuals`` method. Applicable if the final step of
        the pipeline has a ``predict_counterfactuals`` method implemented.

        Args:
            data (dataset.PredictiveDataset): Input dataset.
            *args (Any): Any positional arguments.
            **kwargs (Any): Any keyword arguments.

        Returns:
            Any: the same return type as the final step of the pipeline.

        Raises:
            NotImplementedError: Always, on this base class (placeholder implementation).
        """
        raise NotImplementedError("Not implemented")

    @property
    def predictor_category(self) -> str:
        """Return the ``category`` of the final step of the pipeline.

        Returns:
            str: The ``category`` of the final step of the pipeline.
        """
        # The final entry of ``plugin_types`` is the class of the last (predictive) step.
        return self.plugin_types[-1].category

    @property
    def params(self) -> Dict[str, omegaconf.DictConfig]:
        """Return a dictionary with the parameters (``.params``) of each step of the pipeline (keyed by the stage name).

        Returns:
            Dict[str, omegaconf.DictConfig]: The requisite dictionary.
        """
        # Later stages overwrite earlier ones if two stages share a ``name``.
        return {stage.name: stage.params for stage in self.stages}

    def __rich_repr__(self) -> Generator:
        """A `rich` representation of the class.

        Yields:
            Generator: The fields and their values fed to `rich`.
        """
        yield "pipeline_seq", self.pipeline_seq()
        yield "predictor_category", self.predictor_category
        # Convert each step's params to a plain container before handing it to `rich`.
        yield "params", {k: omegaconf.OmegaConf.to_container(v) for k, v in self.params.items()}

    def __repr__(self) -> str:
        """The `repr()` representation of the class.

        Returns:
            str: The representation.
        """
        return rich.pretty.pretty_repr(self)
def prepend_base(base: Type, bases: List[Type]) -> List[Type]:
    """Prepend a base class to a list of base classes, if it is not already present.

    Args:
        base (Type): Base class to prepend.
        bases (List[Type]): List of base classes.

    Returns:
        List[Type]: The list of base classes, with the base class prepended. If ``base`` is already in ``bases``,
        the very same ``bases`` object is returned unchanged; otherwise a new list is returned and ``bases`` is
        left unmodified.
    """
    return bases if base in bases else [base, *bases]
def raise_not_implemented(*args: Any, **kwargs: Any) -> NoReturn:
    """Raise a ``NotImplementedError`` if a method like ``_fit/predict/...`` is not implemented.

    All arguments are accepted and ignored, so this can stand in for a method of any signature.

    Args:
        *args (Any): Any positional arguments.
        **kwargs (Any): Any keyword arguments.

    Raises:
        NotImplementedError: The requisite error.

    Returns:
        NoReturn: Does not return, raises error.
    """
    raise NotImplementedError("The `{_fit/predict/...}` methods are not implemented for the pipelines")
def pipeline_classes(names: List[str]) -> Tuple[Type, ...]:
    """Return a tuple of method plugin classes based on a sequence of fully-qualified ``names`` provided.

    Args:
        names (List[str]): A sequence of fully-qualified names of method plugins, corresponding to pipeline steps.

    Raises:
        RuntimeError: If any name does not contain a ``"."`` (i.e. is not a fully-qualified name).

    Returns:
        Tuple[Type, ...]: The corresponding sequence of method plugin classes, in the same order as ``names``.
    """
    classes = []
    for fqn in names:
        # Validate each name right before loading it, so names are checked and loaded strictly in order.
        if "." not in fqn:
            raise RuntimeError(f"Invalid fqn: {fqn}")
        classes.append(plugin_loader.get_class(fqn))
    return tuple(classes)
def pipeline(plugins_str: List[str]) -> Type[PipelineBase]:
    """Use this function to create pipelines.

    Generates a pipeline (:class:`PipelineBase`) class with an implementation of the necessary methods
    (``fit``, ``sample_hyperparameters`` etc.), based on a sequence of steps defined by ``plugins_str``.

    All but the last steps must be data transformer plugins, and the last step must be a predictive method plugin.

    This function will return a pipeline class (``Type[PipelineBase]``), which should be instantiated. At time of
    instantiation, ``__init__`` input parameters for each step's method plugin can be provided. See
    :class:`PipelineBase` for details.

    Args:
        plugins_str (List[str]):
            A sequence of method plugins' fully-qualified names (e.g.
            ``"prediction.one_off.classification.nn_classifier"``).

    Returns:
        Type[PipelineBase]: The pipeline class (not instance) is returned.
    """
    # Resolve the fully-qualified names to plugin classes first; invalid names raise here.
    plugins = pipeline_classes(plugins_str)

    # The metaclass receives the resolved plugin classes through the ``plugins`` class keyword argument.
    class Pipeline(metaclass=PipelineMeta, plugins=plugins):
        pass

    return Pipeline  # type: ignore