#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Utilities for acquisition functions.
"""
from __future__ import annotations
import math
import warnings
from typing import Callable, Dict, List, Optional
import torch
from torch import Tensor
from torch.quasirandom import SobolEngine
from .. import settings
from ..exceptions.errors import UnsupportedError
from ..exceptions.warnings import SamplingWarning
from ..models.model import Model
from ..sampling.samplers import IIDNormalSampler, SobolQMCNormalSampler
from ..utils.transforms import squeeze_last_dim
from . import monte_carlo # noqa F401
from . import analytic
from .acquisition import AcquisitionFunction
from .objective import IdentityMCObjective, MCAcquisitionObjective
[docs]def get_acquisition_function(
acquisition_function_name: str,
model: Model,
objective: MCAcquisitionObjective,
X_observed: Tensor,
X_pending: Optional[Tensor] = None,
mc_samples: int = 500,
qmc: bool = True,
seed: Optional[int] = None,
**kwargs,
) -> monte_carlo.MCAcquisitionFunction:
r"""Convenience function for initializing botorch acquisition functions.
Args:
acquisition_function_name: Name of the acquisition function.
model: A fitted model.
objective: A MCAcquisitionObjective.
X_observed: A `m1 x d`-dim Tensor of `m1` design points that have
already been observed.
X_pending: A `m2 x d`-dim Tensor of `m2` design points whose evaluation
is pending.
mc_samples: The number of samples to use for (q)MC evaluation of the
acquisition function.
qmc: If True, use quasi-Monte-Carlo sampling (instead of iid).
seed: If provided, perform deterministic optimization (i.e. the
function to optimize is fixed and not stochastic).
Returns:
The requested acquisition function.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> obj = LinearMCObjective(weights=torch.tensor([1.0, 2.0]))
>>> acqf = get_acquisition_function("qEI", model, obj, train_X)
"""
# initialize the sampler
if qmc:
sampler = SobolQMCNormalSampler(num_samples=mc_samples, seed=seed)
else:
sampler = IIDNormalSampler(num_samples=mc_samples, seed=seed)
# instantiate and return the requested acquisition function
if acquisition_function_name == "qEI":
best_f = objective(model.posterior(X_observed).mean).max().item()
return monte_carlo.qExpectedImprovement(
model=model,
best_f=best_f,
sampler=sampler,
objective=objective,
X_pending=X_pending,
)
elif acquisition_function_name == "qPI":
best_f = objective(model.posterior(X_observed).mean).max().item()
return monte_carlo.qProbabilityOfImprovement(
model=model,
best_f=best_f,
sampler=sampler,
objective=objective,
X_pending=X_pending,
tau=kwargs.get("tau", 1e-3),
)
elif acquisition_function_name == "qNEI":
return monte_carlo.qNoisyExpectedImprovement(
model=model,
X_baseline=X_observed,
sampler=sampler,
objective=objective,
X_pending=X_pending,
prune_baseline=kwargs.get("prune_baseline", False),
)
elif acquisition_function_name == "qSR":
return monte_carlo.qSimpleRegret(
model=model, sampler=sampler, objective=objective, X_pending=X_pending
)
elif acquisition_function_name == "qUCB":
if "beta" not in kwargs:
raise ValueError("`beta` must be specified in kwargs for qUCB.")
return monte_carlo.qUpperConfidenceBound(
model=model,
beta=kwargs["beta"],
sampler=sampler,
objective=objective,
X_pending=X_pending,
)
raise NotImplementedError(
f"Unknown acquisition function {acquisition_function_name}"
)
[docs]def get_infeasible_cost(
X: Tensor, model: Model, objective: Callable[[Tensor], Tensor] = squeeze_last_dim
) -> float:
r"""Get infeasible cost for a model and objective.
Computes an infeasible cost `M` such that `-M < min_x f(x)` almost always,
so that feasible points are preferred.
Args:
X: A `n x d` Tensor of `n` design points to use in evaluating the
minimum. These points should cover the design space well. The more
points the better the estimate, at the expense of added computation.
model: A fitted botorch model.
objective: The objective with which to evaluate the model output.
Returns:
The infeasible cost `M` value.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> objective = lambda Y: Y[..., -1] ** 2
>>> M = get_infeasible_cost(train_X, model, obj)
"""
posterior = model.posterior(X)
lb = objective(posterior.mean - 6 * posterior.variance.clamp_min(0).sqrt()).min()
M = -(lb.clamp_max(0.0))
return M.item()
[docs]def is_nonnegative(acq_function: AcquisitionFunction) -> bool:
r"""Determine whether a given acquisition function is non-negative.
Args:
acq_function: The `AcquisitionFunction` instance.
Returns:
True if `acq_function` is non-negative, False if not, or if the behavior
is unknown (for custom acquisition functions).
Example:
>>> qEI = qExpectedImprovement(model, best_f=0.1)
>>> is_nonnegative(qEI) # returns True
"""
return isinstance(
acq_function,
(
analytic.ExpectedImprovement,
analytic.ConstrainedExpectedImprovement,
analytic.ProbabilityOfImprovement,
analytic.NoisyExpectedImprovement,
monte_carlo.qExpectedImprovement,
monte_carlo.qNoisyExpectedImprovement,
monte_carlo.qProbabilityOfImprovement,
),
)
[docs]def prune_inferior_points(
model: Model,
X: Tensor,
objective: Optional[MCAcquisitionObjective] = None,
num_samples: int = 2048,
max_frac: float = 1.0,
) -> Tensor:
r"""Prune points from an input tensor that are unlikely to be the best point.
Given a model, an objective, and an input tensor `X`, this function returns
the subset of points in `X` that have some probability of being the best
point under the objective. This function uses sampling to estimate the
probabilities, the higher the number of points `n` in `X` the higher the
number of samples `num_samples` should be to obtain accurate estimates.
Args:
model: A fitted model. Batched models are currently not supported.
X: An input tensor of shape `n x d`. Batched inputs are currently not
supported.
objective: The objective under which to evaluate the posterior.
num_samples: The number of samples used to compute empirical
probabilities of being the best point.
max_frac: The maximum fraction of points to retain. Must satisfy
`0 < max_frac <= 1`. Ensures that the number of elements in the
returned tensor does not exceed `ceil(max_frac * n)`.
Returns:
A `n' x d` with subset of points in `X`, where
n' = min(N_nz, ceil(max_frac * n))
with `N_nz` the number of points in `X` that have non-zero (empirical,
under `num_samples` samples) probability of being the best point.
"""
if X.ndim > 2:
# TODO: support batched inputs (req. dealing with ragged tensors)
raise UnsupportedError(
"Batched inputs `X` are currently unsupported by prune_inferior_points"
)
max_points = math.ceil(max_frac * X.size(-2))
if max_points < 1 or max_points > X.size(-2):
raise ValueError(f"max_frac must take values in (0, 1], is {max_frac}")
with torch.no_grad():
posterior = model.posterior(X=X)
if posterior.event_shape.numel() > SobolEngine.MAXDIM:
if settings.debug.on():
warnings.warn(
f"Sample dimension q*m={posterior.event_shape.numel()} exceeding Sobol "
f"max dimension ({SobolEngine.MAXDIM}). Using iid samples instead.",
SamplingWarning,
)
sampler = IIDNormalSampler(num_samples=num_samples)
else:
sampler = SobolQMCNormalSampler(num_samples=num_samples)
samples = sampler(posterior)
if objective is None:
objective = IdentityMCObjective()
obj_vals = objective(samples)
if obj_vals.ndim > 2:
# TODO: support batched inputs (req. dealing with ragged tensors)
raise UnsupportedError(
"Batched models are currently unsupported by prune_inferior_points"
)
is_best = torch.argmax(obj_vals, dim=-1)
idcs, counts = torch.unique(is_best, return_counts=True)
if len(idcs) > max_points:
counts, order_idcs = torch.sort(counts, descending=True)
idcs = order_idcs[:max_points]
return X[idcs]
[docs]def project_to_target_fidelity(
X: Tensor, target_fidelities: Optional[Dict[int, float]] = None
) -> Tensor:
r"""Project `X` onto the target set of fidelities.
This function assumes that the set of feasible fidelities is a box, so
projecting here just means setting each fidelity parameter to its target
value.
Args:
X: A `batch_shape x q x d`-dim Tensor of with `q` `d`-dim design points
for each t-batch.
target_fidelities: A dictionary mapping a subset of columns of `X` (the
fidelity parameters) to their respective target fidelity value. If
omitted, assumes that the last column of X is the fidelity parameter
with a target value of 1.0.
Return:
A `batch_shape x q x d`-dim Tensor `X_proj` with fidelity parameters
projected to the provided fidelity values.
"""
if target_fidelities is None:
target_fidelities = {-1: 1.0}
d = X.size(-1)
# normalize to positive indices
tfs = {k if k >= 0 else d + k: v for k, v in target_fidelities.items()}
ones = torch.ones(*X.shape[:-1], device=X.device, dtype=X.dtype)
# here we're looping through the feature dimension of X - this could be
# slow for large `d`, we should optimize this for that case
X_proj = torch.stack(
[X[..., i] if i not in tfs else tfs[i] * ones for i in range(d)], dim=-1
)
return X_proj
[docs]def expand_trace_observations(
X: Tensor, fidelity_dims: Optional[List[int]] = None, num_trace_obs: int = 0
) -> Tensor:
r"""Expand `X` with trace observations.
Expand a tensor of inputs with "trace observations" that are obtained during
the evaluation of the candidate set. This is used in multi-fidelity
optimization. It can be though of as augmenting the `q`-batch with additional
points that are the expected trace observations.
Let `f_i` be the `i`-th fidelity parameter. Then this functions assumes that
for each element of the q-batch, besides the fidelity `f_i`, we will observe
additonal fidelities `f_i1, ..., f_iK`, where `K = num_trace_obs`, during
evaluation of the candidate set `X`. Specifically, this function assumes
that `f_ij = (K-j) / (num_trace_obs + 1) * f_i` for all `i`. That is, the
expansion is performed in parallel for all fidelities (it does not expand
out all possible combinations).
Args:
X: A `batch_shape x q x d`-dim Tensor of with `q` `d`-dim design points
(incl. the fidelity parameters) for each t-batch.
fidelity_dims: The indices of the fidelity parameters. If omitted,
assumes that the last column of X contains the fidelity parameters.
num_trace_obs: The number of trace observations to use.
Return:
A `batch_shape x (q + num_trace_obs x q) x d` Tensor `X_expanded` that
expands `X` with trace observations.
"""
if num_trace_obs == 0: # No need to expand if we don't use trace observations
return X
if fidelity_dims is None:
fidelity_dims = [-1]
# The general strategy in the following is to expand `X` to the desired
# shape, and then multiply it (point-wise) with a tensor of scaling factors
reps = [1] * (X.ndim - 2) + [1 + num_trace_obs, 1]
X_expanded = X.repeat(*reps) # batch_shape x (q + num_trace_obs x q) x d
scale_fac = torch.ones_like(X_expanded)
s_pad = 1 / (num_trace_obs + 1)
# tensor of num_trace_obs scaling factors equally space between 1-s_pad and s_pad
sf = torch.linspace(1 - s_pad, s_pad, num_trace_obs, device=X.device, dtype=X.dtype)
# repeat each element q times
q = X.size(-2)
sf = torch.repeat_interleave(sf, q) # num_trace_obs * q
# now expand this to num_trace_obs x q x num_fidelities
sf = sf.unsqueeze(-1).expand(X_expanded.size(-2) - q, len(fidelity_dims))
# change relevant entries of the scaling tensor
scale_fac[..., q:, fidelity_dims] = sf
return scale_fac * X_expanded