#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Acquisition functions for max-value entropy search (MES) and
multi-fidelity MES with noisy observation and trace observations.
References
.. [Wang2018mves]
Wang, Z., Jegelka, S.,
Max-value Entropy Search for Efficient Bayesian Optimization.
arXiv:1703.01968v3, 2018
.. [Takeno2019mfmves]
Takeno, S., et al.,
Multi-fidelity Bayesian Optimization with Max-value Entropy Search.
arXiv:1901.08275v1, 2019
"""
from __future__ import annotations
from copy import deepcopy
from math import log
from typing import Callable, Optional
import torch
from botorch.acquisition.cost_aware import CostAwareUtility, InverseCostWeightedUtility
from botorch.acquisition.monte_carlo import MCAcquisitionFunction
from botorch.models.cost import AffineFidelityCostModel
from botorch.models.model import Model
from botorch.models.utils import check_no_nans
from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch.utils.transforms import match_batch_shape, t_batch_mode_transform
from scipy.optimize import brentq
from torch import Tensor
CLAMP_LB = 1.0e-8
[docs]class qMaxValueEntropy(MCAcquisitionFunction):
r"""The acquisition function for Max-value Entropy Search.
This acquisition function computes the mutual information of
max values and a candidate point X. See [Wang2018mves]_ for
a detailed discussion.
The model must be single-outcome.
q > 1 is supported through cyclic optimization and fantasies.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> candidate_set = torch.rand(1000, bounds.size(1))
>>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set
>>> MES = qMaxValueEntropy(model, candidate_set)
>>> mes = MES(test_X)
"""
def __init__(
self,
model: Model,
candidate_set: Tensor,
num_fantasies: int = 16,
num_mv_samples: int = 10,
num_y_samples: int = 128,
use_gumbel: bool = True,
maximize: bool = True,
X_pending: Optional[Tensor] = None,
) -> None:
r"""Single-outcome max-value entropy search acquisition function.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space. Max values are sampled from the
(joint) model posterior over these points.
num_fantasies: Number of fantasies to generate. The higher this
number the more accurate the model (at the expense of model
complexity, wall time and memory). Ignored if `X_pending` is `None`.
num_mv_samples: Number of max value samples.
num_y_samples: Number of posterior samples at specific design point `X`.
use_gumbel: If True, use Gumbel approximation to sample the max values.
X_pending: A `m x d`-dim Tensor of `m` design points that have been
submitted for function evaluation but have not yet been evaluated.
maximize: If True, consider the problem a maximization problem.
"""
sampler = SobolQMCNormalSampler(num_y_samples)
super().__init__(model=model, sampler=sampler)
# Batch GP models (e.g. fantasized models) are not currently supported
if self.model.train_inputs[0].ndim > 2:
raise NotImplementedError(
"Batch GP models (e.g. fantasized models) "
"are not yet supported by qMaxValueEntropy"
)
self._init_model = model # only used for the `fantasize()` in `set_X_pending()`
train_inputs = match_batch_shape(model.train_inputs[0], candidate_set)
self.candidate_set = torch.cat([candidate_set, train_inputs], dim=0)
self.fantasies_sampler = SobolQMCNormalSampler(num_fantasies)
self.num_fantasies = num_fantasies
self.use_gumbel = use_gumbel
self.num_mv_samples = num_mv_samples
self.maximize = maximize
self.weight = 1.0 if maximize else -1.0
# If we put the `self._sample_max_values()` to `set_X_pending()`,
# it will throw errors when the initial `super().__init__()` is called,
# since some members required by `_sample_max_values()` are not yet initialized.
if X_pending is None:
self._sample_max_values()
else:
self.set_X_pending(X_pending)
[docs] def set_X_pending(self, X_pending: Optional[Tensor] = None) -> None:
r"""Set pending points.
Informs the acquisition function about pending design points,
fantasizes the model on the pending points and draws max-value samples
from the fantasized model posterior.
Args:
X_pending: `m x d` Tensor with `m` `d`-dim design points that have
been submitted for evaluation but have not yet been evaluated.
"""
super().set_X_pending(X_pending=X_pending)
if X_pending is not None:
# fantasize the model
fantasy_model = self._init_model.fantasize(
X=X_pending, sampler=self.fantasies_sampler, observation_noise=True
)
self.model = fantasy_model
self._sample_max_values()
else:
# This is mainly for setting the model to the original model
# after the sequential optimization at q > 1
try:
self.model = self._init_model
self._sample_max_values()
except AttributeError:
pass
def _sample_max_values(self):
r"""Sample max values for MC approximation of the expectation in MES"""
with torch.no_grad():
# Append X_pending to candidate set
if self.X_pending is None:
X_pending = torch.tensor(
[], dtype=self.candidate_set.dtype, device=self.candidate_set.device
)
else:
X_pending = self.X_pending
X_pending = match_batch_shape(X_pending, self.candidate_set)
candidate_set = torch.cat([self.candidate_set, X_pending], dim=0)
# project the candidate_set to the highest fidelity,
# which is needed for the multi-fidelity MES
try:
candidate_set = self.project(candidate_set)
except AttributeError:
pass
# sample max values
if self.use_gumbel:
self.posterior_max_values = _sample_max_value_Gumbel(
self.model, candidate_set, self.num_mv_samples, self.maximize
)
else:
self.posterior_max_values = _sample_max_value_Thompson(
self.model, candidate_set, self.num_mv_samples, self.maximize
)
[docs] @t_batch_mode_transform(expected_q=1)
def forward(self, X: Tensor) -> Tensor:
r"""Compute max-value entropy at the design points `X`.
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design points each.
Returns:
A `batch_shape`-dim Tensor of MVE values at the given design points `X`.
"""
# Compute the posterior, posterior mean, variance and std
posterior = self.model.posterior(X.unsqueeze(-3), observation_noise=False)
mean = self.weight * posterior.mean.squeeze(-1).squeeze(-1)
# batch_shape x num_fantasies
variance = posterior.variance.clamp_min(CLAMP_LB).view_as(mean)
check_no_nans(mean)
check_no_nans(variance)
ig = self._compute_information_gain(
X=X, mean_M=mean, variance_M=variance, covar_mM=variance.unsqueeze(-1)
)
return ig.mean(dim=0) # average over the fantasies
def _compute_information_gain(
self, X: Tensor, mean_M: Tensor, variance_M: Tensor, covar_mM: Tensor
) -> Tensor:
r"""Computes the information gain at the design points `X`.
Approximately computes the information gain at the design points `X`,
for both MES with noisy observations and multi-fidelity MES with noisy
observation and trace observations.
The implementation is inspired from the paper on multi-fidelity MES by
Takeno et. al. [Takeno2019mfmves]_. The notations in the comments in this
function follows the Appendix A in the paper.
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design point each.
mean_M, variance_M: `batch_shape x num_fantasies`-dim Tensors of
`batch_shape` t-batches with `num_fantasies` fantasies.
`num_fantasies = 1` for non-fantasized models.
All are obtained without noise.
covar_mM: `batch_shape x num_fantasies x (1 + num_trace_observations)`
-dim Tensor. `num_fantasies = 1` for non-fantasized models.
All are obtained without noise.
Returns:
A `num_fantasies x batch_shape`-dim Tensor of information gains at the
given design points `X`.
"""
# compute the std_m, variance_m with noisy observation
posterior_m = self.model.posterior(X.unsqueeze(-3), observation_noise=True)
mean_m = self.weight * posterior_m.mean.squeeze(-1)
# batch_shape x num_fantasies x (1 + num_trace_observations)
variance_m = posterior_m.mvn.covariance_matrix
# batch_shape x num_fantasies x (1 + num_trace_observations)^2
check_no_nans(variance_m)
# compute mean and std for fM|ym, x, Dt ~ N(u, s^2)
samples_m = self.weight * self.sampler(posterior_m).squeeze(-1)
# s_m x batch_shape x num_fantasies x (1 + num_trace_observations)
L = torch.cholesky(variance_m)
temp_term = torch.cholesky_solve(covar_mM.unsqueeze(-1), L).transpose(-2, -1)
# equivalent to torch.matmul(covar_mM.unsqueeze(-2), torch.inverse(variance_m))
# batch_shape x num_fantasies x 1 x (1 + num_trace_observations)
mean_pt1 = torch.matmul(temp_term, (samples_m - mean_m).unsqueeze(-1))
mean_new = mean_pt1.squeeze(-1).squeeze(-1) + mean_M
# s_m x batch_shape x num_fantasies
variance_pt1 = torch.matmul(temp_term, covar_mM.unsqueeze(-1))
variance_new = variance_M - variance_pt1.squeeze(-1).squeeze(-1)
# batch_shape x num_fantasies
stdv_new = variance_new.clamp_min(CLAMP_LB).sqrt()
# batch_shape x num_fantasies
# define normal distribution to compute cdf and pdf
normal = torch.distributions.Normal(
torch.zeros(1, device=X.device, dtype=X.dtype),
torch.ones(1, device=X.device, dtype=X.dtype),
)
# Compute p(fM <= f* | ym, x, Dt)
view_shape = (
[self.num_mv_samples] + [1] * (len(X.shape) - 2) + [self.num_fantasies]
) # s_M x batch_shape x num_fantasies
if self.X_pending is None:
view_shape[-1] = 1
max_vals = self.posterior_max_values.view(view_shape).unsqueeze(1)
# s_M x 1 x batch_shape x num_fantasies
normalized_mvs_new = (max_vals - mean_new) / stdv_new
# s_M x s_m x batch_shape x num_fantasies =
# s_M x 1 x batch_shape x num_fantasies - s_m x batch_shape x num_fantasies
cdf_mvs_new = normal.cdf(normalized_mvs_new).clamp_min(CLAMP_LB)
# Compute p(fM <= f* | x, Dt)
stdv_M = variance_M.sqrt()
normalized_mvs = (max_vals - mean_M) / stdv_M
# s_M x 1 x batch_shape x num_fantasies =
# s_M x 1 x 1 x num_fantasies - batch_shape x num_fantasies
cdf_mvs = normal.cdf(normalized_mvs).clamp_min(CLAMP_LB)
# s_M x 1 x batch_shape x num_fantasies
# Compute log(p(ym | x, Dt))
log_pdf_fm = posterior_m.mvn.log_prob(self.weight * samples_m).unsqueeze(0)
# 1 x s_m x batch_shape x num_fantasies
# H0 = H(ym | x, Dt)
H0 = posterior_m.mvn.entropy() # batch_shape x num_fantasies
# regression adjusted H1 estimation, H1_hat = H1_bar - beta * (H0_bar - H0)
# H1 = E_{f*|x, Dt}[H(ym|f*, x, Dt)]
Z = cdf_mvs_new / cdf_mvs # s_M x s_m x batch_shape x num_fantasies
h1 = -Z * Z.log() - Z * log_pdf_fm # s_M x s_m x batch_shape x num_fantasies
check_no_nans(h1)
dim = [0, 1] # dimension of fm samples, fM samples
H1_bar = h1.mean(dim=dim)
h0 = -log_pdf_fm
H0_bar = h0.mean(dim=dim)
cov = ((h1 - H1_bar) * (h0 - H0_bar)).mean(dim=dim)
beta = cov / (h0.var(dim=dim) * h1.var(dim=dim)).sqrt()
H1_hat = H1_bar - beta * (H0_bar - H0)
ig = H0 - H1_hat # batch_shape x num_fantasies
ig = ig.permute(-1, *range(ig.dim() - 1)) # num_fantasies x batch_shape
return ig
[docs]class qMultiFidelityMaxValueEntropy(qMaxValueEntropy):
r"""Multi-fidelity max-value entropy.
The acquisition function for multi-fidelity max-value entropy search
with support for trace observations. See [Takeno2019mfmves]_ for a
detailed discussion of the basic ideas on multi-fidelity MES
(note that this implementation is somewhat different).
The model must be single-outcome.
q > 1 is supported through cyclic optimization and fantasies.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> candidate_set = torch.rand(1000, bounds.size(1))
>>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set
>>> MF_MES = qMultiFidelityMaxValueEntropy(model, candidate_set)
>>> mf_mes = MF_MES(test_X)
"""
def __init__(
self,
model: Model,
candidate_set: Tensor,
num_fantasies: int = 16,
num_mv_samples: int = 10,
num_y_samples: int = 128,
use_gumbel: bool = True,
X_pending: Optional[Tensor] = None,
maximize: bool = True,
cost_aware_utility: Optional[CostAwareUtility] = None,
project: Callable[[Tensor], Tensor] = lambda X: X,
expand: Callable[[Tensor], Tensor] = lambda X: X,
) -> None:
r"""Single-outcome max-value entropy search acquisition function.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space, which will be used to sample the
max values from their posteriors.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
num_fantasies: Number of fantasies to generate. The higher this
number the more accurate the model (at the expense of model
complexity and performance) and it's only used when `X_pending`
is not `None`.
num_mv_samples: Number of max value samples.
num_y_samples: Number of posterior samples at specific design point `X`.
use_gumbel: If True, use Gumbel approximation to sample the max values.
X_pending: A `m x d`-dim Tensor of `m` design points that have been
submitted for function evaluation but have not yet been evaluated.
maximize: If True, consider the problem a maximization problem.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
project: A callable mapping a `batch_shape x q x d` tensor of design
points to a tensor of the same shape projected to the desired
target set (e.g. the target fidelities in case of multi-fidelity
optimization).
expand: A callable mapping a `batch_shape x q x d` input tensor to
a `batch_shape x (q + q_e)' x d`-dim output tensor, where the
`q_e` additional points in each q-batch correspond to
additional ("trace") observations.
"""
super().__init__(
model=model,
candidate_set=candidate_set,
num_fantasies=num_fantasies,
num_mv_samples=num_mv_samples,
num_y_samples=num_y_samples,
X_pending=X_pending,
use_gumbel=use_gumbel,
maximize=maximize,
)
if cost_aware_utility is None:
cost_model = AffineFidelityCostModel(fidelity_weights={-1: 1.0})
cost_aware_utility = InverseCostWeightedUtility(cost_model=cost_model)
self.cost_aware_utility = cost_aware_utility
self.expand = expand
self.project = project
self._cost_sampler = None
# @TODO make sure fidelity_dims align in project, expand & cost_aware_utility
# It seems very difficult due to the current way of handling project/expand
# resample max values after initializing self.project
# so that the max value samples are at the highest fidelity
self._sample_max_values()
@property
def cost_sampler(self):
if self._cost_sampler is None:
# Note: Using the deepcopy here is essential. Removing this poses a
# problem if the base model and the cost model have a different number
# of outputs or test points (this would be caused by expand), as this
# would trigger re-sampling the base samples in the fantasy sampler.
# By cloning the sampler here, the right thing will happen if the
# the sizes are compatible, if they are not this will result in
# samples being drawn using different base samples, but it will at
# least avoid changing state of the fantasy sampler.
self._cost_sampler = deepcopy(self.fantasies_sampler)
return self._cost_sampler
[docs] @t_batch_mode_transform(expected_q=1)
def forward(self, X: Tensor) -> Tensor:
r"""Evaluates `qMultifidelityMaxValueEntropy` at the design points `X`
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design point each.
Returns:
A `batch_shape`-dim Tensor of MF-MVES values at the design points `X`.
"""
X_expand = self.expand(X) # batch_shape x (1 + num_trace_observations) x d
X_max_fidelity = self.project(X) # batch_shape x 1 x d
X_all = torch.cat((X_expand, X_max_fidelity), dim=-2).unsqueeze(-3)
# batch_shape x num_fantasies x (2 + num_trace_observations) x d
# Compute the posterior, posterior mean, variance without noise
# `_m` and `_M` in the var names means the current and the max fidelity.
posterior = self.model.posterior(X_all, observation_noise=False)
mean_M = self.weight * posterior.mean[..., -1, 0] # batch_shape x num_fantasies
variance_M = posterior.variance[..., -1, 0].clamp_min(CLAMP_LB)
# get the covariance between the low fidelities and max fidelity
covar_mM = posterior.mvn.covariance_matrix[..., :-1, -1]
# batch_shape x num_fantasies x (1 + num_trace_observations)
check_no_nans(mean_M)
check_no_nans(variance_M)
check_no_nans(covar_mM)
# compute the information gain (IG)
ig = self._compute_information_gain(
X=X_expand, mean_M=mean_M, variance_M=variance_M, covar_mM=covar_mM
)
ig = self.cost_aware_utility(X=X, deltas=ig, sampler=self.cost_sampler)
return ig.mean(dim=0) # average over the fantasies
def _sample_max_value_Thompson(
model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True
) -> Tensor:
"""Samples the max values by discrete Thompson sampling.
Should generally be called within a `with torch.no_grad()` context.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space.
num_samples: Number of max value samples.
maximize: If True, consider the problem a maximization problem.
Returns:
A `num_samples x num_fantasies` Tensor of max value samples
"""
posterior = model.posterior(candidate_set)
weight = 1.0 if maximize else -1.0
samples = weight * posterior.rsample(torch.Size([num_samples])).squeeze(-1)
# samples is num_samples x (num_fantasies) x n
max_values, _ = samples.max(dim=-1)
if len(samples.shape) == 2:
max_values = max_values.unsqueeze(-1) # num_samples x num_fantasies
return max_values
def _sample_max_value_Gumbel(
model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True
) -> Tensor:
"""Samples the max values by Gumbel approximation.
Should generally be called within a `with torch.no_grad()` context.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space.
num_samples: Number of max value samples.
maximize: If True, consider the problem a maximization problem.
Returns:
A `num_samples x num_fantasies` Tensor of max value samples
"""
# define the approximate CDF for the max value under the independence assumption
posterior = model.posterior(candidate_set)
weight = 1.0 if maximize else -1.0
mu = weight * posterior.mean
sigma = posterior.variance.clamp_min(1e-8).sqrt()
# mu, sigma is (num_fantasies) X n X 1
if len(mu.shape) == 3 and mu.shape[-1] == 1:
mu = mu.squeeze(-1).T
sigma = sigma.squeeze(-1).T
# mu, sigma is now n X num_fantasies or n X 1
# bisect search to find the quantiles 25, 50, 75
lo = (mu - 3 * sigma).min(dim=0).values
hi = (mu + 5 * sigma).max(dim=0).values
num_fantasies = mu.shape[1]
device = candidate_set.device
dtype = candidate_set.dtype
quantiles = torch.zeros(num_fantasies, 3, device=device, dtype=dtype)
for i in range(num_fantasies):
lo_, hi_ = lo[i], hi[i]
normal = torch.distributions.normal.Normal(mu[:, i], sigma[:, i])
quantiles[i, :] = torch.tensor(
[
brentq(lambda y: normal.cdf(y).log().sum().exp() - p, lo_, hi_)
for p in [0.25, 0.50, 0.75]
]
)
q25, q50, q75 = quantiles[:, 0], quantiles[:, 1], quantiles[:, 2]
# q25, q50, q75 are 1 dimensional tensor with size of either 1 or num_fantasies
# parameter fitting based on matching percentiles for the Gumbel distribution
b = (q25 - q75) / (log(log(4.0 / 3.0)) - log(log(4.0)))
a = q50 + b * log(log(2.0))
# inverse sampling from the fitted Gumbel CDF distribution
sample_shape = (num_samples, num_fantasies)
eps = torch.rand(*sample_shape, device=device, dtype=dtype)
max_values = a - b * eps.log().mul(-1.0).log()
return max_values # num_samples x num_fantasies