Source code for botorch.acquisition.multi_objective.logei

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Multi-objective variants of the LogEI family of acquisition functions, see
[Ament2023logei]_ for details.
"""

from __future__ import annotations

from typing import Callable, List, Optional, Tuple, Union

import torch
from botorch.acquisition.logei import TAU_MAX, TAU_RELU
from botorch.acquisition.multi_objective import MultiObjectiveMCAcquisitionFunction
from botorch.acquisition.multi_objective.objective import MCMultiOutputObjective
from botorch.models.model import Model
from botorch.sampling.base import MCSampler
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
    NondominatedPartitioning,
)
from botorch.utils.multi_objective.hypervolume import (
    NoisyExpectedHypervolumeMixin,
    SubsetIndexCachingMixin,
)
from botorch.utils.objective import compute_smoothed_feasibility_indicator
from botorch.utils.safe_math import (
    fatmin,
    log_fatplus,
    log_softplus,
    logdiffexp,
    logmeanexp,
    logplusexp,
    logsumexp,
    smooth_amin,
)
from botorch.utils.transforms import (
    concatenate_pending_points,
    is_ensemble,
    match_batch_shape,
    t_batch_mode_transform,
)
from torch import Tensor


[docs] class qLogExpectedHypervolumeImprovement( MultiObjectiveMCAcquisitionFunction, SubsetIndexCachingMixin ): _log: bool = True def __init__( self, model: Model, ref_point: Union[List[float], Tensor], partitioning: NondominatedPartitioning, sampler: Optional[MCSampler] = None, objective: Optional[MCMultiOutputObjective] = None, constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, X_pending: Optional[Tensor] = None, eta: Optional[Union[Tensor, float]] = 1e-2, fat: bool = True, tau_relu: float = TAU_RELU, tau_max: float = TAU_MAX, ) -> None: r"""Parallel Log Expected Hypervolume Improvement supporting m>=2 outcomes. See [Ament2023logei]_ for details and the methodology behind the LogEI family of acquisition function. Line-by-line differences to the original differentiable expected hypervolume formulation of [Daulton2020qehvi]_ are described via inline comments in `forward`. Example: >>> model = SingleTaskGP(train_X, train_Y) >>> ref_point = [0.0, 0.0] >>> acq = qLogExpectedHypervolumeImprovement(model, ref_point, partitioning) >>> value = acq(test_X) Args: model: A fitted model. ref_point: A list or tensor with `m` elements representing the reference point (in the outcome space) w.r.t. to which compute the hypervolume. This is a reference point for the objective values (i.e. after applying`objective` to the samples). partitioning: A `NondominatedPartitioning` module that provides the non- dominated front and a partitioning of the non-dominated space in hyper- rectangles. If constraints are present, this partitioning must only include feasible points. sampler: The sampler used to draw base samples. If not given, a sampler is generated using `get_sampler`. objective: The MCMultiOutputObjective under which the samples are evaluated. Defaults to `IdentityMultiOutputObjective()`. constraints: A list of callables, each mapping a Tensor of dimension `sample_shape x batch-shape x q x m` to a Tensor of dimension `sample_shape x batch-shape x q`, where negative values imply feasibility. The acquisition function will compute expected feasible hypervolume. X_pending: A `batch_shape x m x d`-dim Tensor of `m` design points that have points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into `X` upon forward call. Copied and set to have no gradient. eta: The temperature parameter for the sigmoid function used for the differentiable approximation of the constraints. In case of a float the same eta is used for every constraint in constraints. In case of a tensor the length of the tensor must match the number of provided constraints. The i-th constraint is then estimated with the i-th eta value. fat: Toggles the logarithmic / linear asymptotic behavior of the smooth approximation to the ReLU and the maximum. tau_relu: Temperature parameter controlling the sharpness of the approximation to the ReLU over the `q` candidate points. For further details, see the comments above the definition of `TAU_RELU`. tau_max: Temperature parameter controlling the sharpness of the approximation to the `max` operator over the `q` candidate points. For further details, see the comments above the definition of `TAU_MAX`. """ if len(ref_point) != partitioning.num_outcomes: raise ValueError( "The dimensionality of the reference point must match the number of " f"outcomes. Got ref_point with {len(ref_point)} elements, but expected " f"{partitioning.num_outcomes}." ) ref_point = torch.as_tensor( ref_point, dtype=partitioning.pareto_Y.dtype, device=partitioning.pareto_Y.device, ) super().__init__( model=model, sampler=sampler, objective=objective, constraints=constraints, eta=eta, X_pending=X_pending, ) self.register_buffer("ref_point", ref_point) cell_bounds = partitioning.get_hypercell_bounds() self.register_buffer("cell_lower_bounds", cell_bounds[0]) self.register_buffer("cell_upper_bounds", cell_bounds[1]) SubsetIndexCachingMixin.__init__(self) self.tau_relu = tau_relu self.tau_max = tau_max self.fat = fat def _compute_log_qehvi(self, samples: Tensor, X: Optional[Tensor] = None) -> Tensor: r"""Compute the expected (feasible) hypervolume improvement given MC samples. Args: samples: A `sample_shape x batch_shape x q' x m`-dim tensor of samples. X: A `batch_shape x q x d`-dim tensor of inputs. Returns: A `batch_shape x (model_batch_shape)`-dim tensor of expected hypervolume improvement for each batch. """ # Note that the objective may subset the outcomes (e.g. this will usually happen # if there are constraints present). obj = self.objective(samples, X=X) # mc_samples x batch_shape x q x m q = obj.shape[-2] if self.constraints is not None: log_feas_weights = compute_smoothed_feasibility_indicator( constraints=self.constraints, samples=samples, eta=self.eta, log=True, fat=self.fat, ) device = self.ref_point.device q_subset_indices = self.compute_q_subset_indices(q_out=q, device=device) batch_shape = obj.shape[:-2] # mc_samples x batch_shape # areas tensor is `mc_samples x batch_shape x num_cells x 2`-dim log_areas_per_segment = torch.full( size=( *batch_shape, self.cell_lower_bounds.shape[-2], # num_cells 2, # for even and odd terms ), fill_value=-torch.inf, dtype=obj.dtype, device=device, ) cell_batch_ndim = self.cell_lower_bounds.ndim - 2 # conditionally adding mc_samples dim if cell_batch_ndim > 0 # adding ones to shape equal in number to to batch_shape_ndim - cell_batch_ndim # adding cell_bounds batch shape w/o 1st dimension sample_batch_view_shape = torch.Size( [ batch_shape[0] if cell_batch_ndim > 0 else 1, *[1 for _ in range(len(batch_shape) - max(cell_batch_ndim, 1))], *self.cell_lower_bounds.shape[1:-2], ] ) view_shape = ( *sample_batch_view_shape, self.cell_upper_bounds.shape[-2], # num_cells 1, # adding for q_choose_i dimension self.cell_upper_bounds.shape[-1], # num_objectives ) for i in range(1, self.q_out + 1): # TODO: we could use batches to compute (q choose i) and (q choose q-i) # simultaneously since subsets of size i and q-i have the same number of # elements. This would decrease the number of iterations, but increase # memory usage. q_choose_i = q_subset_indices[f"q_choose_{i}"] # q_choose_i x i # this tensor is mc_samples x batch_shape x i x q_choose_i x m obj_subsets = obj.index_select(dim=-2, index=q_choose_i.view(-1)) obj_subsets = obj_subsets.view( obj.shape[:-2] + q_choose_i.shape + obj.shape[-1:] ) # mc_samples x batch_shape x q_choose_i x i x m # NOTE: the order of operations in non-log _compute_qehvi is 3), 1), 2). # since 3) moved above 1), _log_improvement adds another Tensor dimension # that keeps track of num_cells. # 1) computes log smoothed improvement over the cell lower bounds. # mc_samples x batch_shape x num_cells x q_choose_i x i x m log_improvement_i = self._log_improvement(obj_subsets, view_shape) # 2) take the minimum log improvement over all i subsets. # since all hyperrectangles share one vertex, the opposite vertex of the # overlap is given by the component-wise minimum. # negative of maximum of negative log_improvement is approximation to min. log_improvement_i = self._smooth_min( log_improvement_i, dim=-2, ) # mc_samples x batch_shape x num_cells x q_choose_i x m # 3) compute the log lengths of the cells' sides. # mc_samples x batch_shape x num_cells x q_choose_i x m log_lengths_i = self._log_cell_lengths(log_improvement_i, view_shape) # 4) take product over hyperrectangle side lengths to compute area (m-dim). # after, log_areas_i is mc_samples x batch_shape x num_cells x q_choose_i log_areas_i = log_lengths_i.sum(dim=-1) # areas_i = lengths_i.prod(dim=-1) # 5) if constraints are present, apply a differentiable approximation of # the indicator function. if self.constraints is not None: log_feas_subsets = log_feas_weights.index_select( dim=-1, index=q_choose_i.view(-1) ).view(log_feas_weights.shape[:-1] + q_choose_i.shape) log_areas_i = log_areas_i + log_feas_subsets.unsqueeze(-3).sum(dim=-1) # 6) sum over all subsets of size i, i.e. reduce over q_choose_i-dim # after, log_areas_i is mc_samples x batch_shape x num_cells log_areas_i = logsumexp(log_areas_i, dim=-1) # areas_i.sum(dim=-1) # 7) Using the inclusion-exclusion principle, set the sign to be positive # for subsets of odd sizes and negative for subsets of even size # in non-log space: areas_per_segment += (-1) ** (i + 1) * areas_i, # but here in log space, we need to keep track of sign: log_areas_per_segment[..., i % 2] = logplusexp( log_areas_per_segment[..., i % 2], log_areas_i, ) # 8) subtract even from odd log area terms log_areas_per_segment = logdiffexp( log_a=log_areas_per_segment[..., 0], log_b=log_areas_per_segment[..., 1] ) # 9) sum over segments (n_cells-dim) and average over MC samples return logmeanexp(logsumexp(log_areas_per_segment, dim=-1), dim=0) def _log_improvement( self, obj_subsets: Tensor, view_shape: Union[Tuple, torch.Size] ) -> Tensor: # smooth out the clamp and take the log (previous step 3) # subtract cell lower bounds, clamp min at zero, but first # make obj_subsets broadcastable with cell bounds: # mc_samples x batch_shape x (num_cells = 1) x q_choose_i x i x m obj_subsets = obj_subsets.unsqueeze(-4) # making cell bounds broadcastable with obj_subsets: # (mc_samples = 1) x (batch_shape = 1) x num_cells x 1 x (i = 1) x m cell_lower_bounds = self.cell_lower_bounds.view(view_shape).unsqueeze(-3) Z = obj_subsets - cell_lower_bounds log_Zi = self._log_smooth_relu(Z) return log_Zi # mc_samples x batch_shape x num_cells x q_choose_i x i x m def _log_cell_lengths( self, log_improvement_i: Tensor, view_shape: Union[Tuple, torch.Size] ) -> Tensor: cell_upper_bounds = self.cell_upper_bounds.clamp_max( 1e10 if log_improvement_i.dtype == torch.double else 1e8 ) # num_cells x num_objectives # add batch-dim to compute area for each segment (pseudo-pareto-vertex) log_cell_lengths = ( (cell_upper_bounds - self.cell_lower_bounds).log().view(view_shape) ) # (mc_samples = 1) x (batch_shape = 1) x n_cells x (q_choose_i = 1) x m # mc_samples x batch_shape x num_cells x q_choose_i x m return self._smooth_minimum( log_improvement_i, log_cell_lengths, ) def _log_smooth_relu(self, X: Tensor) -> Tensor: f = log_fatplus if self.fat else log_softplus return f(X, tau=self.tau_relu) def _smooth_min(self, X: Tensor, dim: int, keepdim: bool = False) -> Tensor: f = fatmin if self.fat else smooth_amin return f(X, tau=self.tau_max, dim=dim) def _smooth_minimum(self, X: Tensor, Y: Tensor) -> Tensor: XY = torch.stack(torch.broadcast_tensors(X, Y), dim=-1) return self._smooth_min(XY, dim=-1, keepdim=False)
[docs] @concatenate_pending_points @t_batch_mode_transform() def forward(self, X: Tensor) -> Tensor: posterior = self.model.posterior(X) samples = self.get_posterior_samples(posterior) return self._compute_log_qehvi(samples=samples, X=X)
class qLogNoisyExpectedHypervolumeImprovement( NoisyExpectedHypervolumeMixin, qLogExpectedHypervolumeImprovement, ): _log: bool = True def __init__( self, model: Model, ref_point: Union[List[float], Tensor], X_baseline: Tensor, sampler: Optional[MCSampler] = None, objective: Optional[MCMultiOutputObjective] = None, constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, X_pending: Optional[Tensor] = None, eta: Optional[Union[Tensor, float]] = 1e-3, prune_baseline: bool = False, alpha: float = 0.0, cache_pending: bool = True, max_iep: int = 0, incremental_nehvi: bool = True, cache_root: bool = True, tau_relu: float = TAU_RELU, tau_max: float = 1e-3, # TAU_MAX, fat: bool = True, marginalize_dim: Optional[int] = None, ) -> None: r""" q-Log Noisy Expected Hypervolume Improvement supporting m>=2 outcomes. Based on the differentiable hypervolume formulation of [Daulton2021nehvi]_. Example: >>> model = SingleTaskGP(train_X, train_Y) >>> ref_point = [0.0, 0.0] >>> qNEHVI = qNoisyExpectedHypervolumeImprovement(model, ref_point, train_X) >>> qnehvi = qNEHVI(test_X) Args: model: A fitted model. ref_point: A list or tensor with `m` elements representing the reference point (in the outcome space) w.r.t. to which compute the hypervolume. This is a reference point for the objective values (i.e. after applying `objective` to the samples). X_baseline: A `r x d`-dim Tensor of `r` design points that have already been observed. These points are considered as potential approximate pareto-optimal design points. sampler: The sampler used to draw base samples. If not given, a sampler is generated using `get_sampler`. Note: a pareto front is created for each mc sample, which can be computationally intensive for `m` > 2. objective: The MCMultiOutputObjective under which the samples are evaluated. Defaults to `IdentityMultiOutputObjective()`. constraints: A list of callables, each mapping a Tensor of dimension `sample_shape x batch-shape x q x m` to a Tensor of dimension `sample_shape x batch-shape x q`, where negative values imply feasibility. The acquisition function will compute expected feasible hypervolume. X_pending: A `batch_shape x m x d`-dim Tensor of `m` design points that have points that have been submitted for function evaluation, but have not yet been evaluated. eta: The temperature parameter for the sigmoid function used for the differentiable approximation of the constraints. In case of a float the same `eta` is used for every constraint in constraints. In case of a tensor the length of the tensor must match the number of provided constraints. The i-th constraint is then estimated with the i-th `eta` value. prune_baseline: If True, remove points in `X_baseline` that are highly unlikely to be the pareto optimal and better than the reference point. This can significantly improve computation time and is generally recommended. In order to customize pruning parameters, instead manually call `prune_inferior_points_multi_objective` on `X_baseline` before instantiating the acquisition function. alpha: The hyperparameter controlling the approximate non-dominated partitioning. The default value of 0.0 means an exact partitioning is used. As the number of objectives `m` increases, consider increasing this parameter in order to limit computational complexity. cache_pending: A boolean indicating whether to use cached box decompositions (CBD) for handling pending points. This is generally recommended. max_iep: The maximum number of pending points before the box decompositions will be recomputed. incremental_nehvi: A boolean indicating whether to compute the incremental NEHVI from the `i`th point where `i=1, ..., q` under sequential greedy optimization, or the full qNEHVI over `q` points. cache_root: A boolean indicating whether to cache the root decomposition over `X_baseline` and use low-rank updates. marginalize_dim: A batch dimension that should be marginalized. """ MultiObjectiveMCAcquisitionFunction.__init__( self, model=model, sampler=sampler, objective=objective, constraints=constraints, eta=eta, ) SubsetIndexCachingMixin.__init__(self) NoisyExpectedHypervolumeMixin.__init__( self, model=model, ref_point=ref_point, X_baseline=X_baseline, sampler=sampler, objective=objective, constraints=constraints, X_pending=X_pending, prune_baseline=prune_baseline, alpha=alpha, cache_pending=cache_pending, max_iep=max_iep, incremental_nehvi=incremental_nehvi, cache_root=cache_root, marginalize_dim=marginalize_dim, ) # parameters that are used by qLogEHVI self.tau_relu = tau_relu self.tau_max = tau_max self.fat = fat @concatenate_pending_points @t_batch_mode_transform() def forward(self, X: Tensor) -> Tensor: X_full = torch.cat([match_batch_shape(self.X_baseline, X), X], dim=-2) # NOTE: To ensure that we correctly sample `f(X)` from the joint distribution # `f((X_baseline, X)) ~ P(f | D)`, it is critical to compute the joint posterior # over X *and* X_baseline -- which also contains pending points whenever there # are any -- since the baseline and pending values `f(X_baseline)` are # generally pre-computed and cached before the `forward` call, see the docs of # `cache_pending` for details. # TODO: Improve the efficiency by not re-computing the X_baseline-X_baseline # covariance matrix, but only the covariance of # 1) X and X, and # 2) X and X_baseline. posterior = self.model.posterior(X_full) # Account for possible one-to-many transform and the model batch dimensions in # ensemble models. event_shape_lag = 1 if is_ensemble(self.model) else 2 n_w = ( posterior._extended_shape()[X_full.dim() - event_shape_lag] // X_full.shape[-2] ) q_in = X.shape[-2] * n_w self._set_sampler(q_in=q_in, posterior=posterior) samples = self._get_f_X_samples(posterior=posterior, q_in=q_in) # Add previous nehvi from pending points. nehvi = self._compute_log_qehvi(samples=samples, X=X) if self.incremental_nehvi: return nehvi return logplusexp(nehvi, self._prev_nehvi.log())