Source code for botorch.acquisition.max_value_entropy_search

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Acquisition functions for max-value entropy search (MES) and
multi-fidelity MES with noisy observation and trace observations.

References

.. [Wang2018mves]
    Wang, Z., Jegelka, S.,
    Max-value Entropy Search for Efficient Bayesian Optimization.
    arXiv:1703.01968v3, 2018

.. [Takeno2019mfmves]
    Takeno, S., et al.,
    Multi-fidelity Bayesian Optimization with Max-value Entropy Search.
    arXiv:1901.08275v1, 2019
"""

from __future__ import annotations

from copy import deepcopy
from math import log
from typing import Callable, Optional

import torch
from scipy.optimize import brentq
from torch import Tensor

from ..models.cost import AffineFidelityCostModel
from ..models.model import Model
from ..models.utils import check_no_nans
from ..sampling.samplers import SobolQMCNormalSampler
from ..utils.transforms import match_batch_shape, t_batch_mode_transform
from .cost_aware import CostAwareUtility, InverseCostWeightedUtility
from .monte_carlo import MCAcquisitionFunction


CLAMP_LB = 1.0e-8


[docs]class qMaxValueEntropy(MCAcquisitionFunction): r"""The acquisition function for Max-value Entropy Search. This acquisition function computes the mutual information of max values and a candidate point X. See [Wang2018mves]_ for a detailed discussion. The model must be single-outcome. q > 1 is supported through cyclic optimization and fantasies. Example: >>> model = SingleTaskGP(train_X, train_Y) >>> candidate_set = torch.rand(1000, bounds.size(1)) >>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set >>> MES = qMaxValueEntropy(model, candidate_set) >>> mes = MES(test_X) """ def __init__( self, model: Model, candidate_set: Tensor, num_fantasies: int = 16, num_mv_samples: int = 10, num_y_samples: int = 128, use_gumbel: bool = True, maximize: bool = True, X_pending: Optional[Tensor] = None, ) -> None: r"""Single-outcome max-value entropy search acquisition function. Args: model: A fitted single-outcome model. candidate_set: A `n x d` Tensor including `n` candidate points to discretize the design space. Max values are sampled from the (joint) model posterior over these points. num_fantasies: Number of fantasies to generate. The higher this number the more accurate the model (at the expense of model complexity, wall time and memory). Ignored if `X_pending` is `None`. num_mv_samples: Number of max value samples. num_y_samples: Number of posterior samples at specific design point `X`. use_gumbel: If True, use Gumbel approximation to sample the max values. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. maximize: If True, consider the problem a maximization problem. """ sampler = SobolQMCNormalSampler(num_y_samples) super().__init__(model=model, sampler=sampler) # Batch GP models (e.g. fantasized models) are not currently supported if self.model.train_inputs[0].ndim > 2: raise NotImplementedError( "Batch GP models (e.g. fantasized models) " "are not yet supported by qMaxValueEntropy" ) self._init_model = model # only used for the `fantasize()` in `set_X_pending()` train_inputs = match_batch_shape(model.train_inputs[0], candidate_set) self.candidate_set = torch.cat([candidate_set, train_inputs], dim=0) self.fantasies_sampler = SobolQMCNormalSampler(num_fantasies) self.num_fantasies = num_fantasies self.use_gumbel = use_gumbel self.num_mv_samples = num_mv_samples self.maximize = maximize self.weight = 1.0 if maximize else -1.0 # If we put the `self._sample_max_values()` to `set_X_pending()`, # it will throw errors when the initial `super().__init__()` is called, # since some members required by `_sample_max_values()` are not yet initialized. if X_pending is None: self._sample_max_values() else: self.set_X_pending(X_pending)
[docs] def set_X_pending(self, X_pending: Optional[Tensor] = None) -> None: r"""Set pending points. Informs the acquisition function about pending design points, fantasizes the model on the pending points and draws max-value samples from the fantasized model posterior. Args: X_pending: `m x d` Tensor with `m` `d`-dim design points that have been submitted for evaluation but have not yet been evaluated. """ super().set_X_pending(X_pending=X_pending) if X_pending is not None: # fantasize the model fantasy_model = self._init_model.fantasize( X=X_pending, sampler=self.fantasies_sampler, observation_noise=True ) self.model = fantasy_model self._sample_max_values() else: # This is mainly for setting the model to the original model # after the sequential optimization at q > 1 try: self.model = self._init_model self._sample_max_values() except AttributeError: pass
def _sample_max_values(self): r"""Sample max values for MC approximation of the expectation in MES""" with torch.no_grad(): # Append X_pending to candidate set if self.X_pending is None: X_pending = torch.tensor([], dtype=self.candidate_set.dtype) else: X_pending = self.X_pending X_pending = match_batch_shape(X_pending, self.candidate_set) candidate_set = torch.cat([self.candidate_set, X_pending], dim=0) # project the candidate_set to the highest fidelity, # which is needed for the multi-fidelity MES try: candidate_set = self.project(candidate_set) except AttributeError: pass # sample max values if self.use_gumbel: self.posterior_max_values = _sample_max_value_Gumbel( self.model, candidate_set, self.num_mv_samples, self.maximize ) else: self.posterior_max_values = _sample_max_value_Thompson( self.model, candidate_set, self.num_mv_samples, self.maximize )
[docs] @t_batch_mode_transform(expected_q=1) def forward(self, X: Tensor) -> Tensor: r"""Compute max-value entropy at the design points `X`. Args: X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches with `1` `d`-dim design points each. Returns: A `batch_shape`-dim Tensor of MVE values at the given design points `X`. """ # Compute the posterior, posterior mean, variance and std posterior = self.model.posterior(X.unsqueeze(-3), observation_noise=False) mean = self.weight * posterior.mean.squeeze(-1).squeeze(-1) # batch_shape x num_fantasies variance = posterior.variance.clamp_min(CLAMP_LB).view_as(mean) check_no_nans(mean) check_no_nans(variance) ig = self._compute_information_gain( X=X, mean_M=mean, variance_M=variance, covar_mM=variance.unsqueeze(-1) ) return ig.mean(dim=0) # average over the fantasies
def _compute_information_gain( self, X: Tensor, mean_M: Tensor, variance_M: Tensor, covar_mM: Tensor ) -> Tensor: r"""Computes the information gain at the design points `X`. Approximately computes the information gain at the design points `X`, for both MES with noisy observations and multi-fidelity MES with noisy observation and trace observations. The implementation is inspired from the paper on multi-fidelity MES by Takeno et. al. [Takeno2019mfmves]_. The notations in the comments in this function follows the Appendix A in the paper. Args: X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches with `1` `d`-dim design point each. mean_M, variance_M: `batch_shape x num_fantasies`-dim Tensors of `batch_shape` t-batches with `num_fantasies` fantasies. `num_fantasies = 1` for non-fantasized models. All are obtained without noise. covar_mM: `batch_shape x num_fantasies x (1 + num_trace_observations)` -dim Tensor. `num_fantasies = 1` for non-fantasized models. All are obtained without noise. Returns: A `num_fantasies x batch_shape`-dim Tensor of information gains at the given design points `X`. """ # compute the std_m, variance_m with noisy observation posterior_m = self.model.posterior(X.unsqueeze(-3), observation_noise=True) mean_m = self.weight * posterior_m.mean.squeeze(-1) # batch_shape x num_fantasies x (1 + num_trace_observations) variance_m = posterior_m.mvn.covariance_matrix # batch_shape x num_fantasies x (1 + num_trace_observations)^2 check_no_nans(variance_m) # compute mean and std for fM|ym, x, Dt ~ N(u, s^2) samples_m = self.weight * self.sampler(posterior_m).squeeze(-1) # s_m x batch_shape x num_fantasies x (1 + num_trace_observations) L = torch.cholesky(variance_m) temp_term = torch.cholesky_solve(covar_mM.unsqueeze(-1), L).transpose(-2, -1) # equivalent to torch.matmul(covar_mM.unsqueeze(-2), torch.inverse(variance_m)) # batch_shape x num_fantasies x 1 x (1 + num_trace_observations) mean_pt1 = torch.matmul(temp_term, (samples_m - mean_m).unsqueeze(-1)) mean_new = mean_pt1.squeeze(-1).squeeze(-1) + mean_M # s_m x batch_shape x num_fantasies variance_pt1 = torch.matmul(temp_term, covar_mM.unsqueeze(-1)) variance_new = variance_M - variance_pt1.squeeze(-1).squeeze(-1) # batch_shape x num_fantasies stdv_new = variance_new.clamp_min(CLAMP_LB).sqrt() # batch_shape x num_fantasies # define normal distribution to compute cdf and pdf normal = torch.distributions.Normal( torch.zeros(1, device=X.device, dtype=X.dtype), torch.ones(1, device=X.device, dtype=X.dtype), ) # Compute p(fM <= f* | ym, x, Dt) view_shape = ( [self.num_mv_samples] + [1] * (len(X.shape) - 2) + [self.num_fantasies] ) # s_M x batch_shape x num_fantasies if self.X_pending is None: view_shape[-1] = 1 max_vals = self.posterior_max_values.view(view_shape).unsqueeze(1) # s_M x 1 x batch_shape x num_fantasies normalized_mvs_new = (max_vals - mean_new) / stdv_new # s_M x s_m x batch_shape x num_fantasies = # s_M x 1 x batch_shape x num_fantasies - s_m x batch_shape x num_fantasies cdf_mvs_new = normal.cdf(normalized_mvs_new).clamp_min(CLAMP_LB) # Compute p(fM <= f* | x, Dt) stdv_M = variance_M.sqrt() normalized_mvs = (max_vals - mean_M) / stdv_M # s_M x 1 x batch_shape x num_fantasies = # s_M x 1 x 1 x num_fantasies - batch_shape x num_fantasies cdf_mvs = normal.cdf(normalized_mvs).clamp_min(CLAMP_LB) # s_M x 1 x batch_shape x num_fantasies # Compute log(p(ym | x, Dt)) log_pdf_fm = posterior_m.mvn.log_prob(self.weight * samples_m).unsqueeze(0) # 1 x s_m x batch_shape x num_fantasies # H0 = H(ym | x, Dt) H0 = posterior_m.mvn.entropy() # batch_shape x num_fantasies # regression adjusted H1 estimation, H1_hat = H1_bar - beta * (H0_bar - H0) # H1 = E_{f*|x, Dt}[H(ym|f*, x, Dt)] Z = cdf_mvs_new / cdf_mvs # s_M x s_m x batch_shape x num_fantasies h1 = -Z * Z.log() - Z * log_pdf_fm # s_M x s_m x batch_shape x num_fantasies check_no_nans(h1) dim = [0, 1] # dimension of fm samples, fM samples H1_bar = h1.mean(dim=dim) h0 = -log_pdf_fm H0_bar = h0.mean(dim=dim) cov = ((h1 - H1_bar) * (h0 - H0_bar)).mean(dim=dim) beta = cov / (h0.var(dim=dim) * h1.var(dim=dim)).sqrt() H1_hat = H1_bar - beta * (H0_bar - H0) ig = H0 - H1_hat # batch_shape x num_fantasies ig = ig.permute(-1, *range(ig.dim() - 1)) # num_fantasies x batch_shape return ig
[docs]class qMultiFidelityMaxValueEntropy(qMaxValueEntropy): r"""Multi-fidelity max-value entropy. The acquisition function for multi-fidelity max-value entropy search with support for trace observations. See [Takeno2019mfmves]_ for a detailed discussion of the basic ideas on multi-fidelity MES (note that this implementation is somewhat different). The model must be single-outcome. q > 1 is supported through cyclic optimization and fantasies. Example: >>> model = SingleTaskGP(train_X, train_Y) >>> candidate_set = torch.rand(1000, bounds.size(1)) >>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set >>> MF_MES = qMultiFidelityMaxValueEntropy(model, candidate_set) >>> mf_mes = MF_MES(test_X) """ def __init__( self, model: Model, candidate_set: Tensor, num_fantasies: int = 16, num_mv_samples: int = 10, num_y_samples: int = 128, use_gumbel: bool = True, X_pending: Optional[Tensor] = None, maximize: bool = True, cost_aware_utility: Optional[CostAwareUtility] = None, project: Callable[[Tensor], Tensor] = lambda X: X, expand: Callable[[Tensor], Tensor] = lambda X: X, ) -> None: r"""Single-outcome max-value entropy search acquisition function. Args: model: A fitted single-outcome model. candidate_set: A `n x d` Tensor including `n` candidate points to discretize the design space, which will be used to sample the max values from their posteriors. cost_aware_utility: A CostAwareUtility computing the cost-transformed utility from a candidate set and samples of increases in utility. num_fantasies: Number of fantasies to generate. The higher this number the more accurate the model (at the expense of model complexity and performance) and it's only used when `X_pending` is not `None`. num_mv_samples: Number of max value samples. num_y_samples: Number of posterior samples at specific design point `X`. use_gumbel: If True, use Gumbel approximation to sample the max values. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. maximize: If True, consider the problem a maximization problem. cost_aware_utility: A CostAwareUtility computing the cost-transformed utility from a candidate set and samples of increases in utility. project: A callable mapping a `batch_shape x q x d` tensor of design points to a tensor of the same shape projected to the desired target set (e.g. the target fidelities in case of multi-fidelity optimization). expand: A callable mapping a `batch_shape x q x d` input tensor to a `batch_shape x (q + q_e)' x d`-dim output tensor, where the `q_e` additional points in each q-batch correspond to additional ("trace") observations. """ super().__init__( model=model, candidate_set=candidate_set, num_fantasies=num_fantasies, num_mv_samples=num_mv_samples, num_y_samples=num_y_samples, X_pending=X_pending, use_gumbel=use_gumbel, maximize=maximize, ) if cost_aware_utility is None: cost_model = AffineFidelityCostModel(fidelity_weights={-1: 1.0}) cost_aware_utility = InverseCostWeightedUtility(cost_model=cost_model) self.cost_aware_utility = cost_aware_utility self.expand = expand self.project = project self._cost_sampler = None # @TODO make sure fidelity_dims align in project, expand & cost_aware_utility # It seems very difficult due to the current way of handling project/expand # resample max values after initializing self.project # so that the max value samples are at the highest fidelity self._sample_max_values() @property def cost_sampler(self): if self._cost_sampler is None: # Note: Using the deepcopy here is essential. Removing this poses a # problem if the base model and the cost model have a different number # of outputs or test points (this would be caused by expand), as this # would trigger re-sampling the base samples in the fantasy sampler. # By cloning the sampler here, the right thing will happen if the # the sizes are compatible, if they are not this will result in # samples being drawn using different base samples, but it will at # least avoid changing state of the fantasy sampler. self._cost_sampler = deepcopy(self.fantasies_sampler) return self._cost_sampler
[docs] @t_batch_mode_transform(expected_q=1) def forward(self, X: Tensor) -> Tensor: r"""Evaluates `qMultifidelityMaxValueEntropy` at the design points `X` Args: X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches with `1` `d`-dim design point each. Returns: A `batch_shape`-dim Tensor of MF-MVES values at the design points `X`. """ X_expand = self.expand(X) # batch_shape x (1 + num_trace_observations) x d X_max_fidelity = self.project(X) # batch_shape x 1 x d X_all = torch.cat((X_expand, X_max_fidelity), dim=-2).unsqueeze(-3) # batch_shape x num_fantasies x (2 + num_trace_observations) x d # Compute the posterior, posterior mean, variance without noise # `_m` and `_M` in the var names means the current and the max fidelity. posterior = self.model.posterior(X_all, observation_noise=False) mean_M = self.weight * posterior.mean[..., -1, 0] # batch_shape x num_fantasies variance_M = posterior.variance[..., -1, 0].clamp_min(CLAMP_LB) # get the covariance between the low fidelities and max fidelity covar_mM = posterior.mvn.covariance_matrix[..., :-1, -1] # batch_shape x num_fantasies x (1 + num_trace_observations) check_no_nans(mean_M) check_no_nans(variance_M) check_no_nans(covar_mM) # compute the information gain (IG) ig = self._compute_information_gain( X=X_expand, mean_M=mean_M, variance_M=variance_M, covar_mM=covar_mM ) ig = self.cost_aware_utility(X=X, deltas=ig, sampler=self.cost_sampler) return ig.mean(dim=0) # average over the fantasies
def _sample_max_value_Thompson( model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True ) -> Tensor: """Samples the max values by discrete Thompson sampling. Should generally be called within a `with torch.no_grad()` context. Args: model: A fitted single-outcome model. candidate_set: A `n x d` Tensor including `n` candidate points to discretize the design space. num_samples: Number of max value samples. maximize: If True, consider the problem a maximization problem. Returns: A `num_samples x num_fantasies` Tensor of max value samples """ posterior = model.posterior(candidate_set) weight = 1.0 if maximize else -1.0 samples = weight * posterior.rsample(torch.Size([num_samples])).squeeze(-1) # samples is num_samples x (num_fantasies) x n max_values, _ = samples.max(dim=-1) if len(samples.shape) == 2: max_values = max_values.unsqueeze(-1) # num_samples x num_fantasies return max_values def _sample_max_value_Gumbel( model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True ) -> Tensor: """Samples the max values by Gumbel approximation. Should generally be called within a `with torch.no_grad()` context. Args: model: A fitted single-outcome model. candidate_set: A `n x d` Tensor including `n` candidate points to discretize the design space. num_samples: Number of max value samples. maximize: If True, consider the problem a maximization problem. Returns: A `num_samples x num_fantasies` Tensor of max value samples """ # define the approximate CDF for the max value under the independence assumption posterior = model.posterior(candidate_set) weight = 1.0 if maximize else -1.0 mu = weight * posterior.mean sigma = posterior.variance.clamp_min(1e-8).sqrt() # mu, sigma is (num_fantasies) X n X 1 if len(mu.shape) == 3 and mu.shape[-1] == 1: mu = mu.squeeze(-1).T sigma = sigma.squeeze(-1).T # mu, sigma is now n X num_fantasies or n X 1 # bisect search to find the quantiles 25, 50, 75 lo = (mu - 3 * sigma).min(dim=0).values hi = (mu + 5 * sigma).max(dim=0).values num_fantasies = mu.shape[1] device = candidate_set.device dtype = candidate_set.dtype quantiles = torch.zeros(num_fantasies, 3, device=device, dtype=dtype) for i in range(num_fantasies): lo_, hi_ = lo[i], hi[i] normal = torch.distributions.normal.Normal(mu[:, i], sigma[:, i]) quantiles[i, :] = torch.tensor( [ brentq(lambda y: normal.cdf(y).log().sum().exp() - p, lo_, hi_) for p in [0.25, 0.50, 0.75] ] ) q25, q50, q75 = quantiles[:, 0], quantiles[:, 1], quantiles[:, 2] # q25, q50, q75 are 1 dimensional tensor with size of either 1 or num_fantasies # parameter fitting based on matching percentiles for the Gumbel distribution b = (q25 - q75) / (log(log(4.0 / 3.0)) - log(log(4.0))) a = q50 + b * log(log(2.0)) # inverse sampling from the fitted Gumbel CDF distribution sample_shape = (num_samples, num_fantasies) eps = torch.rand(*sample_shape, device=device, dtype=dtype) max_values = a - b * eps.log().mul(-1.0).log() return max_values # num_samples x num_fantasies