Source code for tempor.data.samples_experimental

"""Module with experimental samples implementations."""

# pyright: reportPrivateImportUsage=false

from typing import TYPE_CHECKING, Any, List, Optional, Tuple

import dask.dataframe as dd
import numpy as np
import pandas as pd
from typing_extensions import Self

from tempor.core import plugins
from tempor.log import logger

from . import data_typing, samples, utils
from .settings import DATA_SETTINGS

# NOTE: Dask samples implementations are a work in progress and do not yet fully leverage Dask capabilities.


def _process_npartitions_chunksize(**kwargs: Any) -> Tuple[Optional[int], Optional[int], Any]:
    if "npartitions" in kwargs:
        npartitions = kwargs.pop("npartitions")
        chunksize = None
    elif "chunksize" in kwargs:
        chunksize = kwargs.pop("chunksize")
        npartitions = None
    else:
        npartitions = 1
        chunksize = None
    return npartitions, chunksize, kwargs


@plugins.register_plugin(name="static_samples_dask", category="static_samples", plugin_type="dataformat")
class StaticSamplesDask(samples.StaticSamplesBase):
    """Static samples data format backed by a Dask dataframe (experimental)."""

    _data: dd.DataFrame  # type: ignore[name-defined]
    # _schema: pa.DataFrameSchema

    def __init__(
        self,
        data: data_typing.DataContainer,
        **kwargs: Any,
    ) -> None:
        """Build a :class:`StaticSamplesDask` from ``data``.

        A Dask dataframe is stored as-is; a `pandas.DataFrame` is converted with ``dd.from_pandas``.

        Args:
            data (data_typing.DataContainer): The data container (Dask or pandas dataframe).
            **kwargs (Any): Extra keyword arguments. ``npartitions`` / ``chunksize`` are consumed here and used for
                the pandas-to-Dask conversion; the rest is forwarded to the parent constructor.

        Raises:
            NotImplementedError: If ``data`` is a `numpy.ndarray`.
            ValueError: If ``data`` is of any other unsupported type.
        """
        npartitions, chunksize, kwargs = _process_npartitions_chunksize(**kwargs)

        if isinstance(data, dd.DataFrame):  # type: ignore[attr-defined]
            dask_frame = data
        elif isinstance(data, pd.DataFrame):
            dask_frame = dd.from_pandas(  # type: ignore[attr-defined]
                data,
                npartitions=npartitions,
                chunksize=chunksize,
            )
        elif isinstance(data, np.ndarray):
            raise NotImplementedError("`StaticSamplesDask` does not support `numpy.ndarray` input yet.")
        else:  # pragma: no cover
            # Prevented by pydantic check.
            raise ValueError(f"Data object {type(data)} not supported")

        self._data = dask_frame
        super().__init__(data, **kwargs)

    def _validate(self) -> None:
        """Log that validation is not available for the Dask format; performs no checks."""
        # TODO: Validation analogous to the DF implementation.
        logger.info("Validation not yet implemented for Dask data format. Data format consistency is not guaranteed.")

    @staticmethod
    def from_dataframe(
        dataframe: pd.DataFrame,
        **kwargs: Any,
    ) -> "StaticSamplesDask":  # pyright: ignore
        """Construct :class:`StaticSamplesDask` from a `pandas.DataFrame`.

        Rows are samples and columns are features.

        Args:
            dataframe (pd.DataFrame): The dataframe holding the data.
            **kwargs (Any): Extra keyword arguments forwarded to the constructor.

        Returns:
            StaticSamplesDask: The object created from ``dataframe``.
        """
        return StaticSamplesDask(dataframe, **kwargs)

    @staticmethod
    def from_numpy(
        array: np.ndarray,
        *,
        sample_index: Optional[data_typing.SampleIndex] = None,
        feature_index: Optional[data_typing.FeatureIndex] = None,
        **kwargs: Any,
    ) -> "StaticSamplesDask":  # pyright: ignore
        """Not implemented yet; always raises `NotImplementedError`."""
        raise NotImplementedError("`StaticSamplesDask` does not support `numpy.ndarray` input yet.")

    def numpy(self, **kwargs: Any) -> np.ndarray:
        """Materialize the Dask dataframe and return its values as a `numpy.ndarray`.

        Args:
            **kwargs (Any): Extra keyword arguments. Currently unused.

        Returns:
            np.ndarray: The data as an array.
        """
        computed = self._data.compute()
        return computed.to_numpy()

    def dataframe(self, **kwargs: Any) -> pd.DataFrame:
        """Materialize the Dask dataframe and return it as a `pandas.DataFrame`.

        Args:
            **kwargs (Any): Extra keyword arguments. Currently unused.

        Returns:
            pd.DataFrame: The computed dataframe.
        """
        return self._data.compute()

    def sample_index(self) -> data_typing.SampleIndex:
        """Return the sample indexes as a list (the computed dataframe index).

        Returns:
            data_typing.SampleIndex: Sample indexes.
        """
        computed_index = self._data.index.compute()
        return list(computed_index)

    @property
    def num_samples(self) -> int:
        """Number of samples, i.e. the computed row count of the Dask dataframe.

        Returns:
            int: Number of samples.
        """
        lazy_row_count = self._data.shape[0]
        return lazy_row_count.compute()

    @property
    def num_features(self) -> int:
        """Number of features, i.e. the column count of the Dask dataframe.

        Returns:
            int: Number of features.
        """
        return self._data.shape[1]

    def short_repr(self) -> str:
        """Compact string representation: class name followed by ``[num_samples, num_features]``.

        Returns:
            str: The short representation.
        """
        cls_name = self.__class__.__name__
        return f"{cls_name}([{self.num_samples}, {self.num_features}])"

    def __getitem__(self, key: data_typing.GetItemKey) -> Self:
        """Return a new :class:`StaticSamplesDask` holding the rows selected by ``key``.

        The Dask dataframe is materialized and indexed positionally (``iloc``); validation is skipped for the
        resulting object.

        Args:
            key (data_typing.GetItemKey): The key to index the data by.

        Returns:
            Self: A new object containing the selected subset.
        """
        iloc_key = utils.ensure_pd_iloc_key_returns_df(key)
        subset = self._data.compute().iloc[iloc_key, :]  # pyright: ignore
        return StaticSamplesDask(  # type: ignore [return-value]
            subset,
            _skip_validate=True,
        )
