from __future__ import annotations

from abc import abstractmethod

import torch
from botorch.acquisition.objective import GenericMCObjective, MCAcquisitionObjective
from botorch.exceptions.errors import BotorchError, BotorchTensorDimensionError
from botorch.models.model import Model
from botorch.utils import apply_constraints
from botorch.utils.transforms import normalize_indices
from torch import Tensor

class MCMultiOutputObjective(MCAcquisitionObjective):
    r"""Abstract base class for MC multi-output objectives.

    Subclasses implement `forward`, mapping posterior samples to a tensor of
    objective values with one column per objective.

    Attributes:
        _is_mo: A boolean denoting whether the objectives are multi-output.
    """

    # Class-level flag marking this objective family as multi-output.
    _is_mo: bool = True

    @abstractmethod
    def forward(self, samples: Tensor, X: Tensor | None = None) -> Tensor:
        r"""Evaluate the multi-output objective on the samples.

        This method is usually not called directly, but via the objective's
        `__call__` method.

        Args:
            samples: A `sample_shape x batch_shape x q x m`-dim Tensor of
                samples from a model posterior.
            X: A `batch_shape x q x d`-dim Tensor of inputs.

        Returns:
            A `sample_shape x batch_shape x q x m'`-dim Tensor of objective
            values, with `m'` the output dimension. Maximization is assumed in
            each output dimension.

        Example:
            >>> samples = sampler(posterior)
            >>> outcomes = multi_obj(samples)
        """
        ...  # pragma: no cover
class GenericMCMultiOutputObjective(GenericMCObjective, MCMultiOutputObjective):
    r"""Multi-output objective generated from a generic callable.

    Lets arbitrary MC-objective functions be built from a user-supplied
    callable. To use gradient-based acquisition function optimization, it
    should be possible to backpropagate through the callable.

    All behavior is inherited from the two base classes; this class adds
    nothing of its own.
    """
class IdentityMCMultiOutputObjective(MCMultiOutputObjective):
    r"""Trivial objective that returns the unaltered samples.

    If `outcomes` is given, only those outcome columns (last dimension) are
    returned; otherwise the samples are passed through unchanged.

    Example:
        >>> identity_objective = IdentityMCMultiOutputObjective()
        >>> samples = sampler(posterior)
        >>> objective = identity_objective(samples)
    """

    def __init__(
        self, outcomes: list[int] | None = None, num_outcomes: int | None = None
    ) -> None:
        r"""Initialize Objective.

        Args:
            outcomes: A list of the `m'` outcome indices to select from the
                samples. If omitted, all outcomes are kept.
            num_outcomes: The total number of outcomes `m`. Only required when
                `outcomes` contains negative indices.

        Raises:
            BotorchTensorDimensionError: If fewer than two outcomes are given.
            BotorchError: If any outcome index is negative and `num_outcomes`
                is not provided.
        """
        super().__init__()
        if outcomes is not None:
            if len(outcomes) < 2:
                raise BotorchTensorDimensionError(
                    "Must specify at least two outcomes for MOO."
                )
            if any(i < 0 for i in outcomes):
                if num_outcomes is None:
                    raise BotorchError(
                        "num_outcomes is required if any outcomes are less than 0."
                    )
                # Map negative indices to their non-negative equivalents.
                outcomes = normalize_indices(outcomes, num_outcomes)
            # Only registered when a subset was requested; `forward` relies on
            # the buffer's presence to decide whether to sub-select.
            self.register_buffer("outcomes", torch.tensor(outcomes, dtype=torch.long))

    def forward(self, samples: Tensor, X: Tensor | None = None) -> Tensor:
        r"""Return the samples, restricted to the selected outcomes if any.

        Args:
            samples: A `sample_shape x batch_shape x q x m`-dim Tensor of
                samples from a model posterior.
            X: Unused; accepted for interface compatibility.

        Returns:
            The samples indexed along the last dimension by `outcomes` when
            those were specified at construction, else `samples` unchanged.
        """
        if hasattr(self, "outcomes"):
            # The index tensor must be on the same device as `samples`.
            return samples.index_select(-1, self.outcomes.to(device=samples.device))
        return samples
