Source code for botorch.acquisition.multi_objective.monte_carlo

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Monte-Carlo Acquisition Functions for Multi-objective Bayesian optimization.

References

.. [Daulton2020]
    S. Daulton, M. Balandat, E. Bakshy. Differentiable Expected Hypervolume
    Improvement for Parallel Multi-Objective Bayesian Optimization.
    arXiv e-prints, arXiv:2006.05078, Jun. 2020.

"""

from __future__ import annotations

from abc import abstractmethod
from itertools import combinations
from typing import Callable, List, Optional

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.multi_objective.objective import (
    IdentityMCMultiOutputObjective,
    MCMultiOutputObjective,
)
from botorch.exceptions.errors import UnsupportedError
from botorch.models.model import Model
from botorch.sampling.samplers import MCSampler, SobolQMCNormalSampler
from botorch.utils.multi_objective.box_decomposition import NondominatedPartitioning
from botorch.utils.objective import apply_constraints_nonnegative_soft
from botorch.utils.torch import BufferDict
from botorch.utils.transforms import concatenate_pending_points, t_batch_mode_transform
from torch import Tensor


[docs]class MultiObjectiveMCAcquisitionFunction(AcquisitionFunction): r"""Abstract base class for Multi-Objective batch acquisition functions.""" def __init__( self, model: Model, sampler: Optional[MCSampler] = None, objective: Optional[MCMultiOutputObjective] = None, X_pending: Optional[Tensor] = None, ) -> None: r"""Constructor for the MCAcquisitionFunction base class. Args: model: A fitted model. sampler: The sampler used to draw base samples. Defaults to `SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True)`. objective: The MCMultiOutputObjective under which the samples are evaluated. Defaults to `IdentityMultiOutputObjective()`. X_pending: A `m x d`-dim Tensor of `m` design points that have points that have been submitted for function evaluation but have not yet been evaluated. """ super().__init__(model=model) if sampler is None: sampler = SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True) self.add_module("sampler", sampler) if objective is None: objective = IdentityMCMultiOutputObjective() elif not isinstance(objective, MCMultiOutputObjective): raise UnsupportedError( "Only objectives of type MCMultiOutputObjective are supported for " "Multi-Objective MC acquisition functions." ) self.add_module("objective", objective) self.set_X_pending(X_pending)
[docs] @abstractmethod def forward(self, X: Tensor) -> Tensor: r"""Takes in a `batch_shape x q x d` X Tensor of t-batches with `q` `d`-dim design points each, and returns a Tensor with shape `batch_shape'`, where `batch_shape'` is the broadcasted batch shape of model and input `X`. Should utilize the result of `set_X_pending` as needed to account for pending function evaluations. """ pass # pragma: no cover
[docs]class qExpectedHypervolumeImprovement(MultiObjectiveMCAcquisitionFunction): def __init__( self, model: Model, ref_point: List[float], partitioning: NondominatedPartitioning, sampler: Optional[MCSampler] = None, objective: Optional[MCMultiOutputObjective] = None, constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, X_pending: Optional[Tensor] = None, eta: float = 1e-3, ) -> None: r"""q-Expected Hypervolume Improvement supporting m>=2 outcomes. See [Daulton2020]_ for details. Example: >>> model = SingleTaskGP(train_X, train_Y) >>> ref_point = [0.0, 0.0] >>> qEHVI = qExpectedHypervolumeImprovement(model, ref_point, partitioning) >>> qehvi = qEHVI(test_X) Args: model: A fitted model. ref_point: A list with `m` elements representing the reference point (in the outcome space) w.r.t. to which compute the hypervolume. This is a reference point for the objective values (i.e. after applying `objective` to the samples). partitioning: A `NondominatedPartitioning` module that provides the non- dominated front and a partitioning of the non-dominated space in hyper- rectangles. If constraints are present, this partitioning must only include feasible points. sampler: The sampler used to draw base samples. Defaults to `SobolQMCNormalSampler(num_samples=512, collapse_batch_dims=True)`. objective: The MCMultiOutputObjective under which the samples are evaluated. Defaults to `IdentityMultiOutputObjective()`. constraints: A list of callables, each mapping a Tensor of dimension `sample_shape x batch-shape x q x m` to a Tensor of dimension `sample_shape x batch-shape x q`, where negative values imply feasibility. The acqusition function will compute expected feasible hypervolume. X_pending: A `batch_shape x m x d`-dim Tensor of `m` design points that have points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into `X` upon forward call. Copied and set to have no gradient. eta: The temperature parameter for the sigmoid function used for the differentiable approximation of the constraints. """ if len(ref_point) != partitioning.num_outcomes: raise ValueError( "The length of the reference point must match the number of outcomes. " f"Got ref_point with {len(ref_point)} elements, but expected " f"{partitioning.num_outcomes}." ) ref_point = torch.tensor( ref_point, dtype=partitioning.pareto_Y.dtype, device=partitioning.pareto_Y.device, ) super().__init__( model=model, sampler=sampler, objective=objective, X_pending=X_pending ) self.constraints = constraints self.eta = eta self.register_buffer("ref_point", ref_point) self.partitioning = partitioning cell_bounds = self.partitioning.get_hypercell_bounds(ref_point=self.ref_point) self.register_buffer("cell_lower_bounds", cell_bounds[0]) self.register_buffer("cell_upper_bounds", cell_bounds[1]) self.q = -1 self.q_subset_indices = BufferDict() def _cache_q_subset_indices(self, q: int) -> None: r"""Cache indices corresponding to all subsets of `q`. This means that consecutive calls to `forward` with the same `q` will not recompute the indices for all (2^q - 1) subsets. Note: this will use more memory than regenerating the indices for each i and then deleting them, but it will be faster for repeated evaluations (e.g. during optimization). Args: q: batch size """ if q != self.q: indices = list(range(q)) tkwargs = {"dtype": torch.long, "device": self.cell_lower_bounds.device} self.q_subset_indices = BufferDict( { f"q_choose_{i}": torch.tensor( list(combinations(indices, i)), **tkwargs ) for i in range(1, q + 1) } ) self.q = q def _compute_qehvi(self, samples: Tensor) -> Tensor: r"""Compute the expected (feasible) hypervolume improvement given MC samples. Args: samples: A `n_samples x batch_shape x q x m`-dim tensor of samples. Returns: A `batch_shape`-dim tensor of expected hypervolume improvement for each batch. """ q = samples.shape[-2] # Note that the objective may subset the outcomes (e.g. this will usually happen # if there are constraints present). obj = self.objective(samples)[..., : self.ref_point.shape[0]] if self.constraints is not None: feas_weights = torch.ones( obj.shape[:-1], device=obj.device, dtype=obj.dtype ) feas_weights = apply_constraints_nonnegative_soft( obj=feas_weights, constraints=self.constraints, samples=samples, eta=self.eta, ) self._cache_q_subset_indices(q=q) batch_shape = samples.shape[:-2] areas_per_segment = torch.zeros( *batch_shape, self.cell_lower_bounds.shape[-2], dtype=obj.dtype, device=obj.device, ) sample_batch_view_shape = [ batch_shape[0] if self.cell_lower_bounds.ndim == 3 else 1 ] + [1] * (len(batch_shape) - 1) view_shape = ( *sample_batch_view_shape, self.cell_upper_bounds.shape[-2], 1, self.cell_upper_bounds.shape[-1], ) for i in range(1, q + 1): # TODO: we could use batches to compute (q choose i) and (q choose q-i) # simulataneously since subsets of size i and q-i have the same number of # elements. This would decrease the number of iterations, but increase # memory usage. q_choose_i = self.q_subset_indices[f"q_choose_{i}"] # this tensor is mc_samples x batch_shape x i x q_choose_i x m obj_subsets = torch.stack( [obj.index_select(dim=-2, index=q_choose_i[:, k]) for k in range(i)], dim=-3, ) # since all hyperrectangles share one vertex, the opposite vertex of the # overlap is given by the component-wise minimum. # take the minimum in each subset overlap_vertices = obj_subsets.min(dim=-3).values expanded_shape = ( batch_shape + self.cell_upper_bounds.shape[-2:-1] + overlap_vertices.shape[-2:] ) # add batch-dim to compute area for each segment (pseudo-pareto-vertex) # this tensor is mc_samples x batch_shape x num_cells x q_choose_i x m overlap_vertices = overlap_vertices.unsqueeze(-3).expand( *batch_shape, self.cell_lower_bounds.shape[-2], *overlap_vertices.shape[-2:], ) overlap_vertices = torch.min( overlap_vertices, self.cell_upper_bounds.view(view_shape).expand(expanded_shape), ) # substract cell lower bounds, clamp min at zero lengths_i = overlap_vertices - self.cell_lower_bounds.view( view_shape ).expand(expanded_shape) lengths_i = lengths_i.clamp_min(0.0) # take product over hyperrectangle side lengths to compute area # sum over all subsets of size i areas_i = lengths_i.prod(dim=-1).sum(dim=-1) # if constraints are present, apply a differentiable approximation of # the indicator function if self.constraints is not None: feas_subsets = torch.cat( [ feas_weights.index_select(dim=-1, index=q_choose_i[:, k]) for k in range(i) ], dim=-1, ) areas_i = areas_i * feas_subsets.prod(-1, keepdim=True) # Using the inclusion-exclusion principle, set the sign to be positive # for subsets of odd sizes and negative for subsets of even size areas_per_segment += (-1) ** (i + 1) * areas_i # sum over segments and average over MC samples return areas_per_segment.sum(dim=-1).mean(dim=0)
[docs] @concatenate_pending_points @t_batch_mode_transform() def forward(self, X: Tensor) -> Tensor: posterior = self.model.posterior(X) samples = self.sampler(posterior) return self._compute_qehvi(samples=samples)