#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Batch Knowledge Gradient (KG) via one-shot optimization as introduced in
[Balandat2019botorch]_. For broader discussion of KG see also
[Frazier2008knowledge]_, [Wu2016parallelkg]_.
.. [Balandat2019botorch]
M. Balandat, B. Karrer, D. R. Jiang, S. Daulton, B. Letham, A. G. Wilson,
and E. Bakshy. BoTorch: Programmable Bayesian Optimziation in PyTorch.
ArXiv 2019.
.. [Frazier2008knowledge]
P. Frazier, W. Powell, and S. Dayanik. A Knowledge-Gradient policy for
sequential information collection. SIAM Journal on Control and Optimization,
2008.
.. [Wu2016parallelkg]
J. Wu and P. Frazier. The parallel knowledge gradient method for batch
bayesian optimization. NIPS 2016.
"""
from __future__ import annotations
from copy import deepcopy
from typing import Callable, Optional, Tuple, Union
import torch
from torch import Tensor
from .. import settings
from ..exceptions.errors import UnsupportedError
from ..models.model import Model
from ..sampling.samplers import MCSampler, SobolQMCNormalSampler
from ..utils.transforms import match_batch_shape, t_batch_mode_transform
from .acquisition import AcquisitionFunction, OneShotAcquisitionFunction
from .analytic import PosteriorMean
from .cost_aware import CostAwareUtility
from .monte_carlo import MCAcquisitionFunction, qSimpleRegret
from .objective import AcquisitionObjective, MCAcquisitionObjective, ScalarizedObjective
[docs]class qKnowledgeGradient(MCAcquisitionFunction, OneShotAcquisitionFunction):
r"""Batch Knowledge Gradient using one-shot optimization.
This computes the batch Knowledge Gradient using fantasies for the outer
expectation and either the model posterior mean or MC-sampling for the inner
expectation.
In addition to the design variables, the input `X` also includes variables
for the optimal designs for each of the fantasy models. For a fixed number
of fantasies, all parts of `X` can be optimized in a "one-shot" fashion.
"""
def __init__(
self,
model: Model,
num_fantasies: Optional[int] = 64,
sampler: Optional[MCSampler] = None,
objective: Optional[AcquisitionObjective] = None,
inner_sampler: Optional[MCSampler] = None,
X_pending: Optional[Tensor] = None,
current_value: Optional[Tensor] = None,
) -> None:
r"""q-Knowledge Gradient (one-shot optimization).
Args:
model: A fitted model. Must support fantasizing.
num_fantasies: The number of fantasy points to use. More fantasy
points result in a better approximation, at the expense of
memory and wall time. Unused if `sampler` is specified.
sampler: The sampler used to sample fantasy observations. Optional
if `num_fantasies` is specified.
objective: The objective under which the samples are evaluated. If
`None` or a ScalarizedObjective, then the analytic posterior mean
is used, otherwise the objective is MC-evaluated (using
inner_sampler).
inner_sampler: The sampler used for inner sampling. Ignored if the
objective is `None` or a ScalarizedObjective.
X_pending: A `m x d`-dim Tensor of `m` design points that have
points that have been submitted for function evaluation
but have not yet been evaluated.
current_value: The current value, i.e. the expected best objective
given the observed points `D`. If omitted, forward will not
return the actual KG value, but the expected best objective
given the data set `D u X`.
"""
if sampler is None:
if num_fantasies is None:
raise ValueError(
"Must specify `num_fantasies` if no `sampler` is provided."
)
# base samples should be fixed for joint optimization over X, X_fantasies
sampler = SobolQMCNormalSampler(
num_samples=num_fantasies, resample=False, collapse_batch_dims=True
)
elif num_fantasies is not None:
if sampler.sample_shape != torch.Size([num_fantasies]):
raise ValueError(
f"The sampler shape must match num_fantasies={num_fantasies}."
)
else:
num_fantasies = sampler.sample_shape[0]
super(MCAcquisitionFunction, self).__init__(model=model)
# if not explicitly specified, we use the posterior mean for linear objs
if isinstance(objective, MCAcquisitionObjective) and inner_sampler is None:
inner_sampler = SobolQMCNormalSampler(
num_samples=128, resample=False, collapse_batch_dims=True
)
if objective is None and model.num_outputs != 1:
raise UnsupportedError(
"Must specify an objective when using a multi-output model."
)
self.sampler = sampler
self.objective = objective
self.set_X_pending(X_pending)
self.inner_sampler = inner_sampler
self.num_fantasies = num_fantasies
self.current_value = current_value
[docs] @t_batch_mode_transform()
def forward(self, X: Tensor) -> Tensor:
r"""Evaluate qKnowledgeGradient on the candidate set `X`.
Args:
X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
`q + num_fantasies` design points each. We split this X tensor
into two parts in the `q` dimension (`dim=-2`). The first `q`
are the q-batch of design points and the last num_fantasies are
the current solutions of the inner optimization problem.
`X_fantasies = X[..., -num_fantasies:, :]`
`X_fantasies.shape = b x num_fantasies x d`
`X_actual = X[..., :-num_fantasies, :]`
`X_actual.shape = b x q x d`
Returns:
A Tensor of shape `b`. For t-batch b, the q-KG value of the design
`X_actual[b]` is averaged across the fantasy models, where
`X_fantasies[b, i]` is chosen as the final selection for the
`i`-th fantasy model.
NOTE: If `current_value` is not provided, then this is not the
true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
maximized at fixed `X_actual[b]`.
"""
X_actual, X_fantasies = _split_fantasy_points(X=X, n_f=self.num_fantasies)
# We only concatenate X_pending into the X part after splitting
if self.X_pending is not None:
X_actual = torch.cat(
[X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
)
# construct the fantasy model of shape `num_fantasies x b`
fantasy_model = self.model.fantasize(
X=X_actual, sampler=self.sampler, observation_noise=True
)
# get the value function
value_function = _get_value_function(
model=fantasy_model, objective=self.objective, sampler=self.inner_sampler
)
# make sure to propagate gradients to the fantasy model train inputs
with settings.propagate_grads(True):
values = value_function(X=X_fantasies) # num_fantasies x b
if self.current_value is not None:
values = values - self.current_value
# return average over the fantasy samples
return values.mean(dim=0)
[docs] def get_augmented_q_batch_size(self, q: int) -> int:
r"""Get augmented q batch size for one-shot optimzation.
Args:
q: The number of candidates to consider jointly.
Returns:
The augmented size for one-shot optimzation (including variables
parameterizing the fantasy solutions).
"""
return q + self.num_fantasies
[docs] def extract_candidates(self, X_full: Tensor) -> Tensor:
r"""We only return X as the set of candidates post-optimization.
Args:
X_full: A `b x (q + num_fantasies) x d`-dim Tensor with `b`
t-batches of `q + num_fantasies` design points each.
Returns:
A `b x q x d`-dim Tensor with `b` t-batches of `q` design points each.
"""
return X_full[..., : -self.num_fantasies, :]
[docs]class qMultiFidelityKnowledgeGradient(qKnowledgeGradient):
r"""Batch Knowledge Gradient for multi-fidelity optimization.
A version of `qKnowledgeGradient` that supports multi-fidelity optimization
via a `CostAwareUtility` and the `project` and `expand` operators. If none
of these are set, this acquisition function reduces to `qKnowledgeGradient`.
"""
def __init__(
self,
model: Model,
num_fantasies: Optional[int] = 64,
sampler: Optional[MCSampler] = None,
objective: Optional[AcquisitionObjective] = None,
inner_sampler: Optional[MCSampler] = None,
X_pending: Optional[Tensor] = None,
current_value: Optional[Tensor] = None,
cost_aware_utility: Optional[CostAwareUtility] = None,
project: Callable[[Tensor], Tensor] = lambda X: X,
expand: Callable[[Tensor], Tensor] = lambda X: X,
) -> None:
r"""Multi-Fidelity q-Knowledge Gradient (one-shot optimization).
Args:
model: A fitted model. Must support fantasizing.
num_fantasies: The number of fantasy points to use. More fantasy
points result in a better approximation, at the expense of
memory and wall time. Unused if `sampler` is specified.
sampler: The sampler used to sample fantasy observations. Optional
if `num_fantasies` is specified.
objective: The objective under which the samples are evaluated. If
`None` or a ScalarizedObjective, then the analytic posterior mean
is used, otherwise the objective is MC-evaluated (using
inner_sampler).
inner_sampler: The sampler used for inner sampling. Ignored if the
objective is `None` or a ScalarizedObjective.
X_pending: A `m x d`-dim Tensor of `m` design points that have
points that have been submitted for function evaluation
but have not yet been evaluated.
current_value: The current value, i.e. the expected best objective
given the observed points `D`. If omitted, forward will not
return the actual KG value, but the expected best objective
given the data set `D u X`.
cost_aware_utility: A CostAwareUtility computing the cost-transformed
utility from a candidate set and samples of increases in utility.
project: A callable mapping a `batch_shape x q x d` tensor of design
points to a tensor of the same shape projected to the desired
target set (e.g. the target fidelities in case of multi-fidelity
optimization).
expand: A callable mapping a `batch_shape x q x d` input tensor to
a `batch_shape x (q + q_e)' x d`-dim output tensor, where the
`q_e` additional points in each q-batch correspond to
additional ("trace") observations.
"""
if current_value is None and cost_aware_utility is not None:
raise UnsupportedError(
"Cost-aware KG requires current_value to be specified."
)
super().__init__(
model=model,
num_fantasies=num_fantasies,
sampler=sampler,
objective=objective,
inner_sampler=inner_sampler,
X_pending=X_pending,
current_value=current_value,
)
self.cost_aware_utility = cost_aware_utility
self.project = project
self.expand = expand
self._cost_sampler = None
@property
def cost_sampler(self):
if self._cost_sampler is None:
# Note: Using the deepcopy here is essential. Removing this poses a
# problem if the base model and the cost model have a different number
# of outputs or test points (this would be caused by expand), as this
# would trigger re-sampling the base samples in the fantasy sampler.
# By cloning the sampler here, the right thing will happen if the
# the sizes are compatible, if they are not this will result in
# samples being drawn using different base samples, but it will at
# least avoid changing state of the fantasy sampler.
self._cost_sampler = deepcopy(self.sampler)
return self._cost_sampler
[docs] @t_batch_mode_transform()
def forward(self, X: Tensor) -> Tensor:
r"""Evaluate qMultiFidelityKnowledgeGradient on the candidate set `X`.
Args:
X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
`q + num_fantasies` design points each. We split this X tensor
into two parts in the `q` dimension (`dim=-2`). The first `q`
are the q-batch of design points and the last num_fantasies are
the current solutions of the inner optimization problem.
`X_fantasies = X[..., -num_fantasies:, :]`
`X_fantasies.shape = b x num_fantasies x d`
`X_actual = X[..., :-num_fantasies, :]`
`X_actual.shape = b x q x d`
In addition, `X` may be augmented with fidelity parameteres as
part of thee `d`-dimension. Projecting fidelities to the target
fidelity is handled by `project`.
Returns:
A Tensor of shape `b`. For t-batch b, the q-KG value of the design
`X_actual[b]` is averaged across the fantasy models, where
`X_fantasies[b, i]` is chosen as the final selection for the
`i`-th fantasy model.
NOTE: If `current_value` is not provided, then this is not the
true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
maximized at fixed `X_actual[b]`.
"""
X_actual, X_fantasies = _split_fantasy_points(X=X, n_f=self.num_fantasies)
# We only concatenate X_pending into the X part after splitting
if self.X_pending is not None:
X_eval = torch.cat(
[X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
)
else:
X_eval = X_actual
# construct the fantasy model of shape `num_fantasies x b`
# expand X (to potentially add trace observations)
fantasy_model = self.model.fantasize(
X=self.expand(X_eval), sampler=self.sampler, observation_noise=True
)
# get the value function
value_function = _get_value_function(
model=fantasy_model, objective=self.objective, sampler=self.inner_sampler
)
# make sure to propagate gradients to the fantasy model train inputs
# project the fantasy points
with settings.propagate_grads(True):
values = value_function(X=self.project(X_fantasies)) # num_fantasies x b
if self.current_value is not None:
values = values - self.current_value
if self.cost_aware_utility is not None:
values = self.cost_aware_utility(
X=X_actual, deltas=values, sampler=self.cost_sampler
)
# return average over the fantasy samples
return values.mean(dim=0)
def _get_value_function(
model: Model,
objective: Optional[Union[MCAcquisitionObjective, ScalarizedObjective]] = None,
sampler: Optional[MCSampler] = None,
) -> AcquisitionFunction:
r"""Construct value function (i.e. inner acquisition function)."""
if isinstance(objective, MCAcquisitionObjective):
return qSimpleRegret(model=model, sampler=sampler, objective=objective)
else:
return PosteriorMean(model=model, objective=objective)
def _split_fantasy_points(X: Tensor, n_f: int) -> Tuple[Tensor, Tensor]:
r"""Split a one-shot optimization input into actual and fantasy points
Args:
X: A `batch_shape x (q + n_f) x d`-dim tensor of actual and fantasy
points
Returns:
2-element tuple containing
- A `batch_shape x q x d`-dim tensor `X_actual` of input candidates.
- A `n_f x batch_shape x 1 x d`-dim tensor `X_fantasies` of fantasy
points, where `X_fantasies[i, batch_idx]` is the i-th fantasy point
associated with the batch indexed by `batch_idx`.
"""
if n_f > X.size(-2):
raise ValueError(
f"n_f ({n_f}) must be less than the q-batch dimension of X ({X.size(-2)})"
)
split_sizes = [X.size(-2) - n_f, n_f]
X_actual, X_fantasies = torch.split(X, split_sizes, dim=-2)
# X_fantasies is b x num_fantasies x d, needs to be num_fantasies x b x 1 x d
# for batch mode evaluation with batch shape num_fantasies x b.
# b x num_fantasies x d --> num_fantasies x b x d
X_fantasies = X_fantasies.permute(-2, *range(X_fantasies.dim() - 2), -1)
# num_fantasies x b x 1 x d
X_fantasies = X_fantasies.unsqueeze(dim=-2)
return X_actual, X_fantasies