class WeightedMCMultiOutputObjective(IdentityMCMultiOutputObjective):
    r"""Objective that reweights samples by given weights vector.

    The (optionally sub-selected) outcomes are multiplied elementwise by the
    weights along the last dimension.

    Example:
        >>> weights = torch.tensor([1.0, -1.0])
        >>> weighted_objective = WeightedMCMultiOutputObjective(weights)
        >>> samples = sampler(posterior)
        >>> objective = weighted_objective(samples)
    """

    def __init__(
        self,
        weights: Tensor,
        outcomes: list[int] | None = None,
        num_outcomes: int | None = None,
    ) -> None:
        r"""Initialize Objective.

        Args:
            weights: `m'`-dim tensor of outcome weights.
            outcomes: A list of the `m'` indices that the weights should be
                applied to.
            num_outcomes: The total number of outcomes `m`.

        Raises:
            BotorchTensorDimensionError: If `weights` is not one-dimensional,
                or if `outcomes` is given and its length differs from the
                number of weights.
        """
        super().__init__(outcomes=outcomes, num_outcomes=num_outcomes)
        if weights.ndim != 1:
            raise BotorchTensorDimensionError(
                f"weights must be an 1-D tensor, but got {weights.shape}."
            )
        elif outcomes is not None and weights.shape[0] != len(outcomes):
            raise BotorchTensorDimensionError(
                "weights must contain the same number of elements as outcomes, "
                f"but got {weights.numel()} weights and {len(outcomes)} outcomes."
            )
        self.register_buffer("weights", weights)

    def forward(self, samples: Tensor, X: Tensor | None = None) -> Tensor:
        r"""Select the configured outcomes and scale them by the weights.

        Args:
            samples: A `sample_shape x batch_shape x q x m`-dim Tensor of
                samples from a model posterior.
            X: Unused; accepted for interface compatibility.

        Returns:
            The selected outcomes multiplied elementwise by `weights`.
        """
        samples = super().forward(samples=samples)
        # Cast the weights to the samples' dtype/device before multiplying.
        return samples * self.weights.to(samples)
class FeasibilityWeightedMCMultiOutputObjective(MCMultiOutputObjective):
    r"""Objective that feasibility-weights samples before applying an objective.

    Outcomes listed in `constraint_idcs` are treated as constraints; the
    remaining outcomes are weighted by feasibility and then passed to the
    optional wrapped `objective`.
    """

    def __init__(
        self,
        model: Model,
        X_baseline: Tensor,
        constraint_idcs: list[int],
        objective: MCMultiOutputObjective | None = None,
    ) -> None:
        r"""Construct a feasibility-weighted objective.

        This applies feasibility weighting before calculating the objective
        value. Defaults to identity if no constraints or objective is present.

        NOTE: By passing in a single-output `MCAcquisitionObjective` as the
        `objective`, this can be used as a single-output
        `MCAcquisitionObjective` as well.

        Args:
            model: A fitted Model.
            X_baseline: An `n x d`-dim tensor of points already observed.
            constraint_idcs: The outcome indices of the constraints. Constraints
                are handled by weighting the samples according to a sigmoid
                approximation of feasibility. A positive constraint outcome
                implies feasibility.
            objective: An optional objective to apply after feasibility-weighting
                the samples.

        Raises:
            ValueError: If `constraint_idcs` contains duplicate entries (after
                negative indices are normalized).
        """
        super().__init__()
        num_outputs = model.num_outputs
        # Get the non-negative indices.
        constraint_idcs = [
            num_outputs + idx if idx < 0 else idx for idx in constraint_idcs
        ]
        constraint_set = set(constraint_idcs)
        if len(constraint_idcs) != len(constraint_set):
            raise ValueError("Received duplicate entries for `constraint_idcs`.")
        # Extract the indices for objective outcomes.
        objective_idcs = [i for i in range(num_outputs) if i not in constraint_set]
        if constraint_idcs:
            # Import locally to avoid circular import.
            from botorch.acquisition.utils import get_infeasible_cost

            inf_cost = get_infeasible_cost(
                X=X_baseline, model=model, objective=lambda y, X: y
            )[objective_idcs]
            # Bind `i` as a default argument: a plain closure would be
            # late-binding, making every callable read the last constraint
            # index. The sign is flipped because a positive outcome means
            # feasible here.
            constraints = [lambda Y, i=i: -Y[..., i] for i in constraint_idcs]

            def apply_feasibility_weights(
                Y: Tensor, X: Tensor | None = None
            ) -> Tensor:
                return apply_constraints(
                    obj=Y[..., objective_idcs],
                    constraints=constraints,
                    samples=Y,
                    # This ensures that the dtype/device is set properly.
                    infeasible_cost=inf_cost.to(Y),
                )

            self.apply_feasibility_weights = apply_feasibility_weights
        else:
            # No constraints: pass samples through unchanged. `X` is accepted
            # to match the signature of the constrained variant above.
            self.apply_feasibility_weights = lambda Y, X=None: Y
        if objective is None:
            self.objective = lambda Y, X: Y
        else:
            self.objective = objective
            # Delegate output-shape verification to the wrapped objective.
            self._verify_output_shape = objective._verify_output_shape

    def forward(self, samples: Tensor, X: Tensor | None = None) -> Tensor:
        r"""Feasibility-weight the samples, then evaluate the objective.

        Args:
            samples: Tensor of posterior samples whose last dimension indexes
                the model outcomes (objectives and constraints).
            X: Optional tensor of inputs, forwarded to the wrapped objective.

        Returns:
            The wrapped objective evaluated on the feasibility-weighted samples.
        """
        return self.objective(self.apply_feasibility_weights(samples), X=X)