In this tutorial, we illustrate how to implement a simple Bayesian Optimization (BO) closed loop in BoTorch.
In general, we recommend for a relatively simple setup (like this one) to use Ax, since this will simplify your setup (including the amount of code you need to write) considerably. See the Using BoTorch with Ax tutorial.
However, you may want to do things that are not easily supported in Ax at this time (like running high-dimensional BO using a VAE+GP model that you jointly train on high-dimensional input data). If you find yourself in such a situation, you will need to write your own optimization loop, as we do in this tutorial.
We use the batch Expected Improvement (qEI) and batch Noisy Expected Improvement (qNEI) acquisition functions to optimize a constrained version of the synthetic Hartmann6 test function. The standard problem is
$$f(x) = -\sum_{i=1}^4 \alpha_i \exp \left( -\sum_{j=1}^6 A_{ij} (x_j - P_{ij})^2 \right)$$
over $x \in [0,1]^6$ (parameter values can be found in botorch/test_functions/hartmann6.py
).
In real BO applications, the design $x$ can influence multiple metrics in unknown ways, and the decision-maker often wants to optimize one metric without sacrificing another. To illustrate this, we add a synthetic constraint of the form $\|x\|_1 - 3 \le 0$. Both the objective and the constraint are observed with noise.
Since botorch assumes a maximization problem, we will attempt to maximize $-f(x)$ to achieve $\max_{x} -f(x) = 3.32237$.
import os
import torch
device = torch.device("cuda:3" if torch.cuda.is_available() else "cpu")
dtype = torch.double
SMOKE_TEST = os.environ.get("SMOKE_TEST")
I1116 175307.080 _utils_internal.py:179] NCCL_DEBUG env var is set to None I1116 175307.081 _utils_internal.py:188] NCCL_DEBUG is INFO from /etc/nccl.conf
First, we define the constraint used in the example in outcome_constraint
. The second function weighted_obj
is a "feasibility-weighted objective," which returns zero when not feasible.
from botorch.test_functions import Hartmann
neg_hartmann6 = Hartmann(negate=True)
def outcome_constraint(X):
"""L1 constraint; feasible if less than or equal to zero."""
return X.sum(dim=-1) - 3
def weighted_obj(X):
"""Feasibility weighted objective; zero if not feasible."""
return neg_hartmann6(X) * (outcome_constraint(X) <= 0).type_as(X)
We use a MultiOutputGP
to model the objective (output 0) and the constraint (output 1). We assume known homoskedastic observation noise on both the objective and constraint with standard error $\sigma = 0.5$.
Each component is a FixedNoiseGP
. The models are initialized with 10 points drawn randomly from $[0,1]^6$.
from botorch.models import FixedNoiseGP, ModelListGP
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
NOISE_SE = 0.5
train_yvar = torch.tensor(NOISE_SE**2, device=device, dtype=dtype)
def generate_initial_data(n=10):
# generate training data
train_x = torch.rand(10, 6, device=device, dtype=dtype)
exact_obj = neg_hartmann6(train_x).unsqueeze(-1) # add output dimension
exact_con = outcome_constraint(train_x).unsqueeze(-1) # add output dimension
train_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
train_con = exact_con + NOISE_SE * torch.randn_like(exact_con)
best_observed_value = weighted_obj(train_x).max().item()
return train_x, train_obj, train_con, best_observed_value
def initialize_model(train_x, train_obj, train_con, state_dict=None):
# define models for objective and constraint
model_obj = FixedNoiseGP(train_x, train_obj, train_yvar.expand_as(train_obj)).to(
train_x
)
model_con = FixedNoiseGP(train_x, train_con, train_yvar.expand_as(train_con)).to(
train_x
)
# combine into a multi-output GP model
model = ModelListGP(model_obj, model_con)
mll = SumMarginalLogLikelihood(model.likelihood, model)
# load state dict if it is passed
if state_dict is not None:
model.load_state_dict(state_dict)
return mll, model
The methods below take the outputs of the GP and return the objective and the constraint. In general, these can be any Callable
, but here we simply need to index the correct output.
from botorch.acquisition.objective import ConstrainedMCObjective
def obj_callable(Z):
return Z[..., 0]
def constraint_callable(Z):
return Z[..., 1]
# define a feasibility-weighted objective for optimization
constrained_obj = ConstrainedMCObjective(
objective=obj_callable,
constraints=[constraint_callable],
)
The helper function below takes an acquisition function as an argument, optimizes it, and returns the batch $\{x_1, x_2, \ldots x_q\}$ along with the observed function values. For this example, we'll use a small batch of $q=3$. The function optimize_acqf
optimizes the $q$ points jointly. A simple initialization heuristic is used to select the 10 restart initial locations from a set of 50 random points.
from botorch.optim import optimize_acqf
bounds = torch.tensor([[0.0] * 6, [1.0] * 6], device=device, dtype=dtype)
BATCH_SIZE = 3 if not SMOKE_TEST else 2
NUM_RESTARTS = 10 if not SMOKE_TEST else 2
RAW_SAMPLES = 512 if not SMOKE_TEST else 32
def optimize_acqf_and_get_observation(acq_func):
"""Optimizes the acquisition function, and returns a new candidate and a noisy observation."""
# optimize
candidates, _ = optimize_acqf(
acq_function=acq_func,
bounds=bounds,
q=BATCH_SIZE,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES, # used for intialization heuristic
options={"batch_limit": 5, "maxiter": 200},
)
# observe new values
new_x = candidates.detach()
exact_obj = neg_hartmann6(new_x).unsqueeze(-1) # add output dimension
exact_con = outcome_constraint(new_x).unsqueeze(-1) # add output dimension
new_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
new_con = exact_con + NOISE_SE * torch.randn_like(exact_con)
return new_x, new_obj, new_con
def update_random_observations(best_random):
"""Simulates a random policy by taking a the current list of best values observed randomly,
drawing a new random point, observing its value, and updating the list.
"""
rand_x = torch.rand(BATCH_SIZE, 6)
next_random_best = weighted_obj(rand_x).max().item()
best_random.append(max(best_random[-1], next_random_best))
return best_random
The Bayesian optimization "loop" for a batch size of $q$ simply iterates the following steps:
Just for illustration purposes, we run three trials each of which do N_BATCH=20
rounds of optimization. The acquisition function is approximated using MC_SAMPLES=256
samples.
Note: Running this may take a little while.
from botorch import fit_gpytorch_mll
from botorch.acquisition.monte_carlo import (
qExpectedImprovement,
qNoisyExpectedImprovement,
)
from botorch.sampling.normal import SobolQMCNormalSampler
from botorch.exceptions import BadInitialCandidatesWarning
import time
import warnings
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
N_TRIALS = 3 if not SMOKE_TEST else 2
N_BATCH = 20 if not SMOKE_TEST else 2
MC_SAMPLES = 256 if not SMOKE_TEST else 32
verbose = False
best_observed_all_ei, best_observed_all_nei, best_random_all = [], [], []
# average over multiple trials
for trial in range(1, N_TRIALS + 1):
print(f"\nTrial {trial:>2} of {N_TRIALS} ", end="")
best_observed_ei, best_observed_nei, best_random = [], [], []
# call helper functions to generate initial training data and initialize model
(
train_x_ei,
train_obj_ei,
train_con_ei,
best_observed_value_ei,
) = generate_initial_data(n=10)
mll_ei, model_ei = initialize_model(train_x_ei, train_obj_ei, train_con_ei)
train_x_nei, train_obj_nei, train_con_nei = train_x_ei, train_obj_ei, train_con_ei
best_observed_value_nei = best_observed_value_ei
mll_nei, model_nei = initialize_model(train_x_nei, train_obj_nei, train_con_nei)
best_observed_ei.append(best_observed_value_ei)
best_observed_nei.append(best_observed_value_nei)
best_random.append(best_observed_value_ei)
# run N_BATCH rounds of BayesOpt after the initial random batch
for iteration in range(1, N_BATCH + 1):
t0 = time.monotonic()
# fit the models
fit_gpytorch_mll(mll_ei)
fit_gpytorch_mll(mll_nei)
# define the qEI and qNEI acquisition modules using a QMC sampler
qmc_sampler = SobolQMCNormalSampler(sample_shape=torch.Size([MC_SAMPLES]))
# for best_f, we use the best observed noisy values as an approximation
qEI = qExpectedImprovement(
model=model_ei,
best_f=(train_obj_ei * (train_con_ei <= 0).to(train_obj_ei)).max(),
sampler=qmc_sampler,
objective=constrained_obj,
)
qNEI = qNoisyExpectedImprovement(
model=model_nei,
X_baseline=train_x_nei,
sampler=qmc_sampler,
objective=constrained_obj,
)
# optimize and get new observation
new_x_ei, new_obj_ei, new_con_ei = optimize_acqf_and_get_observation(qEI)
new_x_nei, new_obj_nei, new_con_nei = optimize_acqf_and_get_observation(qNEI)
# update training points
train_x_ei = torch.cat([train_x_ei, new_x_ei])
train_obj_ei = torch.cat([train_obj_ei, new_obj_ei])
train_con_ei = torch.cat([train_con_ei, new_con_ei])
train_x_nei = torch.cat([train_x_nei, new_x_nei])
train_obj_nei = torch.cat([train_obj_nei, new_obj_nei])
train_con_nei = torch.cat([train_con_nei, new_con_nei])
# update progress
best_random = update_random_observations(best_random)
best_value_ei = weighted_obj(train_x_ei).max().item()
best_value_nei = weighted_obj(train_x_nei).max().item()
best_observed_ei.append(best_value_ei)
best_observed_nei.append(best_value_nei)
# reinitialize the models so they are ready for fitting on next iteration
# use the current state dict to speed up fitting
mll_ei, model_ei = initialize_model(
train_x_ei,
train_obj_ei,
train_con_ei,
model_ei.state_dict(),
)
mll_nei, model_nei = initialize_model(
train_x_nei,
train_obj_nei,
train_con_nei,
model_nei.state_dict(),
)
t1 = time.monotonic()
if verbose:
print(
f"\nBatch {iteration:>2}: best_value (random, qEI, qNEI) = "
f"({max(best_random):>4.2f}, {best_value_ei:>4.2f}, {best_value_nei:>4.2f}), "
f"time = {t1-t0:>4.2f}.",
end="",
)
else:
print(".", end="")
best_observed_all_ei.append(best_observed_ei)
best_observed_all_nei.append(best_observed_nei)
best_random_all.append(best_random)
Trial 1 of 3 .................... Trial 2 of 3 .................... Trial 3 of 3 ....................
The plot below shows the best objective value observed at each step of the optimization for each of the algorithms. The confidence intervals represent the variance at that step in the optimization across the trial runs. The variance across optimization runs is quite high, so in order to get a better estimate of the average performance one would have to run a much larger number of trials N_TRIALS
(we avoid this here to limit the runtime of this tutorial).
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline
def ci(y):
return 1.96 * y.std(axis=0) / np.sqrt(N_TRIALS)
GLOBAL_MAXIMUM = neg_hartmann6.optimal_value
iters = np.arange(N_BATCH + 1) * BATCH_SIZE
y_ei = np.asarray(best_observed_all_ei)
y_nei = np.asarray(best_observed_all_nei)
y_rnd = np.asarray(best_random_all)
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.errorbar(iters, y_rnd.mean(axis=0), yerr=ci(y_rnd), label="random", linewidth=1.5)
ax.errorbar(iters, y_ei.mean(axis=0), yerr=ci(y_ei), label="qEI", linewidth=1.5)
ax.errorbar(iters, y_nei.mean(axis=0), yerr=ci(y_nei), label="qNEI", linewidth=1.5)
plt.plot(
[0, N_BATCH * BATCH_SIZE],
[GLOBAL_MAXIMUM] * 2,
"k",
label="true best objective",
linewidth=2,
)
ax.set_ylim(bottom=0.5)
ax.set(
xlabel="number of observations (beyond initial points)",
ylabel="best objective value",
)
ax.legend(loc="lower right")
<matplotlib.legend.Legend at 0x7fa8e9094310>