def multiindex_df_to_compatible_ddf(
    df: pd.DataFrame,
    **kwargs: Any,
) -> dd.DataFrame:  # type: ignore[name-defined]
    """Turn a multiindex `pandas.DataFrame` into a Dask dataframe whose index is a flat list of the index entries.

    The dataframe is rebuilt from its raw values, with the index and columns passed as plain Python lists, and then
    handed to ``dd.from_pandas``.

    Args:
        df (pd.DataFrame): The dataframe to convert.
        **kwargs (Any): Keyword arguments forwarded to ``dd.from_pandas``.

    Returns:
        dd.DataFrame: The resulting Dask dataframe.
    """
    values = df.to_numpy()
    flat_index = df.index.to_list()
    column_labels = df.columns.to_list()
    flattened = pd.DataFrame(data=values, index=flat_index, columns=column_labels)
    return dd.from_pandas(flattened, **kwargs)  # type: ignore[attr-defined]
def compatible_ddf_to_multiindex_df(
    ddf: dd.DataFrame,  # type: ignore[name-defined]
) -> pd.DataFrame:
    """Materialize a Dask dataframe with a flat tuple index and restore a `pandas.MultiIndex` on it.

    The computed frame is rebuilt from its raw values and column labels, and its index entries are passed to
    ``pd.MultiIndex.from_tuples``.

    Args:
        ddf (dd.DataFrame): The Dask dataframe to convert.

    Returns:
        pd.DataFrame: A pandas dataframe indexed by the reconstructed multiindex.
    """
    materialized = ddf.compute()
    index_tuples = materialized.index.to_list()
    restored = pd.DataFrame(data=materialized.to_numpy(), columns=materialized.columns.to_list())
    restored.index = pd.MultiIndex.from_tuples(index_tuples)
    return restored
