#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Acquisition functions for max-value entropy search (MES) and
multi-fidelity MES with noisy observation and trace observations.
References
.. [Wang2018mves]
Wang, Z., Jegelka, S.,
Max-value Entropy Search for Efficient Bayesian Optimization.
arXiv:1703.01968v3, 2018
.. [Takeno2019mfmves]
Takeno, S., et al.,
Multi-fidelity Bayesian Optimization with Max-value Entropy Search.
arXiv:1901.08275v1, 2019
"""
from __future__ import annotations
from copy import deepcopy
from math import log
from typing import Callable, Optional
import torch
from scipy.optimize import brentq
from torch import Tensor
from ..models.cost import AffineFidelityCostModel
from ..models.model import Model
from ..models.utils import check_no_nans
from ..sampling.samplers import SobolQMCNormalSampler
from ..utils.transforms import match_batch_shape, t_batch_mode_transform
from .cost_aware import CostAwareUtility, InverseCostWeightedUtility
from .monte_carlo import MCAcquisitionFunction
CLAMP_LB = 1.0e-8
[docs]class qMaxValueEntropy(MCAcquisitionFunction):
r"""The acquisition function for Max-value Entropy Search.
This acquisition function computes the mutual information of
max values and a candidate point X. See [Wang2018mves]_ for
a detailed discussion.
The model must be single-outcome.
q > 1 is supported through cyclic optimization and fantasies.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> candidate_set = torch.rand(1000, bounds.size(1))
>>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set
>>> MES = qMaxValueEntropy(model, candidate_set)
>>> mes = MES(test_X)
"""
def __init__(
self,
model: Model,
candidate_set: Tensor,
num_fantasies: int = 16,
num_mv_samples: int = 10,
num_y_samples: int = 128,
use_gumbel: bool = True,
maximize: bool = True,
X_pending: Optional[Tensor] = None,
) -> None:
r"""Single-outcome max-value entropy search acquisition function.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space. Max values are sampled from the
(joint) model posterior over these points.
num_fantasies: Number of fantasies to generate. The higher this
number the more accurate the model (at the expense of model
complexity, wall time and memory). Ignored if `X_pending` is `None`.
num_mv_samples: Number of max value samples.
num_y_samples: Number of posterior samples at specific design point `X`.
use_gumbel: If True, use Gumbel approximation to sample the max values.
X_pending: A `m x d`-dim Tensor of `m` design points that have been
submitted for function evaluation but have not yet been evaluated.
maximize: If True, consider the problem a maximization problem.
"""
sampler = SobolQMCNormalSampler(num_y_samples)
super().__init__(model=model, sampler=sampler)
# Batch GP models (e.g. fantasized models) are not currently supported
if self.model.train_inputs[0].ndim > 2:
raise NotImplementedError(
"Batch GP models (e.g. fantasized models) "
"are not yet supported by qMaxValueEntropy"
)
self._init_model = model # only used for the `fantasize()` in `set_X_pending()`
train_inputs = match_batch_shape(model.train_inputs[0], candidate_set)
self.candidate_set = torch.cat([candidate_set, train_inputs], dim=0)
self.fantasies_sampler = SobolQMCNormalSampler(num_fantasies)
self.num_fantasies = num_fantasies
self.use_gumbel = use_gumbel
self.num_mv_samples = num_mv_samples
self.maximize = maximize
self.weight = 1.0 if maximize else -1.0
# If we put the `self._sample_max_values()` to `set_X_pending()`,
# it will throw errors when the initial `super().__init__()` is called,
# since some members required by `_sample_max_values()` are not yet initialized.
if X_pending is None:
self._sample_max_values()
else:
self.set_X_pending(X_pending)
[docs] def set_X_pending(self, X_pending: Optional[Tensor] = None) -> None:
r"""Set pending points.
Informs the acquisition function about pending design points,
fantasizes the model on the pending points and draws max-value samples
from the fantasized model posterior.
Args:
X_pending: `m x d` Tensor with `m` `d`-dim design points that have
been submitted for evaluation but have not yet been evaluated.
"""
super().set_X_pending(X_pending=X_pending)
if X_pending is not None:
# fantasize the model
fantasy_model = self._init_model.fantasize(
X=X_pending, sampler=self.fantasies_sampler, observation_noise=True
)
self.model = fantasy_model
self._sample_max_values()
else:
# This is mainly for setting the model to the original model
# after the sequential optimization at q > 1
try:
self.model = self._init_model
self._sample_max_values()
except AttributeError:
pass
def _sample_max_values(self):
r"""Sample max values for MC approximation of the expectation in MES"""
with torch.no_grad():
# Append X_pending to candidate set
if self.X_pending is None:
X_pending = torch.tensor(
[], dtype=self.candidate_set.dtype, device=self.candidate_set.device
)
else:
X_pending = self.X_pending
X_pending = match_batch_shape(X_pending, self.candidate_set)
candidate_set = torch.cat([self.candidate_set, X_pending], dim=0)
# project the candidate_set to the highest fidelity,
# which is needed for the multi-fidelity MES
try:
candidate_set = self.project(candidate_set)
except AttributeError:
pass
# sample max values
if self.use_gumbel:
self.posterior_max_values = _sample_max_value_Gumbel(
self.model, candidate_set, self.num_mv_samples, self.maximize
)
else:
self.posterior_max_values = _sample_max_value_Thompson(
self.model, candidate_set, self.num_mv_samples, self.maximize
)
[docs] @t_batch_mode_transform(expected_q=1)
def forward(self, X: Tensor) -> Tensor:
r"""Compute max-value entropy at the design points `X`.
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design points each.
Returns:
A `batch_shape`-dim Tensor of MVE values at the given design points `X`.
"""
# Compute the posterior, posterior mean, variance and std
posterior = self.model.posterior(X.unsqueeze(-3), observation_noise=False)
mean = self.weight * posterior.mean.squeeze(-1).squeeze(-1)
# batch_shape x num_fantasies
variance = posterior.variance.clamp_min(CLAMP_LB).view_as(mean)
check_no_nans(mean)
check_no_nans(variance)
ig = self._compute_information_gain(
X=X, mean_M=mean, variance_M=variance, covar_mM=variance.unsqueeze(-1)
)
return ig.mean(dim=0) # average over the fantasies
def _compute_information_gain(
self, X: Tensor, mean_M: Tensor, variance_M: Tensor, covar_mM: Tensor
) -> Tensor:
r"""Computes the information gain at the design points `X`.
Approximately computes the information gain at the design points `X`,
for both MES with noisy observations and multi-fidelity MES with noisy
observation and trace observations.
The implementation is inspired from the paper on multi-fidelity MES by
Takeno et. al. [Takeno2019mfmves]_. The notations in the comments in this
function follows the Appendix A in the paper.
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design point each.
mean_M, variance_M: `batch_shape x num_fantasies`-dim Tensors of
`batch_shape` t-batches with `num_fantasies` fantasies.
`num_fantasies = 1` for non-fantasized models.
All are obtained without noise.
covar_mM: `batch_shape x num_fantasies x (1 + num_trace_observations)`
-dim Tensor. `num_fantasies = 1` for non-fantasized models.
All are obtained without noise.
Returns:
A `num_fantasies x batch_shape`-dim Tensor of information gains at the
given design points `X`.
"""
# compute the std_m, variance_m with noisy observation
posterior_m = self.model.posterior(X.unsqueeze(-3), observation_noise=True)
mean_m = self.weight * posterior_m.mean.squeeze(-1)
# batch_shape x num_fantasies x (1 + num_trace_observations)
variance_m = posterior_m.mvn.covariance_matrix
# batch_shape x num_fantasies x (1 + num_trace_observations)^2
check_no_nans(variance_m)
# compute mean and std for fM|ym, x, Dt ~ N(u, s^2)
samples_m = self.weight * self.sampler(posterior_m).squeeze(-1)
# s_m x batch_shape x num_fantasies x (1 + num_trace_observations)
L = torch.cholesky(variance_m)
temp_term = torch.cholesky_solve(covar_mM.unsqueeze(-1), L).transpose(-2, -1)
# equivalent to torch.matmul(covar_mM.unsqueeze(-2), torch.inverse(variance_m))
# batch_shape x num_fantasies x 1 x (1 + num_trace_observations)
mean_pt1 = torch.matmul(temp_term, (samples_m - mean_m).unsqueeze(-1))
mean_new = mean_pt1.squeeze(-1).squeeze(-1) + mean_M
# s_m x batch_shape x num_fantasies
variance_pt1 = torch.matmul(temp_term, covar_mM.unsqueeze(-1))
variance_new = variance_M - variance_pt1.squeeze(-1).squeeze(-1)
# batch_shape x num_fantasies
stdv_new = variance_new.clamp_min(CLAMP_LB).sqrt()
# batch_shape x num_fantasies
# define normal distribution to compute cdf and pdf
normal = torch.distributions.Normal(
torch.zeros(1, device=X.device, dtype=X.dtype),
torch.ones(1, device=X.device, dtype=X.dtype),
)
# Compute p(fM <= f* | ym, x, Dt)
view_shape = (
[self.num_mv_samples] + [1] * (len(X.shape) - 2) + [self.num_fantasies]
) # s_M x batch_shape x num_fantasies
if self.X_pending is None:
view_shape[-1] = 1
max_vals = self.posterior_max_values.view(view_shape).unsqueeze(1)
# s_M x 1 x batch_shape x num_fantasies
normalized_mvs_new = (max_vals - mean_new) / stdv_new
# s_M x s_m x batch_shape x num_fantasies =
# s_M x 1 x batch_shape x num_fantasies - s_m x batch_shape x num_fantasies
cdf_mvs_new = normal.cdf(normalized_mvs_new).clamp_min(CLAMP_LB)
# Compute p(fM <= f* | x, Dt)
stdv_M = variance_M.sqrt()
normalized_mvs = (max_vals - mean_M) / stdv_M
# s_M x 1 x batch_shape x num_fantasies =
# s_M x 1 x 1 x num_fantasies - batch_shape x num_fantasies
cdf_mvs = normal.cdf(normalized_mvs).clamp_min(CLAMP_LB)
# s_M x 1 x batch_shape x num_fantasies
# Compute log(p(ym | x, Dt))
log_pdf_fm = posterior_m.mvn.log_prob(self.weight * samples_m).unsqueeze(0)
# 1 x s_m x batch_shape x num_fantasies
# H0 = H(ym | x, Dt)
H0 = posterior_m.mvn.entropy() # batch_shape x num_fantasies
# regression adjusted H1 estimation, H1_hat = H1_bar - beta * (H0_bar - H0)
# H1 = E_{f*|x, Dt}[H(ym|f*, x, Dt)]
Z = cdf_mvs_new / cdf_mvs # s_M x s_m x batch_shape x num_fantasies
h1 = -Z * Z.log() - Z * log_pdf_fm # s_M x s_m x batch_shape x num_fantasies
check_no_nans(h1)
dim = [0, 1] # dimension of fm samples, fM samples
H1_bar = h1.mean(dim=dim)
h0 = -log_pdf_fm
H0_bar = h0.mean(dim=dim)
cov = ((h1 - H1_bar) * (h0 - H0_bar)).mean(dim=dim)
beta = cov / (h0.var(dim=dim) * h1.var(dim=dim)).sqrt()
H1_hat = H1_bar - beta * (H0_bar - H0)
ig = H0 - H1_hat # batch_shape x num_fantasies
ig = ig.permute(-1, *range(ig.dim() - 1)) # num_fantasies x batch_shape
return ig
[docs]class qMultiFidelityMaxValueEntropy(qMaxValueEntropy):
r"""Multi-fidelity max-value entropy.
The acquisition function for multi-fidelity max-value entropy search
with support for trace observations. See [Takeno2019mfmves]_ for a
detailed discussion of the basic ideas on multi-fidelity MES
(note that this implementation is somewhat different).
The model must be single-outcome.
q > 1 is supported through cyclic optimization and fantasies.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> candidate_set = torch.rand(1000, bounds.size(1))
>>> candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set
>>> MF_MES = qMultiFidelityMaxValueEntropy(model, candidate_set)
>>> mf_mes = MF_MES(test_X)
"""
def __init__(
self,
model: Model,
candidate_set: Tensor,
num_fantasies: int = 16,
num_mv_samples: int = 10,
num_y_samples: int = 128,
use_gumbel: bool = True,
X_pending: Optional[Tensor] = None,
maximize: bool = True,
cost_aware_utility: Optional[CostAwareUtility] = None,
project: Callable[[Tensor], Tensor] = lambda X: X,
expand: Callable[[Tensor], Tensor] = lambda X: X,
) -> None:
r"""Single-outcome max-value entropy search acquisition function.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space, which will be used to sample the
max values from their posteriors.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
num_fantasies: Number of fantasies to generate. The higher this
number the more accurate the model (at the expense of model
complexity and performance) and it's only used when `X_pending`
is not `None`.
num_mv_samples: Number of max value samples.
num_y_samples: Number of posterior samples at specific design point `X`.
use_gumbel: If True, use Gumbel approximation to sample the max values.
X_pending: A `m x d`-dim Tensor of `m` design points that have been
submitted for function evaluation but have not yet been evaluated.
maximize: If True, consider the problem a maximization problem.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
project: A callable mapping a `batch_shape x q x d` tensor of design
points to a tensor of the same shape projected to the desired
target set (e.g. the target fidelities in case of multi-fidelity
optimization).
expand: A callable mapping a `batch_shape x q x d` input tensor to
a `batch_shape x (q + q_e)' x d`-dim output tensor, where the
`q_e` additional points in each q-batch correspond to
additional ("trace") observations.
"""
super().__init__(
model=model,
candidate_set=candidate_set,
num_fantasies=num_fantasies,
num_mv_samples=num_mv_samples,
num_y_samples=num_y_samples,
X_pending=X_pending,
use_gumbel=use_gumbel,
maximize=maximize,
)
if cost_aware_utility is None:
cost_model = AffineFidelityCostModel(fidelity_weights={-1: 1.0})
cost_aware_utility = InverseCostWeightedUtility(cost_model=cost_model)
self.cost_aware_utility = cost_aware_utility
self.expand = expand
self.project = project
self._cost_sampler = None
# @TODO make sure fidelity_dims align in project, expand & cost_aware_utility
# It seems very difficult due to the current way of handling project/expand
# resample max values after initializing self.project
# so that the max value samples are at the highest fidelity
self._sample_max_values()
@property
def cost_sampler(self):
if self._cost_sampler is None:
# Note: Using the deepcopy here is essential. Removing this poses a
# problem if the base model and the cost model have a different number
# of outputs or test points (this would be caused by expand), as this
# would trigger re-sampling the base samples in the fantasy sampler.
# By cloning the sampler here, the right thing will happen if the
# the sizes are compatible, if they are not this will result in
# samples being drawn using different base samples, but it will at
# least avoid changing state of the fantasy sampler.
self._cost_sampler = deepcopy(self.fantasies_sampler)
return self._cost_sampler
[docs] @t_batch_mode_transform(expected_q=1)
def forward(self, X: Tensor) -> Tensor:
r"""Evaluates `qMultifidelityMaxValueEntropy` at the design points `X`
Args:
X: A `batch_shape x 1 x d`-dim Tensor of `batch_shape` t-batches
with `1` `d`-dim design point each.
Returns:
A `batch_shape`-dim Tensor of MF-MVES values at the design points `X`.
"""
X_expand = self.expand(X) # batch_shape x (1 + num_trace_observations) x d
X_max_fidelity = self.project(X) # batch_shape x 1 x d
X_all = torch.cat((X_expand, X_max_fidelity), dim=-2).unsqueeze(-3)
# batch_shape x num_fantasies x (2 + num_trace_observations) x d
# Compute the posterior, posterior mean, variance without noise
# `_m` and `_M` in the var names means the current and the max fidelity.
posterior = self.model.posterior(X_all, observation_noise=False)
mean_M = self.weight * posterior.mean[..., -1, 0] # batch_shape x num_fantasies
variance_M = posterior.variance[..., -1, 0].clamp_min(CLAMP_LB)
# get the covariance between the low fidelities and max fidelity
covar_mM = posterior.mvn.covariance_matrix[..., :-1, -1]
# batch_shape x num_fantasies x (1 + num_trace_observations)
check_no_nans(mean_M)
check_no_nans(variance_M)
check_no_nans(covar_mM)
# compute the information gain (IG)
ig = self._compute_information_gain(
X=X_expand, mean_M=mean_M, variance_M=variance_M, covar_mM=covar_mM
)
ig = self.cost_aware_utility(X=X, deltas=ig, sampler=self.cost_sampler)
return ig.mean(dim=0) # average over the fantasies
def _sample_max_value_Thompson(
model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True
) -> Tensor:
"""Samples the max values by discrete Thompson sampling.
Should generally be called within a `with torch.no_grad()` context.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space.
num_samples: Number of max value samples.
maximize: If True, consider the problem a maximization problem.
Returns:
A `num_samples x num_fantasies` Tensor of max value samples
"""
posterior = model.posterior(candidate_set)
weight = 1.0 if maximize else -1.0
samples = weight * posterior.rsample(torch.Size([num_samples])).squeeze(-1)
# samples is num_samples x (num_fantasies) x n
max_values, _ = samples.max(dim=-1)
if len(samples.shape) == 2:
max_values = max_values.unsqueeze(-1) # num_samples x num_fantasies
return max_values
def _sample_max_value_Gumbel(
model: Model, candidate_set: Tensor, num_samples: int, maximize: bool = True
) -> Tensor:
"""Samples the max values by Gumbel approximation.
Should generally be called within a `with torch.no_grad()` context.
Args:
model: A fitted single-outcome model.
candidate_set: A `n x d` Tensor including `n` candidate points to
discretize the design space.
num_samples: Number of max value samples.
maximize: If True, consider the problem a maximization problem.
Returns:
A `num_samples x num_fantasies` Tensor of max value samples
"""
# define the approximate CDF for the max value under the independence assumption
posterior = model.posterior(candidate_set)
weight = 1.0 if maximize else -1.0
mu = weight * posterior.mean
sigma = posterior.variance.clamp_min(1e-8).sqrt()
# mu, sigma is (num_fantasies) X n X 1
if len(mu.shape) == 3 and mu.shape[-1] == 1:
mu = mu.squeeze(-1).T
sigma = sigma.squeeze(-1).T
# mu, sigma is now n X num_fantasies or n X 1
# bisect search to find the quantiles 25, 50, 75
lo = (mu - 3 * sigma).min(dim=0).values
hi = (mu + 5 * sigma).max(dim=0).values
num_fantasies = mu.shape[1]
device = candidate_set.device
dtype = candidate_set.dtype
quantiles = torch.zeros(num_fantasies, 3, device=device, dtype=dtype)
for i in range(num_fantasies):
lo_, hi_ = lo[i], hi[i]
normal = torch.distributions.normal.Normal(mu[:, i], sigma[:, i])
quantiles[i, :] = torch.tensor(
[
brentq(lambda y: normal.cdf(y).log().sum().exp() - p, lo_, hi_)
for p in [0.25, 0.50, 0.75]
]
)
q25, q50, q75 = quantiles[:, 0], quantiles[:, 1], quantiles[:, 2]
# q25, q50, q75 are 1 dimensional tensor with size of either 1 or num_fantasies
# parameter fitting based on matching percentiles for the Gumbel distribution
b = (q25 - q75) / (log(log(4.0 / 3.0)) - log(log(4.0)))
a = q50 + b * log(log(2.0))
# inverse sampling from the fitted Gumbel CDF distribution
sample_shape = (num_samples, num_fantasies)
eps = torch.rand(*sample_shape, device=device, dtype=dtype)
max_values = a - b * eps.log().mul(-1.0).log()
return max_values # num_samples x num_fantasies