#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""Representations for different kinds of datasets."""
from __future__ import annotations
import copy
from typing import Any
import torch
from botorch.exceptions.errors import InputDataError, UnsupportedError
from botorch.utils.containers import BotorchContainer, SliceContainer
from pyre_extensions import none_throws
from torch import long, ones, Tensor
class SupervisedDataset:
r"""Base class for datasets consisting of labelled pairs `(X, Y)`
and an optional `Yvar` that stipulates observation variances so
that `Y[i] ~ N(f(X[i]), Yvar[i])`.
Example:
.. code-block:: python
X = torch.rand(16, 2)
Y = torch.rand(16, 1)
feature_names = ["learning_rate", "embedding_dim"]
outcome_names = ["neg training loss"]
A = SupervisedDataset(
X=X,
Y=Y,
feature_names=feature_names,
outcome_names=outcome_names,
)
B = SupervisedDataset(
X=DenseContainer(X, event_shape=X.shape[-1:]),
Y=DenseContainer(Y, event_shape=Y.shape[-1:]),
feature_names=feature_names,
outcome_names=outcome_names,
)
assert A == B
"""
def __init__(
self,
X: BotorchContainer | Tensor,
Y: BotorchContainer | Tensor,
*,
feature_names: list[str],
outcome_names: list[str],
Yvar: BotorchContainer | Tensor | None = None,
validate_init: bool = True,
group_indices: Tensor | None = None,
) -> None:
r"""Constructs a `SupervisedDataset`.
Args:
X: A `Tensor` or `BotorchContainer` representing the input features.
Y: A `Tensor` or `BotorchContainer` representing the outcomes.
feature_names: A list of names of the features in `X`.
outcome_names: A list of names of the outcomes in `Y`.
Yvar: An optional `Tensor` or `BotorchContainer` representing
the observation noise.
validate_init: If `True`, validates the input shapes.
group_indices: A `Tensor` indicating which rows of X and Y are
grouped together. This is used to support applications in which multiple
observations should be considered as a group, e.g., learning-curve-based
modeling. If provided, its shape must be compatible with the batch
dimensions of X and Y.
"""
self._X = X
self._Y = Y
self._Yvar = Yvar
self.feature_names = feature_names
self.outcome_names = outcome_names
self.group_indices = group_indices
self.validate_init = validate_init
if validate_init:
self._validate()
@property
def X(self) -> Tensor:
if isinstance(self._X, Tensor):
return self._X
return self._X()
@property
def Y(self) -> Tensor:
if isinstance(self._Y, Tensor):
return self._Y
return self._Y()
@property
def Yvar(self) -> Tensor | None:
if self._Yvar is None or isinstance(self._Yvar, Tensor):
return self._Yvar
return self._Yvar()
def _validate(
self,
validate_feature_names: bool = True,
validate_outcome_names: bool = True,
validate_group_indices: bool = True,
) -> None:
r"""Checks that the shapes of the inputs are compatible with each other.
Args:
validate_feature_names: By default, we validate that the length of
`feature_names` matches the # of columns of `self.X`. If a
particular dataset, e.g., `RankingDataset`, is known to violate
this assumption, this can be set to `False`.
validate_outcome_names: By default, we validate that the length of
`outcome_names` matches the # of columns of `self.Y`. If a
particular dataset, e.g., `RankingDataset`, is known to violate
this assumption, this can be set to `False`.
validate_group_indices: By default, we validate that the shape of
`group_indices` matches the batch dimensions of `X` and `Y`.
"""
shape_X = self.X.shape
if isinstance(self._X, BotorchContainer):
shape_X = shape_X[: len(shape_X) - len(self._X.event_shape)]
else:
shape_X = shape_X[:-1]
shape_Y = self.Y.shape
if isinstance(self._Y, BotorchContainer):
shape_Y = shape_Y[: len(shape_Y) - len(self._Y.event_shape)]
else:
shape_Y = shape_Y[:-1]
if shape_X != shape_Y:
raise ValueError("Batch dimensions of `X` and `Y` are incompatible.")
if self.Yvar is not None and self.Yvar.shape != self.Y.shape:
raise ValueError("Shapes of `Y` and `Yvar` are incompatible.")
if validate_feature_names and len(self.feature_names) != self.X.shape[-1]:
raise ValueError(
"`X` must have the same number of columns as the number of "
"features in `feature_names`."
)
if validate_outcome_names and len(self.outcome_names) != self.Y.shape[-1]:
raise ValueError(
"`Y` must have the same number of columns as the number of "
"outcomes in `outcome_names`."
)
if validate_group_indices and self.group_indices is not None:
if self.group_indices.shape != shape_X:
raise ValueError(
f"shape_X ({shape_X}) must have the same shape as "
f"group_indices ({none_throws(self.group_indices).shape})."
)
def __eq__(self, other: Any) -> bool:
# Check the type first so that comparisons against arbitrary objects
# return False instead of raising an AttributeError below.
if type(other) is not type(self):
return False
if self.group_indices is None and other.group_indices is None:
group_indices_equal = True
elif self.group_indices is None or other.group_indices is None:
group_indices_equal = False
else:
group_indices_equal = torch.equal(self.group_indices, other.group_indices)
return (
torch.equal(self.X, other.X)
and torch.equal(self.Y, other.Y)
and (
other.Yvar is None
if self.Yvar is None
else other.Yvar is not None and torch.equal(self.Yvar, other.Yvar)
)
and self.feature_names == other.feature_names
and self.outcome_names == other.outcome_names
and group_indices_equal
)
def clone(
self, deepcopy: bool = False, mask: Tensor | None = None
) -> SupervisedDataset:
"""Return a copy of the dataset.
Args:
deepcopy: If True, perform a deep copy. Otherwise, use the same
tensors/lists.
mask: An `n`-dim boolean mask indicating which rows to keep. This is used
along the -2 dimension.
Returns:
The new dataset.
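Example:
A minimal usage sketch, reusing the dataset `A` from the class-level
example above:
.. code-block:: python
# Keep only the first 8 of the 16 rows. The copy shares its
# tensors with `A` unless `deepcopy=True` is passed.
mask = torch.arange(16) < 8
subset = A.clone(mask=mask)
assert subset.X.shape == torch.Size([8, 2])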
"""
new_X = self._X
new_Y = self._Y
new_Yvar = self._Yvar
feature_names = self.feature_names
outcome_names = self.outcome_names
if mask is not None:
if any(isinstance(x, BotorchContainer) for x in [new_X, new_Y, new_Yvar]):
raise NotImplementedError(
"Masking is not supported for BotorchContainers."
)
new_X = new_X[..., mask, :]
new_Y = new_Y[..., mask, :]
if new_Yvar is not None:
new_Yvar = new_Yvar[..., mask, :]
if deepcopy:
new_X = new_X.clone()
new_Y = new_Y.clone()
new_Yvar = new_Yvar.clone() if new_Yvar is not None else None
feature_names = copy.copy(self.feature_names)
outcome_names = copy.copy(self.outcome_names)
kwargs = {}
if new_Yvar is not None:
kwargs = {"Yvar": new_Yvar}
return type(self)(
X=new_X,
Y=new_Y,
feature_names=feature_names,
outcome_names=outcome_names,
validate_init=self.validate_init,
**kwargs,
)
class RankingDataset(SupervisedDataset):
r"""A SupervisedDataset whose labelled pairs `(x, y)` consist of m-ary combinations
`x ∈ Z^{m}` of elements from a ground set `Z = (z_1, ...)` and ranking vectors
`y ∈ {0, ..., m - 1}^{m}` with properties:
a) Ranks start at zero, i.e. min(y) = 0.
b) Sorted ranks are contiguous unless one or more ties are present.
c) `k` ranks are skipped after a `k`-way tie.
Example:
.. code-block:: python
X = SliceContainer(
values=torch.rand(16, 2),
indices=torch.stack([torch.randperm(16)[:3] for _ in range(8)]),
event_shape=torch.Size([3 * 2]),
)
Y = DenseContainer(
torch.stack([torch.randperm(3) for _ in range(8)]),
event_shape=torch.Size([3])
)
feature_names = ["item_0", "item_1"]
outcome_names = ["ranking outcome"]
dataset = RankingDataset(
X=X,
Y=Y,
feature_names=feature_names,
outcome_names=outcome_names,
)
"""
def __init__(
self,
X: SliceContainer,
Y: BotorchContainer | Tensor,
feature_names: list[str],
outcome_names: list[str],
validate_init: bool = True,
) -> None:
r"""Construct a `RankingDataset`.
Args:
X: A `SliceContainer` representing the input features being ranked.
Y: A `Tensor` or `BotorchContainer` representing the rankings.
feature_names: A list of names of the features in X.
outcome_names: A list of names of the outcomes in Y.
validate_init: If `True`, validates the input shapes.
"""
super().__init__(
X=X,
Y=Y,
feature_names=feature_names,
outcome_names=outcome_names,
Yvar=None,
validate_init=validate_init,
)
def _validate(self) -> None:
super()._validate(
validate_feature_names=False,
validate_outcome_names=False,
validate_group_indices=False,
)
if len(self.feature_names) != self._X.values.shape[-1]:
raise ValueError(
"The `values` field of `X` must have the same number of columns as "
"the number of features in `feature_names`."
)
Y = self.Y
arity = self._X.indices.shape[-1]
if Y.min() < 0 or Y.max() >= arity:
raise ValueError("Invalid ranking(s): out-of-bounds ranks detected.")
# Ensure that rankings are well-defined
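# For example, with arity 3, the rankings [0, 1, 2] and [0, 0, 2] are
# valid, while [0, 0, 1] is not: rank 1 must be skipped after the tie
# at rank 0.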
Y_sort = Y.sort(descending=False, dim=-1).values
y_incr = ones([], dtype=long)
y_prev = None
for i, y in enumerate(Y_sort.unbind(dim=-1)):
if i == 0:
if (y != 0).any():
raise ValueError("Invalid ranking(s): missing zero-th rank.")
y_prev = y
continue
y_diff = y - y_prev
y_prev = y
# Either a tie or next ranking when accounting for previous ties
if not ((y_diff == 0) | (y_diff == y_incr)).all():
raise ValueError("Invalid ranking(s): ranks not skipped after ties.")
# Same as: torch.where(y_diff == 0, y_incr + 1, 1)
y_incr = y_incr - y_diff + 1
class MultiTaskDataset(SupervisedDataset):
"""This is a multi-task dataset that is constructed from the datasets of
individual tasks. It offers functionality to combine parts of individual
datasets to construct the inputs necessary for the `MultiTaskGP` models.
The datasets of individual tasks are allowed to represent different sets
of features. When the feature sets are heterogeneous, accessing
`MultiTaskDataset.X` raises an `UnsupportedError`.
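Example:
An illustrative sketch with made-up feature and outcome names; both
child datasets share the same feature set:
.. code-block:: python
feature_names = ["x1", "x2"]
target = SupervisedDataset(
X=torch.rand(8, 2),
Y=torch.rand(8, 1),
feature_names=feature_names,
outcome_names=["target"],
)
auxiliary = SupervisedDataset(
X=torch.rand(20, 2),
Y=torch.rand(20, 1),
feature_names=feature_names,
outcome_names=["auxiliary"],
)
dataset = MultiTaskDataset(
datasets=[target, auxiliary],
target_outcome_name="target",
)
# A task-feature column is appended on the fly: 0 for the target
# task, 1 for the auxiliary task.
assert dataset.X.shape == torch.Size([28, 3])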
"""
def __init__(
self,
datasets: list[SupervisedDataset],
target_outcome_name: str,
task_feature_index: int | None = None,
):
"""Construct a `MultiTaskDataset`.
Args:
datasets: A list of the datasets of individual tasks. Each dataset
is expected to contain data for only one outcome.
target_outcome_name: Name of the target outcome to be modeled.
task_feature_index: If the task feature is included in the Xs of the
individual datasets, this should be used to specify its index.
If omitted, the task feature will be appended while concatenating Xs.
If given, we sanity-check that the names of the task features
match between all datasets.
"""
self.datasets: dict[str, SupervisedDataset] = {
ds.outcome_names[0]: ds for ds in datasets
}
self.target_outcome_name = target_outcome_name
self.task_feature_index = task_feature_index
self._validate_datasets(datasets=datasets)
self.feature_names = self.datasets[target_outcome_name].feature_names
self.outcome_names = [target_outcome_name]
# Check if the datasets have identical feature sets.
self.has_heterogeneous_features = any(
datasets[0].feature_names != ds.feature_names for ds in datasets[1:]
)
self.group_indices = None
@classmethod
def from_joint_dataset(
cls,
dataset: SupervisedDataset,
task_feature_index: int,
target_task_value: int,
outcome_names_per_task: dict[int, str] | None = None,
) -> MultiTaskDataset:
r"""Construct a `MultiTaskDataset` from a joint dataset that includes the
data for all tasks with the task feature index.
This will break down the joint dataset into individual datasets by the value
of the task feature. Each resulting dataset will have its outcome name set
based on `outcome_names_per_task`, with the missing values defaulting to
`task_<task_value>` (except for the target task, which will retain the
original outcome name from the dataset).
Args:
dataset: The joint dataset.
task_feature_index: The column index of the task feature in `dataset.X`.
target_task_value: The value of the task feature for the target task
in the dataset. The data for the target task is filtered according to
`dataset.X[:, task_feature_index] == target_task_value`.
outcome_names_per_task: Optional dictionary mapping task feature values
to the outcome names for each task. If not provided, the auxiliary
tasks will be named `task_<task_value>` and the target task will
retain the outcome name from the dataset.
Returns:
A `MultiTaskDataset` instance.
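Example:
A minimal sketch, assuming the last column of `X` is the task
feature and that task value 0 denotes the target task:
.. code-block:: python
X = torch.rand(10, 3)
X[:, -1] = (torch.arange(10) % 2).to(X)  # task feature in {0, 1}
joint = SupervisedDataset(
X=X,
Y=torch.rand(10, 1),
feature_names=["x1", "x2", "task"],
outcome_names=["objective"],
)
mtd = MultiTaskDataset.from_joint_dataset(
dataset=joint,
task_feature_index=-1,
target_task_value=0,
)
# The auxiliary task's dataset is named "task_1" by default.
assert set(mtd.datasets.keys()) == {"objective", "task_1"}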
"""
if len(dataset.outcome_names) > 1:
raise UnsupportedError(
"Dataset containing more than one outcome is not supported. "
f"Got {dataset.outcome_names=}."
)
outcome_names_per_task = outcome_names_per_task or {}
# Split datasets by task feature.
datasets = []
all_task_features = dataset.X[:, task_feature_index]
for task_value in all_task_features.unique().long().tolist():
default_name = (
dataset.outcome_names[0]
if task_value == target_task_value
else f"task_{task_value}"
)
outcome_name = outcome_names_per_task.get(task_value, default_name)
filter_mask = all_task_features == task_value
new_dataset = SupervisedDataset(
X=dataset.X[filter_mask],
Y=dataset.Y[filter_mask],
Yvar=dataset.Yvar[filter_mask] if dataset.Yvar is not None else None,
feature_names=dataset.feature_names,
outcome_names=[outcome_name],
)
datasets.append(new_dataset)
# Return the new dataset
return cls(
datasets=datasets,
target_outcome_name=outcome_names_per_task.get(
target_task_value, dataset.outcome_names[0]
),
task_feature_index=task_feature_index,
)
def _validate_datasets(self, datasets: list[SupervisedDataset]) -> None:
"""Validates that:
* Each dataset models only one outcome;
* Each outcome is modeled by only one dataset;
* The target outcome is included in the datasets;
* The datasets do not model batched inputs;
* The task feature names of the datasets all match;
* Either all or none of the datasets specify Yvar.
"""
if any(len(ds.outcome_names) > 1 for ds in datasets):
raise UnsupportedError(
"Datasets containing more than one outcome are not supported."
)
if len(self.datasets) != len(datasets):
raise UnsupportedError(
"Received multiple datasets for the same outcome. Each dataset "
"must contain data for a unique outcome. Got datasets with "
f"outcome names: {(ds.outcome_names for ds in datasets)}."
)
if self.target_outcome_name not in self.datasets:
raise InputDataError(
"Target outcome is not present in the datasets. "
f"Got {self.target_outcome_name=} and datasets for "
f"outcomes {list(self.datasets.keys())}."
)
if any(len(ds.X.shape) > 2 for ds in datasets):
raise UnsupportedError(
"Datasets modeling batched inputs are not supported."
)
if self.task_feature_index is not None:
tf_names = [ds.feature_names[self.task_feature_index] for ds in datasets]
if any(name != tf_names[0] for name in tf_names[1:]):
raise InputDataError(
"Expected the names of the task features to match across all "
f"datasets. Got {tf_names}."
)
all_Yvars = [ds.Yvar for ds in datasets]
is_none = [yvar is None for yvar in all_Yvars]
# Check that either all or None of the Yvars exist.
if not all(is_none) and any(is_none):
raise UnsupportedError(
"Expected either all or none of the datasets to have a Yvar. "
"Only subset of datasets define Yvar, which is unsupported."
)
@property
def X(self) -> Tensor:
"""Appends task features, if needed, and concatenates the Xs of datasets to
produce the `train_X` expected by `MultiTaskGP` and subclasses.
If appending the task features, 0 is reserved for the target task and the
remaining tasks are populated with 1, 2, ..., len(datasets) - 1.
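Example:
A rough sketch of the resulting layout, assuming two single-outcome
child datasets (a hypothetical `dataset` as in the class-level example)
and no pre-existing task feature:
.. code-block:: python
X = dataset.X
# The appended last column holds the task values: 0.0 for the
# target task and 1.0 for the auxiliary task.
assert set(X[:, -1].tolist()) == {0.0, 1.0}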
"""
if self.has_heterogeneous_features:
raise UnsupportedError(
"Concatenating `X`s from datasets with heterogeneous feature sets "
"is not supported."
)
all_Xs = []
next_task = 1
for outcome, ds in self.datasets.items():
if self.task_feature_index is None:
# Append a column containing the task feature value.
if outcome == self.target_outcome_name:
task_feature = 0
else:
task_feature = next_task
next_task = next_task + 1
all_Xs.append(torch.nn.functional.pad(ds.X, (0, 1), value=task_feature))
else:
all_Xs.append(ds.X)
return torch.cat(all_Xs, dim=0)
@property
def Y(self) -> Tensor:
"""Concatenates Ys of the datasets."""
return torch.cat([ds.Y for ds in self.datasets.values()], dim=0)
@property
def Yvar(self) -> Tensor | None:
"""Concatenates Yvars of the datasets if they exist."""
all_Yvars = [ds.Yvar for ds in self.datasets.values()]
return None if all_Yvars[0] is None else torch.cat(all_Yvars, dim=0)
def get_dataset_without_task_feature(self, outcome_name: str) -> SupervisedDataset:
"""A helper for extracting the child datasets with their task features removed.
If the task feature index is `None`, the dataset will be returned as is.
Args:
outcome_name: The outcome name for the dataset to extract.
Returns:
The dataset without the task feature.
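Example:
A minimal sketch, reusing the hypothetical `mtd` built in the
`from_joint_dataset` example above (task feature in the last column):
.. code-block:: python
stripped = mtd.get_dataset_without_task_feature("task_1")
# The "task" column is dropped from both X and feature_names.
assert stripped.X.shape[-1] == 2
assert stripped.feature_names == ["x1", "x2"]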
"""
dataset = self.datasets[outcome_name]
if self.task_feature_index is None:
return dataset
indices = list(range(len(self.feature_names)))
indices.pop(self.task_feature_index)
return SupervisedDataset(
X=dataset.X[..., indices],
Y=dataset.Y,
Yvar=dataset.Yvar,
feature_names=[
fn for i, fn in enumerate(dataset.feature_names) if i in indices
],
outcome_names=[outcome_name],
)
def __eq__(self, other: Any) -> bool:
return (
type(other) is type(self)
and self.datasets == other.datasets
and self.target_outcome_name == other.target_outcome_name
and self.task_feature_index == other.task_feature_index
)
def clone(
self, deepcopy: bool = False, mask: Tensor | None = None
) -> MultiTaskDataset:
"""Return a copy of the dataset.
Args:
deepcopy: If True, perform a deep copy. Otherwise, use the same
tensors/lists/datasets.
mask: An `n`-dim boolean mask indicating which rows to keep from the target
dataset. This is used along the -2 dimension.
Returns:
The new dataset.
"""
datasets = list(self.datasets.values())
if mask is not None or deepcopy:
new_datasets = []
for outcome, ds in self.datasets.items():
new_datasets.append(
ds.clone(
deepcopy=deepcopy,
mask=mask if outcome == self.target_outcome_name else None,
)
)
datasets = new_datasets
return MultiTaskDataset(
datasets=datasets,
target_outcome_name=self.target_outcome_name,
task_feature_index=self.task_feature_index,
)
class ContextualDataset(SupervisedDataset):
"""This is a contextual dataset that is constructed from either a single
dateset containing overall outcome or a list of datasets that each corresponds
to a context breakdown.
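Example:
An illustrative sketch with two contexts ("c1", "c2"), made-up
parameter and metric names, and one outcome per context; all child
datasets must share the same `X`:
.. code-block:: python
X = torch.rand(10, 4)
feature_names = ["x1_c1", "x2_c1", "x1_c2", "x2_c2"]
datasets = [
SupervisedDataset(
X=X,
Y=torch.rand(10, 1),
feature_names=feature_names,
outcome_names=[name],
)
for name in ("m1_c1", "m1_c2")
]
dataset = ContextualDataset(
datasets=datasets,
parameter_decomposition={
"c1": ["x1_c1", "x2_c1"],
"c2": ["x1_c2", "x2_c2"],
},
metric_decomposition={"c1": ["m1_c1"], "c2": ["m1_c2"]},
)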
"""
def __init__(
self,
datasets: list[SupervisedDataset],
parameter_decomposition: dict[str, list[str]],
metric_decomposition: dict[str, list[str]] | None = None,
):
"""Construct a `ContextualDataset`.
Args:
datasets: A list of datasets, containing either a single dataset for
the overall outcome or one dataset per context. Each dataset is
expected to contain data for only one outcome.
parameter_decomposition: Dict from context name to list of feature
names corresponding to that context.
metric_decomposition: Context breakdown metrics. Keys are context names.
Values are the lists of metric names belonging to the context:
{'context1': ['m1_c1'], 'context2': ['m1_c2'],}.
"""
self.datasets: dict[str, SupervisedDataset] = {
ds.outcome_names[0]: ds for ds in datasets
}
self.feature_names = datasets[0].feature_names
self.outcome_names = list(self.datasets.keys())
self.parameter_decomposition = parameter_decomposition
self.metric_decomposition = metric_decomposition
self._validate_datasets()
self._validate_decompositions()
self.context_buckets = self._extract_context_buckets()
self.parameter_index_decomp = {
c: [self.feature_names.index(i) for i in parameter_decomposition[c]]
for c in self.context_buckets
}
self.group_indices = None
@property
def X(self) -> Tensor:
return self.datasets[self.outcome_names[0]].X
@property
def Y(self) -> Tensor:
"""Concatenates the Ys from the child datasets to create the Y expected
by LCEM model if there are multiple datasets; Or return the Y expected
by LCEA model if there is only one dataset.
"""
Ys = [ds.Y for ds in self.datasets.values()]
if len(Ys) == 1:
return Ys[0]
else:
return torch.cat(Ys, dim=-1)
@property
def Yvar(self) -> Tensor | None:
"""Concatenates the Yvars from the child datasets to create the Y expected
by LCEM model if there are multiple datasets; Or return the Yvar expected
by LCEA model if there is only one dataset.
"""
Yvars = [ds.Yvar for ds in self.datasets.values()]
if Yvars[0] is None:
return None
elif len(Yvars) == 1:
return Yvars[0]
else:
return torch.cat(Yvars, dim=-1)
def _extract_context_buckets(self) -> list[str]:
"""Determines the context buckets from the data, and sets the
context_buckets attribute.
If we have an outcome for each context, we will lists the contexts
in the same order as the outcomes (i.e., the order of datasets).
If there is a single outcome (aggregated across contexts), the context
buckets are taken from the parameter decomposition.
"""
if len(self.outcome_names) > 1:
assert len(self.outcome_names) == len(
self.metric_decomposition
), "Expected a single dataset, or one for each context bucket."
context_buckets = []
for outcome_name in self.outcome_names:
for k, v in self.metric_decomposition.items():
if outcome_name in v:
context_buckets.append(k)
break
else:
context_buckets = list(self.parameter_decomposition.keys())
return context_buckets
def _validate_datasets(self) -> None:
"""Validation of given datasets.
1. each dataset has same X.
2. metric_decomposition is not None if there are multiple datasets.
3. metric_decomposition contains all the outcomes in datasets.
4. value keys of parameter decomposition and the keys of
metric_decomposition match context buckets.
5. Yvar is None for all, or not for all.
"""
datasets = list(self.datasets.values())
X = datasets[0].X
Yvar_is_none = datasets[0].Yvar is None
for dataset in datasets:
if not torch.equal(X, dataset.X):
raise InputDataError("Require same X for context buckets")
if (dataset.Yvar is None) != Yvar_is_none:
raise InputDataError(
"Require Yvar to be specified for all buckets, or for none"
)
if len(datasets) > 1:
if self.metric_decomposition is None:
raise InputDataError(
"metric_decomposition must be provided when there are"
+ " multiple datasets."
)
else:
if self.metric_decomposition is not None:
raise InputDataError(
"metric_decomposition is redundant when there is one "
+ "dataset for overall outcome."
)
def _validate_decompositions(self) -> None:
"""Checks that the decompositions are valid.
Raises:
InputDataError: If any of the decompositions are invalid.
"""
if self.metric_decomposition is not None:
m = len(list(self.metric_decomposition.values())[0])
existing_metrics = set()
for v in self.metric_decomposition.values():
if existing_metrics.intersection(list(v)):
raise InputDataError(
"metric_decomposition has same metric for multiple contexts."
)
if len(v) != m or len(set(v)) != m:
raise InputDataError(
"All values in metric_decomposition must have the same length."
)
existing_metrics.update(list(v))
if set(self.metric_decomposition.keys()) != set(
self.parameter_decomposition.keys()
):
raise InputDataError(
"Keys of metric and parameter decompositions do not match."
)
all_metrics = []
for metrics in self.metric_decomposition.values():
all_metrics.extend(metrics)
for outcome in self.outcome_names:
if outcome not in all_metrics:
raise InputDataError(
f"{outcome} is missing in metric_decomposition."
)
def clone(
self, deepcopy: bool = False, mask: Tensor | None = None
) -> ContextualDataset:
"""Return a copy of the dataset.
Args:
deepcopy: If True, perform a deep copy. Otherwise, use the same
tensors/lists/datasets.
mask: An `n`-dim boolean mask indicating which rows to keep. This is used
along the -2 dimension. `n` here corresponds to the number of rows in
an individual dataset.
Returns:
The new dataset.
"""
datasets = list(self.datasets.values())
if mask is not None or deepcopy:
datasets = [ds.clone(deepcopy=deepcopy, mask=mask) for ds in datasets]
if deepcopy:
parameter_decomposition = copy.deepcopy(self.parameter_decomposition)
metric_decomposition = copy.deepcopy(self.metric_decomposition)
else:
parameter_decomposition = self.parameter_decomposition
metric_decomposition = self.metric_decomposition
return ContextualDataset(
datasets=datasets,
parameter_decomposition=parameter_decomposition,
metric_decomposition=metric_decomposition,
)