@plugins.register_plugin(name="time_series_samples_dask", category="time_series_samples", plugin_type="dataformat")
class TimeSeriesSamplesDask(samples.TimeSeriesSamplesBase):
    """Time series samples data format backed by a Dask dataframe (experimental).

    The data is held as a Dask dataframe with a flat tuple index; accessors convert it back to a multiindex
    `pandas.DataFrame` via :func:`compatible_ddf_to_multiindex_df`, which materializes the whole frame.
    """

    _data: dd.DataFrame  # type: ignore[name-defined]
    # _schema: pa.DataFrameSchema

    def __init__(
        self,
        data: data_typing.DataContainer,
        **kwargs: Any,
    ) -> None:
        """Create a :class:`TimeSeriesSamplesDask` object from the ``data``.

        A Dask dataframe is stored as-is; a `pandas.DataFrame` is converted with
        :func:`multiindex_df_to_compatible_ddf`.

        Args:
            data (data_typing.DataContainer): A container with the data (Dask or pandas dataframe).
            **kwargs (Any): Extra keyword arguments. ``npartitions`` / ``chunksize`` are consumed here and used for
                the pandas-to-Dask conversion; the rest is forwarded to the parent constructor.

        Raises:
            NotImplementedError: If ``data`` is a `numpy.ndarray`.
            ValueError: If ``data`` is of any other unsupported type.
        """
        npartitions, chunksize, kwargs = _process_npartitions_chunksize(**kwargs)
        if isinstance(data, dd.DataFrame):  # type: ignore[attr-defined]
            self._data = data
        elif isinstance(data, pd.DataFrame):
            self._data = multiindex_df_to_compatible_ddf(data, npartitions=npartitions, chunksize=chunksize)
        elif isinstance(data, np.ndarray):
            # The message names this class, consistent with the other Dask sample classes.
            raise NotImplementedError("`TimeSeriesSamplesDask` does not support `numpy.ndarray` input yet.")
        else:  # pragma: no cover
            # Prevented by pydantic check.
            raise ValueError(f"Data object {type(data)} not supported")
        super().__init__(data, **kwargs)

    def _validate(self) -> None:
        """Log that validation is not available for the Dask format; performs no checks."""
        # TODO: Validation analogous to the DF implementation.
        logger.info("Validation not yet implemented for Dask data format. Data format consistency is not guaranteed.")

    @staticmethod
    def from_dataframe(
        dataframe: pd.DataFrame,
        **kwargs: Any,
    ) -> "TimeSeriesSamplesDask":  # pyright: ignore
        """Create :class:`TimeSeriesSamplesDask` from `pandas.DataFrame`.

        The row index of the dataframe should be a 2-level multiindex (sample, timestep). The columns should be
        the features.

        Args:
            dataframe (pd.DataFrame): The dataframe that contains the data.
            **kwargs (Any): Any additional keyword arguments to pass to the constructor.

        Returns:
            TimeSeriesSamplesDask: The object created from the ``dataframe``.
        """
        return TimeSeriesSamplesDask(dataframe, **kwargs)

    @staticmethod
    def from_numpy(
        array: np.ndarray,
        **kwargs: Any,
    ) -> "TimeSeriesSamplesDask":  # pyright: ignore
        """Not implemented yet; always raises `NotImplementedError`."""
        raise NotImplementedError("`TimeSeriesSamplesDask` does not support `numpy.ndarray` input yet.")

    def numpy(self, *, padding_indicator: Any = DATA_SETTINGS.default_padding_indicator, **kwargs: Any) -> np.ndarray:
        """Return the data as a `numpy.ndarray`.

        The data is converted to a multiindex dataframe and passed to
        ``utils.multiindex_timeseries_dataframe_to_array3d`` with ``max_timesteps=None``.

        Args:
            padding_indicator (Any, optional):
                Padding indicator value. Defaults to `DATA_SETTINGS.default_padding_indicator`.
            **kwargs (Any): Any additional keyword arguments. Currently unused.

        Returns:
            np.ndarray: The `numpy.ndarray`.
        """
        return utils.multiindex_timeseries_dataframe_to_array3d(
            df=compatible_ddf_to_multiindex_df(self._data),
            padding_indicator=padding_indicator,
            max_timesteps=None,
        )

    def dataframe(self, **kwargs: Any) -> pd.DataFrame:
        """Return the data as a multiindex `pandas.DataFrame`.

        Args:
            **kwargs (Any): Any additional keyword arguments. Currently unused.

        Returns:
            pd.DataFrame: The `pandas.DataFrame`.
        """
        return compatible_ddf_to_multiindex_df(self._data)

    def sample_index(self) -> data_typing.SampleIndex:
        """Get a list containing sample indexes.

        Returns:
            data_typing.SampleIndex: A list containing sample indexes.
        """
        return list(utils.get_df_index_level0_unique(compatible_ddf_to_multiindex_df(self._data)))  # pyright: ignore

    def time_indexes(self) -> data_typing.TimeIndexList:
        """Get a list containing time indexes for each sample.

        Each time index is represented as a list of time step elements.

        Returns:
            data_typing.TimeIndexList: A list containing time indexes for each sample.
        """
        return list(self.time_indexes_as_dict().values())  # pyright: ignore

    def time_indexes_as_dict(self) -> data_typing.SampleToTimeIndexDict:
        """Get a dictionary mapping each sample index to its time index.

        Time index is represented as a list of time step elements.

        Returns:
            data_typing.SampleToTimeIndexDict: The dictionary mapping each sample index to its time index.
        """
        # Materialize the Dask frame once; both the multiindex and the sample index are derived from it.
        multiindex_df = compatible_ddf_to_multiindex_df(self._data)
        multiindex = multiindex_df.index
        if TYPE_CHECKING:  # pragma: no cover
            assert isinstance(multiindex, pd.MultiIndex)  # nosec B101
        sample_index = list(utils.get_df_index_level0_unique(multiindex_df))  # pyright: ignore
        # Loop-invariant: the level-1 (time step) values of the whole index.
        time_level_values = multiindex.get_level_values(1)
        return {  # type: ignore[return-value]
            s: list(time_level_values[multiindex.get_locs([s, slice(None)])]) for s in sample_index
        }

    def time_indexes_float(self) -> List[np.ndarray]:
        """Return time indexes but converting their elements to `float` values.

        Date-time time index will be converted using :obj:`~tempor.data.utils.datetime_time_index_to_float`.

        Returns:
            List[np.ndarray]: List of 1D `numpy.ndarray` s of `float` values, corresponding to the time index.
        """
        return [utils.datetime_time_index_to_float(ti) for ti in self.time_indexes()]

    def num_timesteps(self) -> List[int]:
        """Get the number of timesteps for each sample.

        Returns:
            List[int]: List containing the number of timesteps for each sample.
        """
        return [len(x) for x in self.time_indexes()]

    def num_timesteps_as_dict(self) -> data_typing.SampleToNumTimestepsDict:
        """Get a dictionary mapping each sample index to its number of timesteps.

        Returns:
            data_typing.SampleToNumTimestepsDict: Dictionary mapping each sample index to its number of timesteps.
        """
        return {key: len(x) for key, x in self.time_indexes_as_dict().items()}  # type: ignore

    def num_timesteps_equal(self) -> bool:
        """Returns `True` if all samples share the same number of timesteps, `False` otherwise.

        Also returns `True` when there are no samples.

        Returns:
            bool: whether all samples share the same number of timesteps.
        """
        # Zero or one distinct length means all samples agree (or there are none).
        return len(set(self.num_timesteps())) <= 1

    def list_of_dataframes(self) -> List[pd.DataFrame]:
        """Returns a list of dataframes where each dataframe has the data for each sample.

        Returns:
            List[pd.DataFrame]: List of dataframes for each sample.
        """
        return utils.multiindex_timeseries_dataframe_to_list_of_dataframes(compatible_ddf_to_multiindex_df(self._data))

    @property
    def num_samples(self) -> int:
        """Return number of samples.

        Returns:
            int: Number of samples.
        """
        sample_ids = utils.get_df_index_level0_unique(compatible_ddf_to_multiindex_df(self._data))
        return len(sample_ids)

    @property
    def num_features(self) -> int:
        """Return number of features.

        Returns:
            int: Number of features.
        """
        return self._data.shape[1]

    def short_repr(self) -> str:
        """A short string representation of the object.

        Returns:
            str: The short representation.
        """
        return f"{self.__class__.__name__}([{self.num_samples}, *, {self.num_features}])"

    def __getitem__(self, key: data_typing.GetItemKey) -> Self:
        """Return a subset :class:`TimeSeriesSamplesDask` object with the data indexed by the ``key``.

        ``key`` selects positionally among the unique sample indexes; all timesteps of the selected samples are
        kept. Validation is skipped for the resulting object.

        Args:
            key (data_typing.GetItemKey): The key to index the data by.

        Returns:
            Self: A new subset object.
        """
        key_ = utils.ensure_pd_iloc_key_returns_df(key)
        # Materialize the Dask frame once and reuse it for both the sample lookup and the row selection.
        multiindex_df = compatible_ddf_to_multiindex_df(self._data)
        sample_index = utils.get_df_index_level0_unique(multiindex_df)
        selected = list(sample_index[key_])  # pyright: ignore
        return TimeSeriesSamplesDask(  # type: ignore [return-value]
            multiindex_df.loc[(selected, slice(None)), :],  # pyright: ignore
            _skip_validate=True,
        )
