# mypy: ignore-errors
import math
import os
import tarfile
from pathlib import Path
from typing import List, Optional
import numpy as np
import pandas as pd
import unlzw3
from ..data import Dataset, TimeSeriesSamples
from .dataset_retriever import DatasetRetriever
# Module-level debug switch: when True, `process_individual_file` prints the name of
# the file being processed and its missingness statistics.
_DEBUG = False
class UCIDiabetesRetriever(DatasetRetriever):
    """Retriever for the UCI "diabetes" dataset.

    Every raw file ``data-XX`` (``XX`` in 01..70) is a tab-separated event log
    with the columns ``date``, ``time``, ``code`` and ``value``. Each file is
    converted into one wide data frame indexed by timestamp, with one float
    column per entry of ``features``.

    Downloading, extraction and the ``retrieve`` entry point are not defined
    here; they are presumably provided by ``DatasetRetriever`` (not visible in
    this file).

    Args:
        data_home: Forwarded unchanged to ``DatasetRetriever.__init__``.
        make_regular: If True, each series is re-sampled onto a regular grid
            with a step of ``_timedelta_for_make_regular`` using the nearest
            original row.
        use_int_index: If True, the datetime index is replaced by a
            0-based integer index.
    """

    dataset_subdir = "uci_diabetes"
    dataset_files = [
        ("https://archive.ics.uci.edu/ml/machine-learning-databases/diabetes/Index", "Index"),
        ("https://archive.ics.uci.edu/ml/machine-learning-databases/diabetes/README", "README"),
        (
            "https://archive.ics.uci.edu/ml/machine-learning-databases/diabetes/diabetes-data.tar.Z",
            "diabetes-data.tar.Z",
        ),
    ]

    # Integer event code (third column of the raw files) -> feature name.
    # Codes 48 and 57 intentionally share one feature name.
    codes_map = {
        33: "regular_insulin_dose",
        34: "nph_insulin_dose",
        35: "ultralente_insulin_dose",
        48: "unspecified_blood_glucose_measurement",
        57: "unspecified_blood_glucose_measurement",
        58: "pre-breakfast_blood_glucose_measurement",
        59: "post-breakfast_blood_glucose_measurement",
        60: "pre-lunch_blood_glucose_measurement",
        61: "post-lunch_blood_glucose_measurement",
        62: "pre-supper_blood_glucose_measurement",
        63: "post-supper_blood_glucose_measurement",
        64: "pre-snack_blood_glucose_measurement",
        65: "hypoglycemic_symptoms",
        66: "typical_meal_ingestion",
        67: "more-than-usual_meal_ingestion",
        68: "less-than-usual_meal_ingestion",
        69: "typical_exercise_activity",
        70: "more-than-usual_exercise_activity",
        71: "less-than-usual_exercise_activity",
        72: "unspecified_special_event",
    }

    # Column order of every processed data frame.
    features = [
        "regular_insulin_dose",
        "nph_insulin_dose",
        "ultralente_insulin_dose",
        "unspecified_blood_glucose_measurement",
        "pre-breakfast_blood_glucose_measurement",
        "post-breakfast_blood_glucose_measurement",
        "pre-lunch_blood_glucose_measurement",
        "post-lunch_blood_glucose_measurement",
        "pre-supper_blood_glucose_measurement",
        "post-supper_blood_glucose_measurement",
        "pre-snack_blood_glucose_measurement",
        "hypoglycemic_symptoms",
        "typical_meal_ingestion",
        "more-than-usual_meal_ingestion",
        "less-than-usual_meal_ingestion",
        "typical_exercise_activity",
        "more-than-usual_exercise_activity",
        "less-than-usual_exercise_activity",
        "unspecified_special_event",
    ]

    # Grid step used when ``make_regular=True``.
    _timedelta_for_make_regular = pd.Timedelta(5, "h")

    def __init__(
        self, data_home: Optional[str] = None, make_regular: bool = False, use_int_index: bool = False
    ) -> None:
        # The options are stored *before* delegating to the base initializer; this
        # order is kept from the original code (presumably the base class may
        # already consult them, e.g. through the cache file names - TODO confirm).
        self.make_regular = make_regular
        self.use_int_index = use_int_index
        super().__init__(data_home)

    @property
    def dataset_extracted_dir(self):
        """Directory ``Diabetes-Data`` inside ``self.dataset_dir`` holding the raw ``data-XX`` files."""
        return os.path.join(self.dataset_dir, "Diabetes-Data")

    def _read_event_records(self, filepath: str) -> pd.DataFrame:
        """Read one raw file into a cleaned long-format frame (``datetime``, ``code``, ``value``)."""
        raw = pd.read_csv(filepath, sep="\t", header=None, names=["date", "time", "code", "value"])

        # Rows with a missing date or time do exist in the dataset.
        raw = raw.dropna(subset=["date", "time"])

        # A few date/time items are broken: coerce them to NaT and drop those rows.
        timestamps = pd.to_datetime(raw["date"] + " " + raw["time"], dayfirst=False, errors="coerce")
        events = raw.assign(datetime=timestamps).dropna(subset=["datetime"])

        # Drop unknown codes (there are some). This is done with a boolean mask on purpose:
        # dropping by index *label* on a datetime index would also delete every valid
        # event that merely shares its timestamp with an unknown-code event.
        known_code = events["code"].isin(list(self.codes_map))
        events = events.loc[known_code, ["datetime", "code", "value"]]

        # NOTE: Mapping "0Hi"/"0Lo" to NaN is questionable, but it is unclear what they
        # mean. "0''" is simply a broken value.
        broken_value = events["value"].isin(["0Hi", "0Lo", "0''"])
        events = events.assign(
            code=events["code"].map(self.codes_map),
            value=events["value"].mask(broken_value).astype(float),
        )

        # Duplicate (timestamp, feature) pairs cannot be pivoted; keep the last occurrence.
        return events.drop_duplicates(subset=["datetime", "code"], keep="last")

    def _resample_to_regular_grid(self, df: pd.DataFrame) -> pd.DataFrame:
        """Re-index a non-empty, ascending-sorted frame onto a regular grid (nearest-row fill)."""
        step = self._timedelta_for_make_regular
        first = df.index[0]
        last = df.index[-1]
        full_range = last - first  # pyright: ignore
        n_periods = math.ceil(full_range / step)
        new_index = pd.date_range(first, periods=n_periods + 1, freq=step)  # pyright: ignore
        # Internal sanity check: the grid must reach the last original timestamp.
        assert new_index[-1] >= last  # pyright: ignore
        return df.reindex(new_index, method="nearest")

    def process_individual_file(self, filepath: str) -> pd.DataFrame:
        """Convert one raw ``data-XX`` file into a wide, feature-indexed data frame.

        Args:
            filepath: Path of the raw tab-separated file.

        Returns:
            A frame with one float column per entry of ``features`` (NaN where
            nothing was recorded), sorted by timestamp. With ``make_regular``
            the index is a regular ``DatetimeIndex``; with ``use_int_index``
            the index is reset to integers.
        """
        if _DEBUG:
            print(f"Processing file: {os.path.basename(filepath)}")

        events = self._read_event_records(filepath)

        # Pivot to index-by-features format.
        df_pivoted = events.pivot(index="datetime", columns="code", values="value")

        # Enforce that all dataset features are present (in canonical order) in each sample.
        df_all_features = df_pivoted.reindex(columns=self.features)
        df_all_features.columns.name = None

        if _DEBUG:
            # Report missing-ness.
            total_elements = df_all_features.shape[0] * df_all_features.shape[1]
            null_elements = df_all_features.isnull().sum().sum()
            print("Non-null elements:", total_elements - null_elements)
            print("Missingness fraction:", null_elements / total_elements)

        # Sort index ascending.
        df_all_features = df_all_features.sort_index()

        # Case: make_regular=True. An empty frame has no time range to regularize and is
        # returned as-is, exactly like in the make_regular=False case.
        if self.make_regular and not df_all_features.empty:
            df_all_features = self._resample_to_regular_grid(df_all_features)

        # Case: use_int_index=True.
        if self.use_int_index:
            df_all_features = df_all_features.reset_index(drop=True)

        return df_all_features

    @staticmethod
    def _get_file_id_range():
        """Return the ids of the raw files, 1 to 70 inclusive."""
        return range(1, 70 + 1)

    def _get_cache_file_name(self, file_id: int) -> str:
        """Return the cache file name for ``file_id``; it encodes both processing options."""
        return f"mr{self.make_regular}_uii{self.use_int_index}_{file_id}.pkl"

    def is_cached(self) -> bool:
        """Return True only if the cache file of every file id exists in ``self.dataset_cache_dir``."""
        return all(
            os.path.exists(os.path.join(self.dataset_cache_dir, self._get_cache_file_name(file_id)))
            for file_id in self._get_file_id_range()
        )

    def get_cache(self) -> Dataset:
        """Load every cached data frame and wrap them into a ``Dataset``."""
        list_dfs: List[pd.DataFrame] = []
        for file_id in self._get_file_id_range():
            cache_path = os.path.join(self.dataset_cache_dir, self._get_cache_file_name(file_id))
            # NOTE: unpickling executes arbitrary code; only files written by `cache` belong here.
            list_dfs.append(pd.read_pickle(cache_path))
        temporal_covariates = TimeSeriesSamples(data=list_dfs, sample_indices=None)
        return Dataset(temporal_covariates)

    def cache(self, data: Dataset) -> None:
        """Pickle each time series of ``data.temporal_covariates`` to its own cache file.

        Samples are paired with file ids positionally (``zip``), so they are
        expected to be in file-id order.
        """
        temporal_covariates = data.temporal_covariates
        for file_id, ts in zip(self._get_file_id_range(), temporal_covariates):
            cache_path = os.path.join(self.dataset_cache_dir, self._get_cache_file_name(file_id))
            ts.df.to_pickle(cache_path)

    def prepare(self) -> Dataset:
        """Call ``self.extract()``, process every raw ``data-XX`` file and build the ``Dataset``."""
        self.extract()
        list_dfs: List[pd.DataFrame] = []
        for file_id in self._get_file_id_range():
            df = self.process_individual_file(os.path.join(self.dataset_extracted_dir, f"data-{file_id:02}"))
            list_dfs.append(df)
        temporal_covariates = TimeSeriesSamples(data=list_dfs, sample_indices=None)
        return Dataset(temporal_covariates)
def uci_diabetes(
    data_home: Optional[str] = None,
    refresh_cache: bool = False,
    redownload: bool = False,
    make_regular: bool = False,
    use_int_index: bool = False,
) -> Dataset:
    """Convenience wrapper returning the UCI diabetes ``Dataset``.

    Builds a ``UCIDiabetesRetriever`` from ``data_home``, ``make_regular`` and
    ``use_int_index`` and returns the result of its ``retrieve`` method.

    Args:
        data_home: Passed to the retriever constructor.
        refresh_cache: Passed unchanged to ``retrieve``.
        redownload: Passed unchanged to ``retrieve``.
        make_regular: Passed to the retriever constructor.
        use_int_index: Passed to the retriever constructor.

    Returns:
        Whatever ``retrieve`` returns (annotated as ``Dataset``).
    """
    return UCIDiabetesRetriever(
        data_home=data_home,
        make_regular=make_regular,
        use_int_index=use_int_index,
    ).retrieve(refresh_cache=refresh_cache, redownload=redownload)