#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Monte-Carlo Acquisition Functions for Multi-objective Bayesian optimization.
References
.. [Daulton2020]
S. Daulton, M. Balandat, E. Bakshy. Differentiable Expected Hypervolume
Improvement for Parallel Multi-Objective Bayesian Optimization.
arXiv e-prints, arXiv:2006.05078, Jun. 2020.
"""
from __future__ import annotations
from abc import abstractmethod
from itertools import combinations
from typing import Callable, List, Optional
import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.multi_objective.objective import (
IdentityMCMultiOutputObjective,
MCMultiOutputObjective,
)
from botorch.exceptions.errors import UnsupportedError
from botorch.models.model import Model
from botorch.sampling.samplers import MCSampler, SobolQMCNormalSampler
from botorch.utils.multi_objective.box_decomposition import NondominatedPartitioning
from botorch.utils.objective import apply_constraints_nonnegative_soft
from botorch.utils.torch import BufferDict
from botorch.utils.transforms import concatenate_pending_points, t_batch_mode_transform
from torch import Tensor
class MultiObjectiveMCAcquisitionFunction(AcquisitionFunction):
r"""Abstract base class for Multi-Objective batch acquisition functions."""
def __init__(
self,
model: Model,
sampler: Optional[MCSampler] = None,
objective: Optional[MCMultiOutputObjective] = None,
X_pending: Optional[Tensor] = None,
) -> None:
r"""Constructor for the MCAcquisitionFunction base class.
Args:
model: A fitted model.
sampler: The sampler used to draw base samples. Defaults to
`SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True)`.
            objective: The MCMultiOutputObjective under which the samples are
                evaluated. Defaults to `IdentityMCMultiOutputObjective()`.
            X_pending: A `m x d`-dim Tensor of `m` design points that have been
                submitted for function evaluation but have not yet been evaluated.
"""
super().__init__(model=model)
if sampler is None:
sampler = SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True)
self.add_module("sampler", sampler)
if objective is None:
objective = IdentityMCMultiOutputObjective()
elif not isinstance(objective, MCMultiOutputObjective):
raise UnsupportedError(
"Only objectives of type MCMultiOutputObjective are supported for "
"Multi-Objective MC acquisition functions."
)
self.add_module("objective", objective)
self.set_X_pending(X_pending)
    @abstractmethod
def forward(self, X: Tensor) -> Tensor:
r"""Takes in a `batch_shape x q x d` X Tensor of t-batches with `q` `d`-dim
design points each, and returns a Tensor with shape `batch_shape'`, where
`batch_shape'` is the broadcasted batch shape of model and input `X`. Should
utilize the result of `set_X_pending` as needed to account for pending function
evaluations.
"""
pass # pragma: no cover
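
# A minimal illustrative subclass (a sketch, not part of the BoTorch API): it
# scalarizes the outcomes by summing them, takes the best point in each q-batch,
# and averages over MC samples. The name `qSumMCObjective` is hypothetical.
class qSumMCObjective(MultiObjectiveMCAcquisitionFunction):
    @concatenate_pending_points
    @t_batch_mode_transform()
    def forward(self, X: Tensor) -> Tensor:
        posterior = self.model.posterior(X)  # posterior over the q candidates
        samples = self.sampler(posterior)  # n_samples x batch_shape x q x m
        obj = self.objective(samples)  # n_samples x batch_shape x q x m'
        # sum outcomes, take the best point per q-batch, average over MC samples
        return obj.sum(dim=-1).max(dim=-1).values.mean(dim=0)
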
class qExpectedHypervolumeImprovement(MultiObjectiveMCAcquisitionFunction):
def __init__(
self,
model: Model,
ref_point: List[float],
partitioning: NondominatedPartitioning,
sampler: Optional[MCSampler] = None,
objective: Optional[MCMultiOutputObjective] = None,
constraints: Optional[List[Callable[[Tensor], Tensor]]] = None,
X_pending: Optional[Tensor] = None,
eta: float = 1e-3,
) -> None:
r"""q-Expected Hypervolume Improvement supporting m>=2 outcomes.
See [Daulton2020]_ for details.
        Example:
            >>> model = SingleTaskGP(train_X, train_Y)
            >>> ref_point = [0.0, 0.0]
            >>> partitioning = NondominatedPartitioning(num_outcomes=2, Y=train_Y)
            >>> qEHVI = qExpectedHypervolumeImprovement(model, ref_point, partitioning)
            >>> qehvi = qEHVI(test_X)

Args:
model: A fitted model.
            ref_point: A list with `m` elements representing the reference point
                (in the outcome space) with respect to which the hypervolume is
                computed. This is a reference point for the objective values
                (i.e. after applying `objective` to the samples).
partitioning: A `NondominatedPartitioning` module that provides the non-
dominated front and a partitioning of the non-dominated space in hyper-
rectangles. If constraints are present, this partitioning must only
include feasible points.
sampler: The sampler used to draw base samples. Defaults to
`SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True)`.
            objective: The MCMultiOutputObjective under which the samples are
                evaluated. Defaults to `IdentityMCMultiOutputObjective()`.
            constraints: A list of callables, each mapping a Tensor of dimension
                `sample_shape x batch-shape x q x m` to a Tensor of dimension
                `sample_shape x batch-shape x q`, where negative values imply
                feasibility. The acquisition function will compute the expected
                feasible hypervolume.
            X_pending: A `batch_shape x m x d`-dim Tensor of `m` design points
                that have been submitted for function evaluation but have not yet
                been evaluated. Concatenated into `X` upon forward call. Copied
                and set to have no gradient.
            eta: The temperature parameter for the sigmoid function used for the
                differentiable approximation of the constraints.
"""
if len(ref_point) != partitioning.num_outcomes:
raise ValueError(
"The length of the reference point must match the number of outcomes. "
f"Got ref_point with {len(ref_point)} elements, but expected "
f"{partitioning.num_outcomes}."
)
ref_point = torch.tensor(
ref_point,
dtype=partitioning.pareto_Y.dtype,
device=partitioning.pareto_Y.device,
)
super().__init__(
model=model, sampler=sampler, objective=objective, X_pending=X_pending
)
self.constraints = constraints
self.eta = eta
self.register_buffer("ref_point", ref_point)
cell_bounds = partitioning.get_hypercell_bounds(ref_point=self.ref_point)
self.register_buffer("cell_lower_bounds", cell_bounds[0])
self.register_buffer("cell_upper_bounds", cell_bounds[1])
self.q = -1
self.q_subset_indices = BufferDict()
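
    # Illustrative constraint usage (a sketch, not from the original module): each
    # constraint callable maps `sample_shape x batch_shape x q x m` samples to
    # `sample_shape x batch_shape x q` values that are negative when feasible.
    # For example, to require the first outcome to be at least 0.5:
    #
    #     constraints = [lambda Z: 0.5 - Z[..., 0]]
    #     acqf = qExpectedHypervolumeImprovement(
    #         model=model,
    #         ref_point=[0.0, 0.0],
    #         partitioning=partitioning,  # must be built from feasible points only
    #         constraints=constraints,
    #     )
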
def _cache_q_subset_indices(self, q: int) -> None:
r"""Cache indices corresponding to all subsets of `q`.
This means that consecutive calls to `forward` with the same
`q` will not recompute the indices for all (2^q - 1) subsets.
Note: this will use more memory than regenerating the indices
for each i and then deleting them, but it will be faster for
repeated evaluations (e.g. during optimization).
Args:
q: batch size
"""
if q != self.q:
indices = list(range(q))
tkwargs = {"dtype": torch.long, "device": self.cell_lower_bounds.device}
self.q_subset_indices = BufferDict(
{
f"q_choose_{i}": torch.tensor(
list(combinations(indices, i)), **tkwargs
)
for i in range(1, q + 1)
}
)
self.q = q
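
    # For example (illustrative), for q = 3 the cached BufferDict contains:
    #   q_choose_1 = [[0], [1], [2]]
    #   q_choose_2 = [[0, 1], [0, 2], [1, 2]]
    #   q_choose_3 = [[0, 1, 2]]
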
def _compute_qehvi(self, samples: Tensor) -> Tensor:
r"""Compute the expected (feasible) hypervolume improvement given MC samples.
Args:
samples: A `n_samples x batch_shape x q x m`-dim tensor of samples.
Returns:
A `batch_shape`-dim tensor of expected hypervolume improvement for each
batch.
"""
q = samples.shape[-2]
# Note that the objective may subset the outcomes (e.g. this will usually happen
# if there are constraints present).
obj = self.objective(samples)
if self.constraints is not None:
feas_weights = torch.ones(
obj.shape[:-1], device=obj.device, dtype=obj.dtype
)
feas_weights = apply_constraints_nonnegative_soft(
obj=feas_weights,
constraints=self.constraints,
samples=samples,
eta=self.eta,
)
self._cache_q_subset_indices(q=q)
batch_shape = samples.shape[:-2]
areas_per_segment = torch.zeros(
*batch_shape,
self.cell_lower_bounds.shape[-2],
dtype=obj.dtype,
device=obj.device,
)
sample_batch_view_shape = [
batch_shape[0] if self.cell_lower_bounds.ndim == 3 else 1
] + [1] * (len(batch_shape) - 1)
view_shape = (
*sample_batch_view_shape,
self.cell_upper_bounds.shape[-2],
1,
self.cell_upper_bounds.shape[-1],
)
for i in range(1, q + 1):
            # TODO: we could use batches to compute (q choose i) and (q choose q-i)
            # simultaneously since subsets of size i and q-i have the same number of
            # elements. This would decrease the number of iterations, but increase
            # memory usage.
q_choose_i = self.q_subset_indices[f"q_choose_{i}"]
            # this tensor is mc_samples x batch_shape x (q choose i) x i x m
obj_subsets = obj.index_select(dim=-2, index=q_choose_i.view(-1))
obj_subsets = obj_subsets.view(
obj.shape[:-2] + q_choose_i.shape + obj.shape[-1:]
)
# since all hyperrectangles share one vertex, the opposite vertex of the
# overlap is given by the component-wise minimum.
# take the minimum in each subset
overlap_vertices = obj_subsets.min(dim=-2).values
# add batch-dim to compute area for each segment (pseudo-pareto-vertex)
# this tensor is mc_samples x batch_shape x num_cells x q_choose_i x m
overlap_vertices = torch.min(
overlap_vertices.unsqueeze(-3), self.cell_upper_bounds.view(view_shape)
)
            # subtract cell lower bounds, clamp min at zero
lengths_i = (
overlap_vertices - self.cell_lower_bounds.view(view_shape)
).clamp_min(0.0)
# take product over hyperrectangle side lengths to compute area
# sum over all subsets of size i
areas_i = lengths_i.prod(dim=-1)
# if constraints are present, apply a differentiable approximation of
# the indicator function
if self.constraints is not None:
feas_subsets = feas_weights.index_select(
dim=-1, index=q_choose_i.view(-1)
).view(feas_weights.shape[:-1] + q_choose_i.shape)
areas_i = areas_i * feas_subsets.unsqueeze(-3).prod(dim=-1)
areas_i = areas_i.sum(dim=-1)
            # Using the inclusion-exclusion principle, set the sign to be positive
            # for subsets of odd size and negative for subsets of even size.
areas_per_segment += (-1) ** (i + 1) * areas_i
# sum over segments and average over MC samples
return areas_per_segment.sum(dim=-1).mean(dim=0)
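
    # Worked example of the inclusion-exclusion sum above (illustrative): for
    # q = 2 points a and b, the joint improvement within one cell is
    #   HVI({a, b}) = HVI({a}) + HVI({b}) - HVI(overlap of a and b),
    # where the overlap vertex is the component-wise minimum of a and b; the
    # (-1) ** (i + 1) factor produces exactly these alternating signs.
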
    @concatenate_pending_points
@t_batch_mode_transform()
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate qEHVI on a `batch_shape x q x d`-dim candidate set `X`."""
        posterior = self.model.posterior(X)
        samples = self.sampler(posterior)
        return self._compute_qehvi(samples=samples)
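

# A minimal runnable sketch (assuming botorch ~0.3, where `SingleTaskGP` models
# multi-output training data and `NondominatedPartitioning` takes `num_outcomes`
# and `Y`; toy data only, not part of the original module):
if __name__ == "__main__":
    from botorch.models import SingleTaskGP

    train_X = torch.rand(10, 2)
    train_Y = torch.rand(10, 2)  # two outcomes, both to be maximized
    model = SingleTaskGP(train_X, train_Y)
    partitioning = NondominatedPartitioning(num_outcomes=2, Y=train_Y)
    acqf = qExpectedHypervolumeImprovement(
        model=model, ref_point=[0.0, 0.0], partitioning=partitioning
    )
    test_X = torch.rand(3, 1, 2)  # 3 t-batches of q=1 candidate points
    print(acqf(test_X))  # expected: a tensor of shape (3,)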