#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""Parsing rules for BoTorch datasets."""
from __future__ import annotations
from typing import Any, Dict, Hashable, Type, Union
import torch
from botorch.exceptions import UnsupportedError
from botorch.models.model import Model
from botorch.models.multitask import MultiTaskGP
from botorch.models.pairwise_gp import PairwiseGP
from botorch.utils.datasets import RankingDataset, SupervisedDataset
from botorch.utils.dispatcher import Dispatcher
from torch import cat, Tensor
from torch.nn.functional import pad
def _encoder(arg: Any) -> Type:
# Allow type variables to be passed as arguments at runtime
return arg if isinstance(arg, type) else type(arg)
dispatcher = Dispatcher("parse_training_data", encoder=_encoder)
[docs]def parse_training_data(
consumer: Any,
training_data: Union[SupervisedDataset, Dict[Hashable, SupervisedDataset]],
**kwargs: Any,
) -> Dict[str, Tensor]:
r"""Prepares a (collection of) datasets for consumption by a given object.
Args:
training_datas: A SupervisedDataset or dictionary thereof.
consumer: The object that will consume the parsed data, or type thereof.
Returns:
A dictionary containing the extracted information.
"""
return dispatcher(consumer, training_data, **kwargs)
@dispatcher.register(Model, SupervisedDataset)
def _parse_model_supervised(
consumer: Model, dataset: SupervisedDataset, **ignore: Any
) -> Dict[str, Tensor]:
parsed_data = {"train_X": dataset.X, "train_Y": dataset.Y}
if dataset.Yvar is not None:
parsed_data["train_Yvar"] = dataset.Yvar
return parsed_data
@dispatcher.register(PairwiseGP, RankingDataset)
def _parse_pairwiseGP_ranking(
consumer: PairwiseGP, dataset: RankingDataset, **ignore: Any
) -> Dict[str, Tensor]:
# TODO: [T163045056] Not sure what the point of the special container is if we have
# to further process it here. We should move this logic into RankingDataset.
datapoints = dataset._X.values
comparisons = dataset._X.indices
comp_order = dataset.Y
comparisons = torch.gather(input=comparisons, dim=-1, index=comp_order)
return {
"datapoints": datapoints,
"comparisons": comparisons,
}
@dispatcher.register(Model, dict)
def _parse_model_dict(
consumer: Model,
training_data: Dict[Hashable, SupervisedDataset],
**kwargs: Any,
) -> Dict[str, Tensor]:
if len(training_data) != 1:
raise UnsupportedError(
"Default training data parsing logic does not support "
"passing multiple datasets to single task models."
)
return dispatcher(consumer, next(iter(training_data.values())))
@dispatcher.register(MultiTaskGP, dict)
def _parse_multitask_dict(
consumer: Model,
training_data: Dict[Hashable, SupervisedDataset],
*,
task_feature: int = 0,
task_feature_container: Hashable = "train_X",
**kwargs: Any,
) -> Dict[str, Tensor]:
cache = {}
for task_id, dataset in enumerate(training_data.values()):
parse = parse_training_data(consumer, dataset, **kwargs)
if task_feature_container not in parse:
raise ValueError(f"Missing required term `{task_feature_container}`.")
if cache and cache.keys() != parse.keys():
raise UnsupportedError(
"Cannot combine datasets with heterogeneous parsed formats."
)
# Add task indicator features to specified container
X = parse[task_feature_container]
d = X.shape[-1]
i = d + task_feature + 1 if task_feature < 0 else task_feature
if i < 0 or d < i:
raise ValueError("Invalid `task_feature`: out-of-bounds.")
if i == 0:
X = pad(X, (1, 0), value=task_id)
elif i == d:
X = pad(X, (0, 1), value=task_id)
else:
A, B = X.split([i, d - i], dim=-1)
X = cat([pad(A, (0, 1), value=task_id), B], dim=-1)
parse[task_feature_container] = X
if cache:
for key, val in parse.items():
cache[key].append(val)
else:
cache = {key: [val] for key, val in parse.items()}
return {key: cat(tensors, dim=0) for key, tensors in cache.items()}