#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Batch Knowledge Gradient (KG) via one-shot optimization as introduced in
[Balandat2020botorch]_. For broader discussion of KG see also [Frazier2008knowledge]_
and [Wu2016parallelkg]_.
.. [Balandat2020botorch]
M. Balandat, B. Karrer, D. R. Jiang, S. Daulton, B. Letham, A. G. Wilson, and
E. Bakshy. BoTorch: A Framework for Efficient Monte-Carlo Bayesian Optimization.
Advances in Neural Information Processing Systems 33, 2020.
.. [Frazier2008knowledge]
P. Frazier, W. Powell, and S. Dayanik. A Knowledge-Gradient policy for
sequential information collection. SIAM Journal on Control and Optimization,
2008.
.. [Wu2016parallelkg]
J. Wu and P. Frazier. The parallel knowledge gradient method for batch
bayesian optimization. NIPS 2016.
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any, Callable, Optional, Tuple, Union
import torch
from botorch import settings
from botorch.acquisition.acquisition import (
AcquisitionFunction,
OneShotAcquisitionFunction,
)
from botorch.acquisition.analytic import PosteriorMean
from botorch.acquisition.cost_aware import CostAwareUtility
from botorch.acquisition.monte_carlo import MCAcquisitionFunction, qSimpleRegret
from botorch.acquisition.objective import (
AcquisitionObjective,
MCAcquisitionObjective,
ScalarizedObjective,
)
from botorch.exceptions.errors import UnsupportedError
from botorch.models.model import Model
from botorch.sampling.samplers import MCSampler, SobolQMCNormalSampler
from botorch.utils.transforms import (
concatenate_pending_points,
match_batch_shape,
t_batch_mode_transform,
)
from torch import Tensor
class qKnowledgeGradient(MCAcquisitionFunction, OneShotAcquisitionFunction):
r"""Batch Knowledge Gradient using one-shot optimization.
This computes the batch Knowledge Gradient using fantasies for the outer
expectation and either the model posterior mean or MC-sampling for the inner
expectation.
In addition to the design variables, the input `X` also includes variables
for the optimal designs for each of the fantasy models. For a fixed number
of fantasies, all parts of `X` can be optimized in a "one-shot" fashion.
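
    Example (an illustrative sketch, not from the original docs; assumes
    training tensors `train_X`, `train_Y` and a one-shot candidate tensor
    `X_oneshot` of shape `b x (q + num_fantasies) x d`):
        >>> from botorch.models import SingleTaskGP
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> qKG = qKnowledgeGradient(model, num_fantasies=64)
        >>> kg_vals = qKG(X_oneshot)  # `b`-dim Tensor of (approximate) KG values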
"""
def __init__(
self,
model: Model,
num_fantasies: Optional[int] = 64,
sampler: Optional[MCSampler] = None,
objective: Optional[AcquisitionObjective] = None,
inner_sampler: Optional[MCSampler] = None,
X_pending: Optional[Tensor] = None,
current_value: Optional[Tensor] = None,
**kwargs: Any,
) -> None:
r"""q-Knowledge Gradient (one-shot optimization).
Args:
model: A fitted model. Must support fantasizing.
num_fantasies: The number of fantasy points to use. More fantasy
points result in a better approximation, at the expense of
memory and wall time. Unused if `sampler` is specified.
sampler: The sampler used to sample fantasy observations. Optional
if `num_fantasies` is specified.
objective: The objective under which the samples are evaluated. If
`None` or a ScalarizedObjective, then the analytic posterior mean
is used, otherwise the objective is MC-evaluated (using
inner_sampler).
inner_sampler: The sampler used for inner sampling. Ignored if the
objective is `None` or a ScalarizedObjective.
            X_pending: A `m x d`-dim Tensor of `m` design points that have
                been submitted for function evaluation but have not yet been
                evaluated.
current_value: The current value, i.e. the expected best objective
given the observed points `D`. If omitted, forward will not
return the actual KG value, but the expected best objective
given the data set `D u X`.
"""
if sampler is None:
if num_fantasies is None:
raise ValueError(
"Must specify `num_fantasies` if no `sampler` is provided."
)
# base samples should be fixed for joint optimization over X, X_fantasies
sampler = SobolQMCNormalSampler(
num_samples=num_fantasies, resample=False, collapse_batch_dims=True
)
elif num_fantasies is not None:
if sampler.sample_shape != torch.Size([num_fantasies]):
raise ValueError(
f"The sampler shape must match num_fantasies={num_fantasies}."
)
else:
num_fantasies = sampler.sample_shape[0]
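        # note: this deliberately calls AcquisitionFunction.__init__ directly,
        # bypassing MCAcquisitionFunction.__init__; sampler and objective are
        # validated and set up manually below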
super(MCAcquisitionFunction, self).__init__(model=model)
# if not explicitly specified, we use the posterior mean for linear objs
if isinstance(objective, MCAcquisitionObjective) and inner_sampler is None:
inner_sampler = SobolQMCNormalSampler(
num_samples=128, resample=False, collapse_batch_dims=True
)
if objective is None and model.num_outputs != 1:
raise UnsupportedError(
"Must specify an objective when using a multi-output model."
)
self.sampler = sampler
self.objective = objective
self.set_X_pending(X_pending)
self.inner_sampler = inner_sampler
self.num_fantasies = num_fantasies
self.current_value = current_value
    @t_batch_mode_transform()
def forward(self, X: Tensor) -> Tensor:
r"""Evaluate qKnowledgeGradient on the candidate set `X`.
Args:
X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
`q + num_fantasies` design points each. We split this X tensor
into two parts in the `q` dimension (`dim=-2`). The first `q`
are the q-batch of design points and the last num_fantasies are
the current solutions of the inner optimization problem.
`X_fantasies = X[..., -num_fantasies:, :]`
`X_fantasies.shape = b x num_fantasies x d`
`X_actual = X[..., :-num_fantasies, :]`
`X_actual.shape = b x q x d`
Returns:
A Tensor of shape `b`. For t-batch b, the q-KG value of the design
`X_actual[b]` is averaged across the fantasy models, where
`X_fantasies[b, i]` is chosen as the final selection for the
`i`-th fantasy model.
NOTE: If `current_value` is not provided, then this is not the
true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
maximized at fixed `X_actual[b]`.
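
        Example (shape sketch only; `qKG` built with `num_fantasies=64` on a
        fitted single-output model):
            >>> X = torch.rand(4, 2 + 64, 3)  # b=4, q=2, d=3
            >>> kg_vals = qKG(X)  # kg_vals.shape == torch.Size([4])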
"""
X_actual, X_fantasies = _split_fantasy_points(X=X, n_f=self.num_fantasies)
# We only concatenate X_pending into the X part after splitting
if self.X_pending is not None:
X_actual = torch.cat(
[X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
)
# construct the fantasy model of shape `num_fantasies x b`
fantasy_model = self.model.fantasize(
X=X_actual, sampler=self.sampler, observation_noise=True
)
# get the value function
value_function = _get_value_function(
model=fantasy_model, objective=self.objective, sampler=self.inner_sampler
)
# make sure to propagate gradients to the fantasy model train inputs
with settings.propagate_grads(True):
values = value_function(X=X_fantasies) # num_fantasies x b
if self.current_value is not None:
values = values - self.current_value
# return average over the fantasy samples
return values.mean(dim=0)
    @concatenate_pending_points
@t_batch_mode_transform()
def evaluate(self, X_actual: Tensor, bounds: Tensor, **kwargs: Any) -> Tensor:
r"""Evaluate qKnowledgeGradient on the candidate set `X_actual` by
solving the inner optimization problem.
Args:
X_actual: A `b x q x d` Tensor with `b` t-batches of `q` design points
each. Unlike `forward()`, this does not include solutions of the
inner optimization problem.
bounds: A `2 x d` tensor of lower and upper bounds for each column of
the solutions to the inner problem.
kwargs: Additional keyword arguments. This includes the options for
optimization of the inner problem, i.e. `num_restarts`, `raw_samples`,
an `options` dictionary to be passed on to the optimization helpers, and
a `scipy_options` dictionary to be passed to `scipy.minimize`.
Returns:
A Tensor of shape `b`. For t-batch b, the q-KG value of the design
`X_actual[b]` is averaged across the fantasy models.
NOTE: If `current_value` is not provided, then this is not the
true KG value of `X_actual[b]`.
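
        Example (illustrative only; `qKG` and `X_actual` as above, with the
        inner problem solved over the `d=3` unit cube):
            >>> bounds = torch.stack([torch.zeros(3), torch.ones(3)])
            >>> kg_vals = qKG.evaluate(X_actual, bounds=bounds, num_restarts=10)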
"""
# construct the fantasy model of shape `num_fantasies x b`
fantasy_model = self.model.fantasize(
X=X_actual, sampler=self.sampler, observation_noise=True
)
# get the value function
value_function = _get_value_function(
model=fantasy_model, objective=self.objective, sampler=self.inner_sampler
)
# optimize the inner problem
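        # imports deferred to call time (presumably to avoid a circular
        # import at module load)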
from botorch.optim.initializers import gen_value_function_initial_conditions
from botorch.generation.gen import gen_candidates_scipy
initial_conditions = gen_value_function_initial_conditions(
acq_function=value_function,
bounds=bounds,
num_restarts=kwargs.get("num_restarts", 20),
raw_samples=kwargs.get("raw_samples", 1024),
current_model=self.model,
options={**kwargs.get("options", {}), **kwargs.get("scipy_options", {})},
)
_, values = gen_candidates_scipy(
initial_conditions=initial_conditions,
acquisition_function=value_function,
lower_bounds=bounds[0],
upper_bounds=bounds[1],
options=kwargs.get("scipy_options"),
)
# get the maximizer for each batch
values, _ = torch.max(values, dim=0)
if self.current_value is not None:
values = values - self.current_value
# return average over the fantasy samples
return values.mean(dim=0)
    def get_augmented_q_batch_size(self, q: int) -> int:
r"""Get augmented q batch size for one-shot optimization.
Args:
q: The number of candidates to consider jointly.
Returns:
The augmented size for one-shot optimization (including variables
parameterizing the fantasy solutions).
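
        Example (with `num_fantasies=64`, the constructor default):
            >>> qKG = qKnowledgeGradient(model, num_fantasies=64)
            >>> qKG.get_augmented_q_batch_size(q=2)
            66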
"""
return q + self.num_fantasies
    def extract_candidates(self, X_full: Tensor) -> Tensor:
r"""We only return X as the set of candidates post-optimization.
Args:
X_full: A `b x (q + num_fantasies) x d`-dim Tensor with `b`
t-batches of `q + num_fantasies` design points each.
Returns:
A `b x q x d`-dim Tensor with `b` t-batches of `q` design points each.
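
        Example (with `num_fantasies=64`):
            >>> X_full = torch.rand(4, 2 + 64, 3)
            >>> qKG.extract_candidates(X_full).shape
            torch.Size([4, 2, 3])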
"""
return X_full[..., : -self.num_fantasies, :]
class qMultiFidelityKnowledgeGradient(qKnowledgeGradient):
r"""Batch Knowledge Gradient for multi-fidelity optimization.
A version of `qKnowledgeGradient` that supports multi-fidelity optimization
via a `CostAwareUtility` and the `project` and `expand` operators. If none
of these are set, this acquisition function reduces to `qKnowledgeGradient`.
"""
def __init__(
self,
model: Model,
num_fantasies: Optional[int] = 64,
sampler: Optional[MCSampler] = None,
objective: Optional[AcquisitionObjective] = None,
inner_sampler: Optional[MCSampler] = None,
X_pending: Optional[Tensor] = None,
current_value: Optional[Tensor] = None,
cost_aware_utility: Optional[CostAwareUtility] = None,
project: Callable[[Tensor], Tensor] = lambda X: X,
expand: Callable[[Tensor], Tensor] = lambda X: X,
**kwargs: Any,
) -> None:
r"""Multi-Fidelity q-Knowledge Gradient (one-shot optimization).
Args:
model: A fitted model. Must support fantasizing.
num_fantasies: The number of fantasy points to use. More fantasy
points result in a better approximation, at the expense of
memory and wall time. Unused if `sampler` is specified.
sampler: The sampler used to sample fantasy observations. Optional
if `num_fantasies` is specified.
objective: The objective under which the samples are evaluated. If
`None` or a ScalarizedObjective, then the analytic posterior mean
is used, otherwise the objective is MC-evaluated (using
inner_sampler).
inner_sampler: The sampler used for inner sampling. Ignored if the
objective is `None` or a ScalarizedObjective.
            X_pending: A `m x d`-dim Tensor of `m` design points that have
                been submitted for function evaluation but have not yet been
                evaluated.
current_value: The current value, i.e. the expected best objective
given the observed points `D`. If omitted, forward will not
return the actual KG value, but the expected best objective
given the data set `D u X`.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
project: A callable mapping a `batch_shape x q x d` tensor of design
points to a tensor of the same shape projected to the desired
target set (e.g. the target fidelities in case of multi-fidelity
optimization).
expand: A callable mapping a `batch_shape x q x d` input tensor to
a `batch_shape x (q + q_e)' x d`-dim output tensor, where the
`q_e` additional points in each q-batch correspond to
additional ("trace") observations.
"""
if current_value is None and cost_aware_utility is not None:
raise UnsupportedError(
"Cost-aware KG requires current_value to be specified."
)
super().__init__(
model=model,
num_fantasies=num_fantasies,
sampler=sampler,
objective=objective,
inner_sampler=inner_sampler,
X_pending=X_pending,
current_value=current_value,
)
self.cost_aware_utility = cost_aware_utility
self.project = project
self.expand = expand
self._cost_sampler = None
@property
def cost_sampler(self):
if self._cost_sampler is None:
# Note: Using the deepcopy here is essential. Removing this poses a
# problem if the base model and the cost model have a different number
# of outputs or test points (this would be caused by expand), as this
# would trigger re-sampling the base samples in the fantasy sampler.
            # By cloning the sampler here, the right thing will happen if the
            # sizes are compatible; if they are not, this will result in
            # samples being drawn using different base samples, but it will at
            # least avoid changing the state of the fantasy sampler.
self._cost_sampler = deepcopy(self.sampler)
return self._cost_sampler
    @t_batch_mode_transform()
def forward(self, X: Tensor) -> Tensor:
r"""Evaluate qMultiFidelityKnowledgeGradient on the candidate set `X`.
Args:
X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
`q + num_fantasies` design points each. We split this X tensor
into two parts in the `q` dimension (`dim=-2`). The first `q`
are the q-batch of design points and the last num_fantasies are
the current solutions of the inner optimization problem.
`X_fantasies = X[..., -num_fantasies:, :]`
`X_fantasies.shape = b x num_fantasies x d`
`X_actual = X[..., :-num_fantasies, :]`
`X_actual.shape = b x q x d`
                In addition, `X` may be augmented with fidelity parameters as
                part of the `d`-dimension. Projecting fidelities to the target
                fidelity is handled by `project`.
Returns:
A Tensor of shape `b`. For t-batch b, the q-KG value of the design
`X_actual[b]` is averaged across the fantasy models, where
`X_fantasies[b, i]` is chosen as the final selection for the
`i`-th fantasy model.
NOTE: If `current_value` is not provided, then this is not the
true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
maximized at fixed `X_actual[b]`.
"""
X_actual, X_fantasies = _split_fantasy_points(X=X, n_f=self.num_fantasies)
# We only concatenate X_pending into the X part after splitting
if self.X_pending is not None:
X_eval = torch.cat(
[X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
)
else:
X_eval = X_actual
# construct the fantasy model of shape `num_fantasies x b`
# expand X (to potentially add trace observations)
fantasy_model = self.model.fantasize(
X=self.expand(X_eval), sampler=self.sampler, observation_noise=True
)
# get the value function
value_function = _get_value_function(
model=fantasy_model, objective=self.objective, sampler=self.inner_sampler
)
# make sure to propagate gradients to the fantasy model train inputs
# project the fantasy points
with settings.propagate_grads(True):
values = value_function(X=self.project(X_fantasies)) # num_fantasies x b
if self.current_value is not None:
values = values - self.current_value
if self.cost_aware_utility is not None:
values = self.cost_aware_utility(
X=X_actual, deltas=values, sampler=self.cost_sampler
)
# return average over the fantasy samples
return values.mean(dim=0)
def _get_value_function(
model: Model,
objective: Optional[Union[MCAcquisitionObjective, ScalarizedObjective]] = None,
sampler: Optional[MCSampler] = None,
) -> AcquisitionFunction:
r"""Construct value function (i.e. inner acquisition function)."""
if isinstance(objective, MCAcquisitionObjective):
return qSimpleRegret(model=model, sampler=sampler, objective=objective)
else:
return PosteriorMean(model=model, objective=objective)
def _split_fantasy_points(X: Tensor, n_f: int) -> Tuple[Tensor, Tensor]:
r"""Split a one-shot optimization input into actual and fantasy points
Args:
X: A `batch_shape x (q + n_f) x d`-dim tensor of actual and fantasy
points
Returns:
2-element tuple containing
- A `batch_shape x q x d`-dim tensor `X_actual` of input candidates.
- A `n_f x batch_shape x 1 x d`-dim tensor `X_fantasies` of fantasy
points, where `X_fantasies[i, batch_idx]` is the i-th fantasy point
associated with the batch indexed by `batch_idx`.
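
    Example:
        >>> X = torch.rand(5, 10, 3)  # batch_shape=5, q + n_f = 10, d=3
        >>> X_actual, X_fant = _split_fantasy_points(X, n_f=8)
        >>> X_actual.shape, X_fant.shape
        (torch.Size([5, 2, 3]), torch.Size([8, 5, 1, 3]))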
"""
    if n_f > X.size(-2):
        raise ValueError(
            f"n_f ({n_f}) must not exceed the q-batch dimension of X ({X.size(-2)})"
        )
split_sizes = [X.size(-2) - n_f, n_f]
X_actual, X_fantasies = torch.split(X, split_sizes, dim=-2)
# X_fantasies is b x num_fantasies x d, needs to be num_fantasies x b x 1 x d
# for batch mode evaluation with batch shape num_fantasies x b.
# b x num_fantasies x d --> num_fantasies x b x d
X_fantasies = X_fantasies.permute(-2, *range(X_fantasies.dim() - 2), -1)
# num_fantasies x b x 1 x d
X_fantasies = X_fantasies.unsqueeze(dim=-2)
return X_actual, X_fantasies