Source code for botorch.generation.sampling

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Sampling-based generation strategies.

A SamplingStrategy returns samples from the input points (i.e. Tensors in feature
space), rather than the value for a set of tensors, as acquisition functions do.
The q-batch dimension has similar semantics as for acquisition functions in that the
points across the q-batch are considered jointly for sampling (where as for
q-acquisition functions we evaluate the joint value of the q-batch).
"""

from __future__ import annotations

from abc import ABC, abstractmethod

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.objective import (
    IdentityMCObjective,
    MCAcquisitionObjective,
    PosteriorTransform,
)
from botorch.generation.utils import _flip_sub_unique
from botorch.models.model import Model

from botorch.models.model_list_gp_regression import ModelListGP
from botorch.models.multitask import MultiTaskGP
from botorch.utils.sampling import batched_multinomial
from botorch.utils.transforms import standardize
from torch import Tensor
from torch.nn import Module


[docs] class SamplingStrategy(Module, ABC): """Abstract base class for sampling-based generation strategies."""
[docs] @abstractmethod def forward(self, X: Tensor, num_samples: int = 1) -> Tensor: r"""Sample according to the SamplingStrategy. Args: X: A `batch_shape x N x d`-dim Tensor from which to sample (in the `N` dimension). num_samples: The number of samples to draw. Returns: A `batch_shape x num_samples x d`-dim Tensor of samples from `X`, where `X[..., i, :]` is the `i`-th sample. """ pass # pragma: no cover
[docs] class MaxPosteriorSampling(SamplingStrategy): r"""Sample from a set of points according to their max posterior value. Example: >>> MPS = MaxPosteriorSampling(model) # model w/ feature dim d=3 >>> X = torch.rand(2, 100, 3) >>> sampled_X = MPS(X, num_samples=5) """ def __init__( self, model: Model, objective: MCAcquisitionObjective | None = None, posterior_transform: PosteriorTransform | None = None, replacement: bool = True, ) -> None: r"""Constructor for the SamplingStrategy base class. Args: model: A fitted model. objective: The MCAcquisitionObjective under which the samples are evaluated. Defaults to `IdentityMCObjective()`. posterior_transform: An optional PosteriorTransform. replacement: If True, sample with replacement. """ super().__init__() self.model = model self.objective = IdentityMCObjective() if objective is None else objective self.posterior_transform = posterior_transform self.replacement = replacement
[docs] def forward( self, X: Tensor, num_samples: int = 1, observation_noise: bool = False ) -> Tensor: r"""Sample from the model posterior. Args: X: A `batch_shape x N x d`-dim Tensor from which to sample (in the `N` dimension) according to the maximum posterior value under the objective. num_samples: The number of samples to draw. observation_noise: If True, sample with observation noise. Returns: A `batch_shape x num_samples x d`-dim Tensor of samples from `X`, where `X[..., i, :]` is the `i`-th sample. """ posterior = self.model.posterior( X, observation_noise=observation_noise, posterior_transform=self.posterior_transform, ) # num_samples x batch_shape x N x m samples = posterior.rsample(sample_shape=torch.Size([num_samples])) return self.maximize_samples(X, samples, num_samples)
[docs] def maximize_samples(self, X: Tensor, samples: Tensor, num_samples: int = 1): obj = self.objective(samples, X=X) # num_samples x batch_shape x N if self.replacement: # if we allow replacement then things are simple(r) idcs = torch.argmax(obj, dim=-1) else: # if we need to deduplicate we have to do some tensor acrobatics # first we get the indices associated w/ the num_samples top samples _, idcs_full = torch.topk(obj, num_samples, dim=-1) # generate some indices to smartly index into the lower triangle of # idcs_full (broadcasting across batch dimensions) ridx, cindx = torch.tril_indices(num_samples, num_samples) # pick the unique indices in order - since we look at the lower triangle # of the index matrix and we don't sort, this achieves deduplication sub_idcs = idcs_full[ridx, ..., cindx] if sub_idcs.ndim == 1: idcs = _flip_sub_unique(sub_idcs, num_samples) elif sub_idcs.ndim == 2: # TODO: Find a better way to do this n_b = sub_idcs.size(-1) idcs = torch.stack( [_flip_sub_unique(sub_idcs[:, i], num_samples) for i in range(n_b)], dim=-1, ) else: # TODO: Find a general way to do this efficiently. raise NotImplementedError( "MaxPosteriorSampling without replacement for more than a single " "batch dimension is not yet implemented." ) # idcs is num_samples x batch_shape, to index into X we need to permute for it # to have shape batch_shape x num_samples if idcs.ndim > 1: idcs = idcs.permute(*range(1, idcs.ndim), 0) # in order to use gather, we need to repeat the index tensor d times idcs = idcs.unsqueeze(-1).expand(*idcs.shape, X.size(-1)) # now if the model is batched batch_shape will not necessarily be the # batch_shape of X, so we expand X to the proper shape Xe = X.expand(*obj.shape[1:], X.size(-1)) # finally we can gather along the N dimension return torch.gather(Xe, -2, idcs)
[docs] class BoltzmannSampling(SamplingStrategy): r"""Sample from a set of points according to a tempered acquisition value. Given an acquisition function `acq_func`, this sampling strategies draws samples from a `batch_shape x N x d`-dim tensor `X` according to a multinomial distribution over its indices given by weight(X[..., i, :]) ~ exp(eta * standardize(acq_func(X[..., i, :]))) where `standardize(Y)` standardizes `Y` to zero mean and unit variance. As the temperature parameter `eta -> 0`, this approaches uniform sampling, while as `eta -> infty`, this approaches selecting the maximizer(s) of the acquisition function `acq_func`. Example: >>> UCB = UpperConfidenceBound(model, beta=0.1) >>> BMUCB = BoltzmannSampling(UCB, eta=0.5) >>> X = torch.rand(2, 100, 3) >>> sampled_X = BMUCB(X, num_samples=5) """ def __init__( self, acq_func: AcquisitionFunction, eta: float = 1.0, replacement: bool = True ) -> None: r"""Boltzmann Acquisition Value Sampling. Args: acq_func: The acquisition function; to be evaluated in batch at the individual points of a q-batch (not jointly, as is the case for acquisition functions). Can be analytic or Monte-Carlo. eta: The temperature parameter in the softmax. replacement: If True, sample with replacement. """ super().__init__() self.acq_func = acq_func self.eta = eta self.replacement = replacement
[docs] def forward(self, X: Tensor, num_samples: int = 1) -> Tensor: r"""Sample from a tempered value of the acquisition function value. Args: X: A `batch_shape x N x d`-dim Tensor from which to sample (in the `N` dimension) according to the maximum posterior value under the objective. Note that if a batched model is used in the underlying acquisition function, then its batch shape must be broadcastable to `batch_shape`. num_samples: The number of samples to draw. Returns: A `batch_shape x num_samples x d`-dim Tensor of samples from `X`, where `X[..., i, :]` is the `i`-th sample. """ # TODO: Can we get the model batch shape property from the model? # we move the `N` dimension to the front for evaluating the acquisition function # so that X_eval has shape `N x batch_shape x 1 x d` X_eval = X.permute(-2, *range(X.ndim - 2), -1).unsqueeze(-2) acqval = self.acq_func(X_eval) # N x batch_shape # now move the `N` dimension back (this is the number of categories) acqval = acqval.permute(*range(1, X.ndim - 1), 0) # batch_shape x N weights = torch.exp(self.eta * standardize(acqval)) # batch_shape x N idcs = batched_multinomial( weights=weights, num_samples=num_samples, replacement=self.replacement ) # now do some gathering acrobatics to select the right elements from X return torch.gather(X, -2, idcs.unsqueeze(-1).expand(*idcs.shape, X.size(-1)))
[docs] class ConstrainedMaxPosteriorSampling(MaxPosteriorSampling): r"""Constrained max posterior sampling. Posterior sampling where we try to maximize an objective function while simulatenously satisfying a set of constraints c1(x) <= 0, c2(x) <= 0, ..., cm(x) <= 0 where c1, c2, ..., cm are black-box constraint functions. Each constraint function is modeled by a seperate GP model. We follow the procedure as described in https://doi.org/10.48550/arxiv.2002.08526. Example: >>> CMPS = ConstrainedMaxPosteriorSampling( model, constraint_model=ModelListGP(cmodel1, cmodel2), ) >>> X = torch.rand(2, 100, 3) >>> sampled_X = CMPS(X, num_samples=5) """ def __init__( self, model: Model, constraint_model: ModelListGP | MultiTaskGP, objective: MCAcquisitionObjective | None = None, posterior_transform: PosteriorTransform | None = None, replacement: bool = True, ) -> None: r"""Constructor for the SamplingStrategy base class. Args: model: A fitted model. objective: The MCAcquisitionObjective under which the samples are evaluated. Defaults to `IdentityMCObjective()`. posterior_transform: An optional PosteriorTransform for the objective function (corresponding to `model`). replacement: If True, sample with replacement. constraint_model: either a ModelListGP where each submodel is a GP model for one constraint function, or a MultiTaskGP model where each task is one constraint function. All constraints are of the form c(x) <= 0. In the case when the constraint model predicts that all candidates violate constraints, we pick the candidates with minimum violation. """ if objective is not None: raise NotImplementedError( "`objective` is not supported for `ConstrainedMaxPosteriorSampling`." ) super().__init__( model=model, objective=objective, posterior_transform=posterior_transform, replacement=replacement, ) self.constraint_model = constraint_model def _convert_samples_to_scores(self, Y_samples, C_samples) -> Tensor: r"""Convert the objective and constraint samples into a score. The logic is as follows: - If a realization has at least one feasible candidate we use the objective value as the score and set all infeasible candidates to -inf. - If a realization doesn't have a feasible candidate we set the score to the negative total violation of the constraints to incentivize choosing the candidate with the smallest constraint violation. Args: Y_samples: A `num_samples x batch_shape x num_cand x 1`-dim Tensor of samples from the objective function. C_samples: A `num_samples x batch_shape x num_cand x num_constraints`-dim Tensor of samples from the constraints. Returns: A `num_samples x batch_shape x num_cand x 1`-dim Tensor of scores. """ is_feasible = (C_samples <= 0).all( dim=-1 ) # num_samples x batch_shape x num_cand has_feasible_candidate = is_feasible.any(dim=-1) scores = Y_samples.clone() scores[~is_feasible] = -float("inf") if not has_feasible_candidate.all(): # Use negative total violation for samples where no candidate is feasible total_violation = ( C_samples[~has_feasible_candidate] .clamp(min=0) .sum(dim=-1, keepdim=True) ) scores[~has_feasible_candidate] = -total_violation return scores
[docs] def forward( self, X: Tensor, num_samples: int = 1, observation_noise: bool = False ) -> Tensor: r"""Sample from the model posterior. Args: X: A `batch_shape x N x d`-dim Tensor from which to sample (in the `N` dimension) according to the maximum posterior value under the objective. num_samples: The number of samples to draw. observation_noise: If True, sample with observation noise. Returns: A `batch_shape x num_samples x d`-dim Tensor of samples from `X`, where `X[..., i, :]` is the `i`-th sample. """ posterior = self.model.posterior( X=X, observation_noise=observation_noise, # Note: `posterior_transform` is only used for the objective posterior_transform=self.posterior_transform, ) Y_samples = posterior.rsample(sample_shape=torch.Size([num_samples])) # Loop over the constraint models (if possible) to reduce peak memory usage. constraint_models = ( self.constraint_model.models if isinstance(self.constraint_model, ModelListGP) else [self.constraint_model] ) C_samples_list = [] for c_model in constraint_models: c_posterior = c_model.posterior(X=X, observation_noise=observation_noise) C_samples_list.append( c_posterior.rsample(sample_shape=torch.Size([num_samples])) ) C_samples = torch.cat(C_samples_list, dim=-1) # Convert the objective and constraint samples into a scalar-valued "score" scores = self._convert_samples_to_scores( Y_samples=Y_samples, C_samples=C_samples ) return self.maximize_samples(X=X, samples=scores, num_samples=num_samples)