@plugins.register_plugin(name="event_samples_dask", category="event_samples", plugin_type="dataformat")
class EventSamplesDask(samples.EventSamplesBase):
    """Event samples data format backed by a Dask dataframe (experimental)."""

    _data: dd.DataFrame  # type: ignore[name-defined]
    # _schema: pa.DataFrameSchema
    # _schema_split: pa.DataFrameSchema

    def __init__(
        self,
        data: data_typing.DataContainer,
        **kwargs: Any,
    ) -> None:
        """Create an :class:`EventSamplesDask` object from the ``data``.

        A Dask dataframe is stored as-is; a `pandas.DataFrame` is converted with ``dd.from_pandas``.

        Args:
            data (data_typing.DataContainer): A container with the data (Dask or pandas dataframe).
            **kwargs (Any): Extra keyword arguments. ``npartitions`` / ``chunksize`` are consumed here and used for
                the pandas-to-Dask conversion; the rest is forwarded to the parent constructor.

        Raises:
            NotImplementedError: If ``data`` is a `numpy.ndarray`.
            ValueError: If ``data`` is of any other unsupported type.
        """
        npartitions, chunksize, kwargs = _process_npartitions_chunksize(**kwargs)
        if isinstance(data, dd.DataFrame):  # type: ignore[attr-defined]
            self._data = data
        elif isinstance(data, pd.DataFrame):
            self._data = dd.from_pandas(  # type: ignore[attr-defined]
                data,
                npartitions=npartitions,
                chunksize=chunksize,
            )
        elif isinstance(data, np.ndarray):
            raise NotImplementedError("`EventSamplesDask` does not support `numpy.ndarray` input yet.")
        else:  # pragma: no cover
            # Prevented by pydantic check.
            raise ValueError(f"Data object {type(data)} not supported")
        super().__init__(data, **kwargs)

    def _validate(self) -> None:
        """Log that validation is not available for the Dask format; performs no checks."""
        # TODO: Validation analogous to the DF implementation.
        logger.info("Validation not yet implemented for Dask data format. Data format consistency is not guaranteed.")

    @staticmethod
    def from_dataframe(
        dataframe: pd.DataFrame,
        **kwargs: Any,
    ) -> "EventSamplesDask":  # pyright: ignore
        """Create :class:`EventSamplesDask` from `pandas.DataFrame`.

        The row index of the dataframe should be the sample indexes. The columns should be the features.
        Each feature should contain a tuple of ``(time, value)`` representing the event.

        Args:
            dataframe (pd.DataFrame): The dataframe that contains the data.
            **kwargs (Any): Any additional keyword arguments to pass to the constructor.

        Returns:
            EventSamplesDask: The :class:`EventSamplesDask` object created from the ``dataframe``.
        """
        return EventSamplesDask(dataframe, **kwargs)

    @staticmethod
    def from_numpy(
        array: np.ndarray,
        **kwargs: Any,
    ) -> "EventSamplesDask":  # pyright: ignore
        """Not implemented yet; always raises `NotImplementedError`."""
        raise NotImplementedError("`EventSamplesDask` does not support `numpy.ndarray` input yet.")

    def numpy(self, **kwargs: Any) -> np.ndarray:
        """Return the data as a `numpy.ndarray`.

        Args:
            **kwargs (Any): Any additional keyword arguments. Currently unused.

        Returns:
            np.ndarray: The `numpy.ndarray`.
        """
        return self._data.compute().to_numpy()

    def dataframe(self, **kwargs: Any) -> pd.DataFrame:
        """Return the data as a `pandas.DataFrame`.

        Args:
            **kwargs (Any): Any additional keyword arguments. Currently unused.

        Returns:
            pd.DataFrame: The `pandas.DataFrame`.
        """
        return self._data.compute()

    def sample_index(self) -> data_typing.SampleIndex:
        """Return a list representing sample indexes.

        Returns:
            data_typing.SampleIndex: Sample indexes.
        """
        return list(self._data.index.compute())

    @property
    def num_samples(self) -> int:
        """Return number of samples.

        Returns:
            int: Number of samples.
        """
        return self._data.shape[0].compute()

    @property
    def num_features(self) -> int:
        """Return number of features.

        Returns:
            int: Number of features.
        """
        return self._data.shape[1]

    def split(
        self,
        time_feature_suffix: str = samples._DEFAULT_EVENTS_TIME_FEATURE_SUFFIX,  # pylint: disable=protected-access
    ) -> pd.DataFrame:
        """Return a `pandas.DataFrame` where the time component of each event feature has been split off to its own
        column.

        The new columns that contain the times will be named ``"<original column name><time_feature_suffix>"`` and
        will be inserted before each corresponding ``<original column name>`` column. The ``<original column name>``
        columns will contain only the event value.

        Args:
            time_feature_suffix (str, optional):
                A column name suffix string to identify the time columns that will be split off.
                Defaults to ``samples._DEFAULT_EVENTS_TIME_FEATURE_SUFFIX``.

        Returns:
            pd.DataFrame: The output dataframe.

        Raises:
            ValueError: If any existing column name already contains ``time_feature_suffix``.
        """
        df = self._data.compute().copy()
        features = list(df.columns)
        if any(time_feature_suffix in str(c) for c in features):  # pragma: no cover
            raise ValueError(f"Column names must not contain '{time_feature_suffix}'")
        # First pass: insert each time column directly before its feature column. After i insertions the i-th
        # original feature sits at position 2 * i, so that is where its time column goes.
        for f_idx, f in enumerate(features):
            df.insert(f_idx * 2, f"{f}{time_feature_suffix}", df[f].apply(lambda x: x[0]))
        # Second pass: reduce the original columns from (time, value) tuples to the value only.
        for f in features:
            df[f] = df[f].apply(lambda x: x[1])
        return df

    def split_as_two_dataframes(
        self,
        time_feature_suffix: str = samples._DEFAULT_EVENTS_TIME_FEATURE_SUFFIX,  # pylint: disable=protected-access
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Analogous to :func:`split` but returns two `pandas.DataFrame` s:

        - first dataframe contains the event times of each feature.
        - second dataframe contains the event values of each feature.

        Args:
            time_feature_suffix (str, optional):
                A column name suffix string to identify the time columns that will be split off.
                Defaults to ``samples._DEFAULT_EVENTS_TIME_FEATURE_SUFFIX``.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]:
                Two `pandas.DataFrame` s containing event times and values respectively.
        """
        df_split = self.split(time_feature_suffix=time_feature_suffix)
        # Compare against ``str(c)``, as ``split`` does: value columns keep their original labels, which need not
        # be strings, and ``suffix in <non-str>`` would raise ``TypeError``.
        time_columns = [c for c in df_split.columns if time_feature_suffix in str(c)]
        value_columns = [c for c in df_split.columns if time_feature_suffix not in str(c)]
        df_event_times = df_split.loc[:, time_columns]
        df_event_values = df_split.loc[:, value_columns]
        return df_event_times, df_event_values

    def short_repr(self) -> str:
        """A short string representation of the object.

        Returns:
            str: The short representation.
        """
        return f"{self.__class__.__name__}([{self.num_samples}, {self.num_features}])"

    def __getitem__(self, key: data_typing.GetItemKey) -> Self:
        """Return a new subset :class:`EventSamplesDask` object with the data indexed by the ``key``.

        The Dask dataframe is materialized and indexed positionally (``iloc``); validation is skipped for the
        resulting object.

        Args:
            key (data_typing.GetItemKey): The key to index the data by.

        Returns:
            Self: A new subset object.
        """
        key_ = utils.ensure_pd_iloc_key_returns_df(key)
        return EventSamplesDask(  # type: ignore [return-value]
            self._data.compute().iloc[key_, :],  # pyright: ignore
            _skip_validate=True,
        )