#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from abc import abstractmethod
from typing import List, Optional
import torch
from botorch.acquisition.objective import GenericMCObjective, MCAcquisitionObjective
from botorch.exceptions.errors import BotorchError, BotorchTensorDimensionError
from botorch.models.model import Model
from botorch.utils import apply_constraints
from botorch.utils.transforms import normalize_indices
from torch import Tensor
[docs]
class MCMultiOutputObjective(MCAcquisitionObjective):
r"""Abstract base class for MC multi-output objectives.
Args:
_is_mo: A boolean denoting whether the objectives are multi-output.
"""
_is_mo: bool = True
[docs]
@abstractmethod
def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
r"""Evaluate the multi-output objective on the samples.
Args:
samples: A `sample_shape x batch_shape x q x m`-dim Tensors of samples from
a model posterior.
X: A `batch_shape x q x d`-dim Tensors of inputs.
Returns:
A `sample_shape x batch_shape x q x m'`-dim Tensor of objective values with
`m'` the output dimension. This assumes maximization in each output
dimension).
This method is usually not called directly, but via the objectives.
Example:
>>> # `__call__` method:
>>> samples = sampler(posterior)
>>> outcomes = multi_obj(samples)
"""
pass # pragma: no cover
[docs]
class GenericMCMultiOutputObjective(GenericMCObjective, MCMultiOutputObjective):
r"""Multi-output objective generated from a generic callable.
Allows to construct arbitrary MC-objective functions from a generic
callable. In order to be able to use gradient-based acquisition function
optimization it should be possible to backpropagate through the callable.
"""
pass
[docs]
class IdentityMCMultiOutputObjective(MCMultiOutputObjective):
r"""Trivial objective that returns the unaltered samples.
Example:
>>> identity_objective = IdentityMCMultiOutputObjective()
>>> samples = sampler(posterior)
>>> objective = identity_objective(samples)
"""
def __init__(
self, outcomes: Optional[List[int]] = None, num_outcomes: Optional[int] = None
) -> None:
r"""Initialize Objective.
Args:
outcomes: A list of the `m'` indices that the weights should be
applied to.
num_outcomes: The total number of outcomes `m`
"""
super().__init__()
if outcomes is not None:
if len(outcomes) < 2:
raise BotorchTensorDimensionError(
"Must specify at least two outcomes for MOO."
)
if any(i < 0 for i in outcomes):
if num_outcomes is None:
raise BotorchError(
"num_outcomes is required if any outcomes are less than 0."
)
outcomes = normalize_indices(outcomes, num_outcomes)
self.register_buffer("outcomes", torch.tensor(outcomes, dtype=torch.long))
[docs]
def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
if hasattr(self, "outcomes"):
return samples.index_select(-1, self.outcomes.to(device=samples.device))
return samples
[docs]
class WeightedMCMultiOutputObjective(IdentityMCMultiOutputObjective):
r"""Objective that reweights samples by given weights vector.
Example:
>>> weights = torch.tensor([1.0, -1.0])
>>> weighted_objective = WeightedMCMultiOutputObjective(weights)
>>> samples = sampler(posterior)
>>> objective = weighted_objective(samples)
"""
def __init__(
self,
weights: Tensor,
outcomes: Optional[List[int]] = None,
num_outcomes: Optional[int] = None,
) -> None:
r"""Initialize Objective.
Args:
weights: `m'`-dim tensor of outcome weights.
outcomes: A list of the `m'` indices that the weights should be
applied to.
num_outcomes: the total number of outcomes `m`
"""
super().__init__(outcomes=outcomes, num_outcomes=num_outcomes)
if weights.ndim != 1:
raise BotorchTensorDimensionError(
f"weights must be an 1-D tensor, but got {weights.shape}."
)
elif outcomes is not None and weights.shape[0] != len(outcomes):
raise BotorchTensorDimensionError(
"weights must contain the same number of elements as outcomes, "
f"but got {weights.numel()} weights and {len(outcomes)} outcomes."
)
self.register_buffer("weights", weights)
[docs]
def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
samples = super().forward(samples=samples)
return samples * self.weights.to(samples)
[docs]
class FeasibilityWeightedMCMultiOutputObjective(MCMultiOutputObjective):
def __init__(
self,
model: Model,
X_baseline: Tensor,
constraint_idcs: List[int],
objective: Optional[MCMultiOutputObjective] = None,
) -> None:
r"""Construct a feasibility-weighted objective.
This applies feasibility weighting before calculating the objective value.
Defaults to identity if no constraints or objective is present.
NOTE: By passing in a single-output `MCAcquisitionObjective` as the `objective`,
this can be used as a single-output `MCAcquisitionObjective` as well.
Args:
model: A fitted Model.
X_baseline: An `n x d`-dim tensor of points already observed.
constraint_idcs: The outcome indices of the constraints. Constraints are
handled by weighting the samples according to a sigmoid approximation
of feasibility. A positive constraint outcome implies feasibility.
objective: An optional objective to apply after feasibility-weighting
the samples.
"""
super().__init__()
num_outputs = model.num_outputs
# Get the non-negative indices.
constraint_idcs = [
num_outputs + idx if idx < 0 else idx for idx in constraint_idcs
]
if len(constraint_idcs) != len(set(constraint_idcs)):
raise ValueError("Received duplicate entries for `constraint_idcs`.")
# Extract the indices for objective outcomes.
objective_idcs = [i for i in range(num_outputs) if i not in constraint_idcs]
if len(constraint_idcs) > 0:
# Import locally to avoid circular import.
from botorch.acquisition.utils import get_infeasible_cost
inf_cost = get_infeasible_cost(
X=X_baseline, model=model, objective=lambda y, X: y
)[objective_idcs]
def apply_feasibility_weights(
Y: Tensor, X: Optional[Tensor] = None
) -> Tensor:
return apply_constraints(
obj=Y[..., objective_idcs],
constraints=[lambda Y: -Y[..., i] for i in constraint_idcs],
samples=Y,
# This ensures that the dtype/device is set properly.
infeasible_cost=inf_cost.to(Y),
)
self.apply_feasibility_weights = apply_feasibility_weights
else:
self.apply_feasibility_weights = lambda Y: Y
if objective is None:
self.objective = lambda Y, X: Y
else:
self.objective = objective
self._verify_output_shape = objective._verify_output_shape
[docs]
def forward(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
return self.objective(self.apply_feasibility_weights(samples), X=X)