#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from abc import abstractmethod
from typing import List, Optional
import torch
from botorch.acquisition.objective import (
AcquisitionObjective,
GenericMCObjective,
MCAcquisitionObjective,
)
from botorch.exceptions.errors import BotorchError, BotorchTensorDimensionError
from botorch.models.model import Model
from botorch.models.transforms.outcome import Standardize
from botorch.posteriors import GPyTorchPosterior
from botorch.utils import apply_constraints
from botorch.utils.transforms import normalize_indices
from torch import Tensor
[docs]class MCMultiOutputObjective(MCAcquisitionObjective):
r"""Abstract base class for MC multi-output objectives.
Args:
_is_mo: A boolean denoting whether the objectives are multi-output.
"""
_is_mo: bool = True
[docs] @abstractmethod
def forward(self, samples: Tensor, X: Optional[Tensor] = None, **kwargs) -> Tensor:
r"""Evaluate the multi-output objective on the samples.
Args:
samples: A `sample_shape x batch_shape x q x m`-dim Tensors of samples from
a model posterior.
X: A `batch_shape x q x d`-dim Tensors of inputs.
Returns:
A `sample_shape x batch_shape x q x m'`-dim Tensor of objective values with
`m'` the output dimension. This assumes maximization in each output
dimension).
This method is usually not called directly, but via the objectives
Example:
>>> # `__call__` method:
>>> samples = sampler(posterior)
>>> outcomes = multi_obj(samples)
"""
pass # pragma: no cover
[docs]class GenericMCMultiOutputObjective(GenericMCObjective, MCMultiOutputObjective):
r"""Multi-output objective generated from a generic callable.
Allows to construct arbitrary MC-objective functions from a generic
callable. In order to be able to use gradient-based acquisition function
optimization it should be possible to backpropagate through the callable.
"""
pass
[docs]class IdentityMCMultiOutputObjective(MCMultiOutputObjective):
r"""Trivial objective that returns the unaltered samples.
Example:
>>> identity_objective = IdentityMCMultiOutputObjective()
>>> samples = sampler(posterior)
>>> objective = identity_objective(samples)
"""
def __init__(
self, outcomes: Optional[List[int]] = None, num_outcomes: Optional[int] = None
) -> None:
r"""Initialize Objective.
Args:
weights: `m'`-dim tensor of outcome weights.
outcomes: A list of the `m'` indices that the weights should be
applied to.
num_outcomes: The total number of outcomes `m`
"""
super().__init__()
if outcomes is not None:
if len(outcomes) < 2:
raise BotorchTensorDimensionError(
"Must specify at least two outcomes for MOO."
)
if any(i < 0 for i in outcomes):
if num_outcomes is None:
raise BotorchError(
"num_outcomes is required if any outcomes are less than 0."
)
outcomes = normalize_indices(outcomes, num_outcomes)
self.register_buffer("outcomes", torch.tensor(outcomes, dtype=torch.long))
[docs] def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
if hasattr(self, "outcomes"):
return samples.index_select(-1, self.outcomes.to(device=samples.device))
return samples
[docs]class WeightedMCMultiOutputObjective(IdentityMCMultiOutputObjective):
r"""Objective that reweights samples by given weights vector.
Example:
>>> weights = torch.tensor([1.0, -1.0])
>>> weighted_objective = WeightedMCMultiOutputObjective(weights)
>>> samples = sampler(posterior)
>>> objective = weighted_objective(samples)
"""
def __init__(
self,
weights: Tensor,
outcomes: Optional[List[int]] = None,
num_outcomes: Optional[int] = None,
) -> None:
r"""Initialize Objective.
Args:
weights: `m'`-dim tensor of outcome weights.
outcomes: A list of the `m'` indices that the weights should be
applied to.
num_outcomes: the total number of outcomes `m`
"""
super().__init__(outcomes=outcomes, num_outcomes=num_outcomes)
if weights.ndim != 1:
raise BotorchTensorDimensionError(
f"weights must be an 1-D tensor, but got {weights.shape}."
)
elif outcomes is not None and weights.shape[0] != len(outcomes):
raise BotorchTensorDimensionError(
"weights must contain the same number of elements as outcomes, "
f"but got {weights.numel()} weights and {len(outcomes)} outcomes."
)
self.register_buffer("weights", weights)
[docs] def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
samples = super().forward(samples=samples)
return samples * self.weights.to(samples)
[docs]class FeasibilityWeightedMCMultiOutputObjective(MCMultiOutputObjective):
def __init__(
self,
model: Model,
X_baseline: Tensor,
constraint_idcs: List[int],
objective: Optional[MCMultiOutputObjective] = None,
) -> None:
r"""Construct a feasibility weighted objective.
This applies feasibility weighting before calculating the objective value.
Defaults to identity if no constraints or objective is present.
NOTE: By passing in a single-output `MCAcquisitionObjective` as the `objective`,
this can be used as a single-output `MCAcquisitionObjective` as well.
Args:
model: A fitted Model.
X_baseline: An `n x d`-dim tensor of points already observed.
constraint_idcs: The outcome indices of the constraints. Constraints are
handled by weighting the samples according to a sigmoid approximation
of feasibility. A positive constraint outcome implies feasibility.
objective: An optional objective to apply after feasibility-weighting
the samples.
"""
super().__init__()
num_outputs = model.num_outputs
# Get the non-negative indices.
constraint_idcs = [
num_outputs + idx if idx < 0 else idx for idx in constraint_idcs
]
if len(constraint_idcs) != len(set(constraint_idcs)):
raise ValueError("Received duplicate entries for `constraint_idcs`.")
# Extract the indices for objective outcomes.
objective_idcs = [i for i in range(num_outputs) if i not in constraint_idcs]
if len(constraint_idcs) > 0:
# Import locally to avoid circular import.
from botorch.acquisition.utils import get_infeasible_cost
inf_cost = get_infeasible_cost(
X=X_baseline, model=model, objective=lambda y: y
)[objective_idcs]
def apply_feasibility_weights(
Y: Tensor, X: Optional[Tensor] = None
) -> Tensor:
return apply_constraints(
obj=Y[..., objective_idcs],
constraints=[lambda Y: -Y[..., i] for i in constraint_idcs],
samples=Y,
# This ensures that the dtype/device is set properly.
infeasible_cost=inf_cost.to(Y),
)
self.apply_feasibility_weights = apply_feasibility_weights
else:
self.apply_feasibility_weights = lambda Y: Y
if objective is None:
self.objective = lambda Y, X: Y
else:
self.objective = objective
self._verify_output_shape = objective._verify_output_shape
[docs] def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
return self.objective(self.apply_feasibility_weights(samples), X=X)
[docs]class UnstandardizeMCMultiOutputObjective(IdentityMCMultiOutputObjective):
r"""Objective that unstandardizes the samples.
TODO: remove this when MultiTask models support outcome transforms.
Example:
>>> unstd_objective = UnstandardizeMCMultiOutputObjective(Y_mean, Y_std)
>>> samples = sampler(posterior)
>>> objective = unstd_objective(samples)
"""
def __init__(
self, Y_mean: Tensor, Y_std: Tensor, outcomes: Optional[List[int]] = None
) -> None:
r"""Initialize objective.
Args:
Y_mean: `m`-dim tensor of outcome means.
Y_std: `m`-dim tensor of outcome standard deviations.
outcomes: A list of `m' <= m` indices that specifies which of the `m` model
outputs should be considered as the outcomes for MOO. If omitted, use
all model outcomes. Typically used for constrained optimization.
"""
if Y_mean.ndim > 1 or Y_std.ndim > 1:
raise BotorchTensorDimensionError(
"Y_mean and Y_std must both be 1-dimensional, but got "
f"{Y_mean.ndim} and {Y_std.ndim}"
)
elif outcomes is not None and len(outcomes) > Y_mean.shape[-1]:
raise BotorchTensorDimensionError(
f"Cannot specify more ({len(outcomes)}) outcomes than are present in "
f"the normalization inputs ({Y_mean.shape[-1]})."
)
super().__init__(outcomes=outcomes, num_outcomes=Y_mean.shape[-1])
if outcomes is not None:
Y_mean = Y_mean.index_select(-1, self.outcomes.to(Y_mean.device))
Y_std = Y_std.index_select(-1, self.outcomes.to(Y_mean.device))
self.register_buffer("Y_mean", Y_mean)
self.register_buffer("Y_std", Y_std)
[docs] def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
samples = super().forward(samples=samples)
return samples * self.Y_std + self.Y_mean
[docs]class AnalyticMultiOutputObjective(AcquisitionObjective):
r"""Abstract base class for multi-output analyic objectives."""
# TODO: Refactor these as PosteriorTransform as well.
[docs] @abstractmethod
def forward(self, posterior: GPyTorchPosterior) -> GPyTorchPosterior:
r"""Transform the posterior
Args:
posterior: A posterior.
Returns:
A transformed posterior.
"""
pass # pragma: no cover
[docs]class IdentityAnalyticMultiOutputObjective(AnalyticMultiOutputObjective):
[docs] def forward(self, posterior: GPyTorchPosterior) -> GPyTorchPosterior:
return posterior
[docs]class UnstandardizeAnalyticMultiOutputObjective(AnalyticMultiOutputObjective):
r"""Objective that unstandardizes the posterior.
TODO: remove this when MultiTask models support outcome transforms.
Example:
>>> unstd_objective = UnstandardizeAnalyticMultiOutputObjective(Y_mean, Y_std)
>>> unstd_posterior = unstd_objective(posterior)
"""
def __init__(self, Y_mean: Tensor, Y_std: Tensor) -> None:
r"""Initialize objective.
Args:
Y_mean: `m`-dim tensor of outcome means
Y_std: `m`-dim tensor of outcome standard deviations
"""
if Y_mean.ndim > 1 or Y_std.ndim > 1:
raise BotorchTensorDimensionError(
"Y_mean and Y_std must both be 1-dimensional, but got "
f"{Y_mean.ndim} and {Y_std.ndim}"
)
super().__init__()
self.outcome_transform = Standardize(m=Y_mean.shape[0]).to(Y_mean)
Y_std_unsqueezed = Y_std.unsqueeze(0)
self.outcome_transform.means = Y_mean.unsqueeze(0)
self.outcome_transform.stdvs = Y_std_unsqueezed
self.outcome_transform._stdvs_sq = Y_std_unsqueezed.pow(2)
self.outcome_transform.eval()
[docs] def forward(self, posterior: GPyTorchPosterior) -> Tensor:
return self.outcome_transform.untransform_posterior(posterior)