q-Noisy Constrained EI
Closed-loop batch, constrained BO in BoTorch with qLogEI and qLogNEI
In this tutorial, we illustrate how to implement a simple Bayesian Optimization (BO) closed loop in BoTorch.
In general, we recommend for a relatively simple setup (like this one) to use Ax, since this will simplify your setup (including the amount of code you need to write) considerably. See the Using BoTorch with Ax tutorial.
However, you may want to do things that are not easily supported in Ax at this time (like running high-dimensional BO using a VAE+GP model that you jointly train on high-dimensional input data). If you find yourself in such a situation, you will need to write your own optimization loop, as we do in this tutorial.
We use the batch Log Expected Improvement (qLogEI
) and batch Noisy Expected
Improvement (qLogNEI
) acquisition functions to optimize a constrained version of the
synthetic Hartmann6 test function. The standard problem is
over (parameter values can be found in
botorch/test_functions/hartmann6.py
).
In real BO applications, the design can influence multiple metrics in unknown ways, and the decision-maker often wants to optimize one metric without sacrificing another. To illustrate this, we add a synthetic constraint of the form . Both the objective and the constraint are observed with noise.
Since botorch assumes a maximization problem, we will attempt to maximize to achieve .
# Install dependencies if we are running in colab
import sys
if 'google.colab' in sys.modules:
%pip install botorch
import os
from typing import Optional
import torch
device = torch.device("cuda:3" if torch.cuda.is_available() else "cpu")
dtype = torch.double
SMOKE_TEST = os.environ.get("SMOKE_TEST")
Problem setup
First, we define the constraint used in the example in outcome_constraint
. The second
function weighted_obj
is a "feasibility-weighted objective," which returns zero when
not feasible.
from botorch.test_functions import Hartmann
neg_hartmann6 = Hartmann(negate=True)
def outcome_constraint(X):
"""L1 constraint; feasible if less than or equal to zero."""
return X.sum(dim=-1) - 3
def weighted_obj(X):
"""Feasibility weighted objective; zero if not feasible."""
return neg_hartmann6(X) * (outcome_constraint(X) <= 0).type_as(X)
Model initialization
We use a MultiOutputGP
to model the objective (output 0) and the constraint (output
1). We assume known homoskedastic observation noise on both the objective and constraint
with standard error .
Each component is a SingleTaskGP
. The models are initialized with 10 points drawn
randomly from .
from botorch.models.transforms.input import Normalize
from botorch.models import SingleTaskGP, ModelListGP
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
NOISE_SE = 0.25
train_yvar = torch.tensor(NOISE_SE**2, device=device, dtype=dtype)
def generate_initial_data(n=10):
# generate training data
train_x = torch.rand(10, 6, device=device, dtype=dtype)
exact_obj = neg_hartmann6(train_x).unsqueeze(-1) # add output dimension
exact_con = outcome_constraint(train_x).unsqueeze(-1) # add output dimension
train_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
train_con = exact_con + NOISE_SE * torch.randn_like(exact_con)
best_observed_value = weighted_obj(train_x).max().item()
return train_x, train_obj, train_con, best_observed_value
def initialize_model(train_x, train_obj, train_con, state_dict=None):
# define models for objective and constraint
model_obj = SingleTaskGP(
train_x,
train_obj,
train_yvar.expand_as(train_obj),
input_transform=Normalize(d=train_x.shape[-1]),
).to(train_x)
model_con = SingleTaskGP(
train_x,
train_con,
train_yvar.expand_as(train_con),
input_transform=Normalize(d=train_x.shape[-1]),
).to(train_x)
# combine into a multi-output GP model
model = ModelListGP(model_obj, model_con)
mll = SumMarginalLogLikelihood(model.likelihood, model)
# load state dict if it is passed
if state_dict is not None:
model.load_state_dict(state_dict)
return mll, model
Define a construct to extract the objective and constraint from the GP
The methods below take the outputs of the GP and return the objective and the
constraint. In general, these can be any Callable
, but here we simply need to index
the correct output.
from botorch.acquisition.objective import GenericMCObjective
def obj_callable(Z: torch.Tensor, X: Optional[torch.Tensor] = None):
return Z[..., 0]
def constraint_callable(Z):
return Z[..., 1]
objective = GenericMCObjective(objective=obj_callable)
Define a helper function that performs the essential BO step
The helper function below takes an acquisition function as an argument, optimizes it,
and returns the batch along with the observed function values.
For this example, we'll use a small batch of . The function optimize_acqf
optimizes the points jointly. A simple initialization heuristic is used to select
the 10 restart initial locations from a set of 50 random points.
from botorch.optim import optimize_acqf
bounds = torch.tensor([[0.0] * 6, [1.0] * 6], device=device, dtype=dtype)
BATCH_SIZE = 3 if not SMOKE_TEST else 2
NUM_RESTARTS = 10 if not SMOKE_TEST else 2
RAW_SAMPLES = 512 if not SMOKE_TEST else 32
def optimize_acqf_and_get_observation(acq_func):
"""Optimizes the acquisition function, and returns a new candidate and a noisy observation."""
# optimize
candidates, _ = optimize_acqf(
acq_function=acq_func,
bounds=bounds,
q=BATCH_SIZE,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES, # used for intialization heuristic
options={"batch_limit": 5, "maxiter": 200},
)
# observe new values
new_x = candidates.detach()
exact_obj = neg_hartmann6(new_x).unsqueeze(-1) # add output dimension
exact_con = outcome_constraint(new_x).unsqueeze(-1) # add output dimension
new_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
new_con = exact_con + NOISE_SE * torch.randn_like(exact_con)
return new_x, new_obj, new_con
def update_random_observations(best_random):
"""Simulates a random policy by taking a the current list of best values observed randomly,
drawing a new random point, observing its value, and updating the list.
"""
rand_x = torch.rand(BATCH_SIZE, 6)
next_random_best = weighted_obj(rand_x).max().item()
best_random.append(max(best_random[-1], next_random_best))
return best_random
Perform Bayesian Optimization loop with qLogNEI
The Bayesian optimization "loop" for a batch size of simply iterates the following steps:
- given a surrogate model, choose a batch of points
- observe for each in the batch
- update the surrogate model.
Just for illustration purposes, we run three trials each of which do N_BATCH=20
rounds
of optimization. The acquisition function is approximated using MC_SAMPLES=256
samples.
Note: Running this may take a little while.
import time
import warnings
from botorch import fit_gpytorch_mll
from botorch.acquisition import (
qLogExpectedImprovement,
qLogNoisyExpectedImprovement,
)
from botorch.exceptions import BadInitialCandidatesWarning
from botorch.sampling.normal import SobolQMCNormalSampler
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
N_TRIALS = 3 if not SMOKE_TEST else 2
N_BATCH = 20 if not SMOKE_TEST else 2
MC_SAMPLES = 256 if not SMOKE_TEST else 32
verbose = False
best_observed_all_ei, best_observed_all_nei, best_random_all = [], [], []
# average over multiple trials
for trial in range(1, N_TRIALS + 1):
print(f"\nTrial {trial:>2} of {N_TRIALS} ", end="")
best_observed_ei, best_observed_nei, best_random = [], [], []
# call helper functions to generate initial training data and initialize model
(
train_x_ei,
train_obj_ei,
train_con_ei,
best_observed_value_ei,
) = generate_initial_data(n=10)
mll_ei, model_ei = initialize_model(train_x_ei, train_obj_ei, train_con_ei)
train_x_nei, train_obj_nei, train_con_nei = train_x_ei, train_obj_ei, train_con_ei
best_observed_value_nei = best_observed_value_ei
mll_nei, model_nei = initialize_model(train_x_nei, train_obj_nei, train_con_nei)
best_observed_ei.append(best_observed_value_ei)
best_observed_nei.append(best_observed_value_nei)
best_random.append(best_observed_value_ei)
# run N_BATCH rounds of BayesOpt after the initial random batch
for iteration in range(1, N_BATCH + 1):
t0 = time.monotonic()
# fit the models
fit_gpytorch_mll(mll_ei)
fit_gpytorch_mll(mll_nei)
# define the qEI and qNEI acquisition modules using a QMC sampler
qmc_sampler = SobolQMCNormalSampler(sample_shape=torch.Size([MC_SAMPLES]))
# for best_f, we use the best observed noisy values as an approximation
qLogEI = qLogExpectedImprovement(
model=model_ei,
best_f=(train_obj_ei * (train_con_ei <= 0).to(train_obj_ei)).max(),
sampler=qmc_sampler,
objective=objective,
constraints=[constraint_callable],
)
qLogNEI = qLogNoisyExpectedImprovement(
model=model_nei,
X_baseline=train_x_nei,
sampler=qmc_sampler,
objective=objective,
constraints=[constraint_callable],
)
# optimize and get new observation
new_x_ei, new_obj_ei, new_con_ei = optimize_acqf_and_get_observation(qLogEI)
new_x_nei, new_obj_nei, new_con_nei = optimize_acqf_and_get_observation(qLogNEI)
# update training points
train_x_ei = torch.cat([train_x_ei, new_x_ei])
train_obj_ei = torch.cat([train_obj_ei, new_obj_ei])
train_con_ei = torch.cat([train_con_ei, new_con_ei])
train_x_nei = torch.cat([train_x_nei, new_x_nei])
train_obj_nei = torch.cat([train_obj_nei, new_obj_nei])
train_con_nei = torch.cat([train_con_nei, new_con_nei])
# update progress
best_random = update_random_observations(best_random)
best_value_ei = weighted_obj(train_x_ei).max().item()
best_value_nei = weighted_obj(train_x_nei).max().item()
best_observed_ei.append(best_value_ei)
best_observed_nei.append(best_value_nei)
# reinitialize the models so they are ready for fitting on next iteration
# use the current state dict to speed up fitting
mll_ei, model_ei = initialize_model(
train_x_ei,
train_obj_ei,
train_con_ei,
model_ei.state_dict(),
)
mll_nei, model_nei = initialize_model(
train_x_nei,
train_obj_nei,
train_con_nei,
model_nei.state_dict(),
)
t1 = time.monotonic()
if verbose:
print(
f"\nBatch {iteration:>2}: best_value (random, qEI, qNEI) = "
f"({max(best_random):>4.2f}, {best_value_ei:>4.2f}, {best_value_nei:>4.2f}), "
f"time = {t1-t0:>4.2f}.",
end="",
)
else:
print(".", end="")
best_observed_all_ei.append(best_observed_ei)
best_observed_all_nei.append(best_observed_nei)
best_random_all.append(best_random)
Plot the results
The plot below shows the best objective value observed at each step of the optimization
for each of the algorithms. The confidence intervals represent the variance at that step
in the optimization across the trial runs. The variance across optimization runs is
quite high, so in order to get a better estimate of the average performance one would
have to run a much larger number of trials N_TRIALS
(we avoid this here to limit the
runtime of this tutorial).
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline
def ci(y):
return 1.96 * y.std(axis=0) / np.sqrt(N_TRIALS)
GLOBAL_MAXIMUM = neg_hartmann6.optimal_value
iters = np.arange(N_BATCH + 1) * BATCH_SIZE
y_ei = np.asarray(best_observed_all_ei)
y_nei = np.asarray(best_observed_all_nei)
y_rnd = np.asarray(best_random_all)
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.errorbar(iters, y_rnd.mean(axis=0), yerr=ci(y_rnd), label="random", linewidth=1.5)
ax.errorbar(iters, y_ei.mean(axis=0), yerr=ci(y_ei), label="qLogEI", linewidth=1.5)
ax.errorbar(iters, y_nei.mean(axis=0), yerr=ci(y_nei), label="qLogNEI", linewidth=1.5)
plt.plot(
[0, N_BATCH * BATCH_SIZE],
[GLOBAL_MAXIMUM] * 2,
"k",
label="true best feasible objective",
linewidth=2,
)
ax.set_ylim(bottom=0.5)
ax.set(
xlabel="number of observations (beyond initial points)",
ylabel="best objective value",
)
ax.legend(loc="lower right")