Risk averse Bayesian optimization with environmental variables
This notebook considers risk averse Bayesian optimization of objectives , where denotes the design variable and denotes the environmental variable. The design variable is fully controlled by the practitioner, however, the environmental variable is only controllable at the experimentation phase and is determined by the environment once the decision is implemented, according to some probability distribution. In this setting, with the denoting the random environmental variable, the objective we want to optimize becomes a random function, written as , whose value is determined only once the environmental variable is realized. This formulation is relevant whenever we need to make a decision to be implemented in an unknown future environment, and we can simulate the environment during the optimization phase.
For this problem setting, [1] proposes to optimize a risk measure of the random function, written as , where denotes a risk measure, which is a functional that maps a random variable (in this case induced by ) to a real number. They propose the KG acquisition function, which extends the well-known knowledge-gradient acquisition function, and requires access to posterior mean of the objective, i.e., , where the expectation is taken over the sample paths of the GP model. Unlike the posterior mean of the function , the posterior mean of the risk measure is not available in closed-form and needs to be estimated via sampling. The procedure for estimating for a given is as follows:
- Draw a set of
n_w
samples of according to the probability distribution. Let's call thisw_set
. - Append each in
w_set
to the given to get pairs. Note that for a single , we now haven_w
pairs of . - Draw samples from the joint posterior distribution of these
n_w
pairs of . Note that the joint distribution here is ann_w
-dimensional Gaussian distribution. - Calculate the empirical risk measure corresponding to each sample, converting each
n_w
-dimensional posterior sample to a scalar sample of the risk measure. - Take the average of these risk measure samples to get the Monte-Carlo estimate of the posterior mean of the risk measure.
Now that the background is established, we are ready to implement a one-shot version of the KG acquisition function proposed in [1], in native BoTorch. We will:
- Use
AppendFeatures
input transform to add the set of samples to each given ; - Calculate the joint posterior over these samples;
- Use
RiskMeasureMCObjective
to convert these joint samples into samples of the risk measure; - And use the samples of the risk measure in
qMultiFidelityKnowledgeGradient
to define the KG acquisition function.
We will use the (negated) Branin function as with the first input dimension denoting and the second input dimension denoting , and find the maximizing the CVaR risk measure at risk level . We will assume that has a uniform distribution over and approximate the risk measure using (qMC) samples of at a given time.
CVaR, the Conditional Value-at-Risk, is a risk measure that measures the expectation of the worst outcomes (small rewards or large losses) with a total probability of . It is commonly defined as the conditional expectation of the reward function, with the condition that the reward is smaller than the corresponding quantile.
Note: Risk measures are typically studied in the context of a minimization problem
(including in [1]), since it makes more sense to minimize "risk", and treat the larger
values as being undesirable. Since the default behavior in BoTorch is to maximize the
objective, the RiskMeasureMCObjective
(and its subclasses) is defined w.r.t. the lower
tail of the random variable, i.e., by treating the smaller values as undesirable. With
this implementation, all that is needed to minimize a risk measure (of the original
objective) is to negate the objective, as is done in this notebook.
# Install dependencies if we are running in colab
import sys
if 'google.colab' in sys.modules:
%pip install botorch
import os
import warnings
from time import time
import matplotlib.pyplot as plt
import torch
from botorch import fit_gpytorch_mll
from botorch.acquisition import qMultiFidelityKnowledgeGradient, qSimpleRegret
from botorch.acquisition.risk_measures import CVaR
from botorch.models import SingleTaskGP
from botorch.models.transforms import Standardize
from botorch.models.transforms.input import AppendFeatures
from botorch.optim import optimize_acqf
from botorch.utils.sampling import draw_sobol_samples
from botorch.utils.transforms import unnormalize
from botorch.test_functions import Branin
from gpytorch import ExactMarginalLogLikelihood
from torch import Tensor
%matplotlib inline
warnings.filterwarnings("ignore")
SMOKE_TEST = os.environ.get("SMOKE_TEST")
BATCH_SIZE = 2 if not SMOKE_TEST else 1
NUM_RESTARTS = 10 if not SMOKE_TEST else 2
RAW_SAMPLES = 128 if not SMOKE_TEST else 4
N_W = 16 if not SMOKE_TEST else 2
NUM_ITERATIONS = 20 if not SMOKE_TEST else 2
NUM_FANTASIES = 16 if not SMOKE_TEST else 2
tkwargs = {"device": "cpu", "dtype": torch.double}
Problem setup
We will initialize the Branin
test function and define a wrapper around it to
normalize the domain to .
test_function = Branin(negate=True)
dim = test_function.dim
def evaluate_function(X: Tensor) -> Tensor:
return test_function(unnormalize(X, test_function.bounds)).view(*X.shape[:-1], 1)
Model initialization
We will initialize the SingleTaskGP
model on Sobol points drawn from the
space. In doing so, we will also pass in the AppendFeatures
. We will re-initialize
AppendFeatures
with a new w_set
at every model training to ensure adequate coverage
of the space.
bounds = torch.stack([torch.zeros(dim), torch.ones(dim)]).to(**tkwargs)
train_X = draw_sobol_samples(bounds=bounds, n=8, q=1).squeeze(-2).to(**tkwargs)
train_Y = evaluate_function(train_X)
def train_model(train_X: Tensor, train_Y: Tensor) -> SingleTaskGP:
r"""Returns a `SingleTaskGP` model trained on the inputs"""
w_set = (
draw_sobol_samples(n=N_W, q=1, bounds=bounds[:, -1:]).squeeze(-2).to(**tkwargs)
)
model = SingleTaskGP(
train_X,
train_Y,
input_transform=AppendFeatures(feature_set=w_set),
outcome_transform=Standardize(m=1),
)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
return model
model = train_model(train_X, train_Y)
Define a helper function that performs the BO step
The helper function will initialize the qMultiFidelityKnowledgeGradient
acquisition
function with the risk measure objective, and optimize it to find the candidate to
evaluate. We use qMultiFidelityKnowledgeGradient
instead of qKnowledgeGraient
since
it accepts a project
callable, which we will use to ignore the present in the
fantasy solutions before adding the w_set
via the AppendFeatures
input transform.
risk_measure = CVaR(alpha=0.7, n_w=N_W)
def ignore_w(X: Tensor) -> Tensor:
r"""Remove `w` from the input."""
return X[..., :-1]
def optimize_rho_kg_and_get_observation():
r"""Optimizes the rhoKG acquisition function, and returns a new candidate and observation."""
acqf = qMultiFidelityKnowledgeGradient(
model=model,
num_fantasies=NUM_FANTASIES,
objective=risk_measure,
project=ignore_w,
)
candidate, _ = optimize_acqf(
acq_function=acqf,
bounds=bounds,
q=BATCH_SIZE,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES,
)
new_observations = evaluate_function(candidate)
return candidate, new_observations
Perform the Bayesian optimization loop with KG
The BO loop iterates the following steps:
- Given the surrogate model, maximize the acquisition function to find the candidate(s) to evaluate;
- Observe for each candidate;
- Update the surrogate model with the new observation.
Note: Running this may take a while.
start_time = time()
for i in range(NUM_ITERATIONS):
print(f"Starting iteration {i}, total time: {time() - start_time:.3f} seconds.")
# optimize the acquisition function and get the observations
candidate, observations = optimize_rho_kg_and_get_observation()
# update the model with new observations
train_X = torch.cat([train_X, candidate], dim=0)
train_Y = torch.cat([train_Y, observations], dim=0)
model = train_model(train_X, train_Y)
Find the solution to implement
We will choose the solution to implement as the point maximizing the posterior
expectation of the risk measure. Since this expectation is not available in closed form,
we will maximize its qMC estimate as a surrogate. We will use a larger w_set
here to
get a more precise estimate.
# update the input transform of the already trained model
w_set = draw_sobol_samples(n=128, q=1, bounds=bounds[:, -1:]).squeeze(-2).to(**tkwargs)
new_transform = AppendFeatures(feature_set=w_set).eval()
model.input_transform = new_transform
risk_measure = CVaR(alpha=0.7, n_w=128)
expected_risk_measure = qSimpleRegret(model=model, objective=risk_measure)
final_candidate, expected_objective = optimize_acqf(
acq_function=expected_risk_measure,
bounds=bounds[:, :1],
q=1,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES,
)
Let's plot the true risk measure and see how we did
We can use the input transform and the risk measure we previously defined to make this part easier!
The plot shows that we found the global optimal solution and that our estimate of the risk measure at the optimal point is quite accurate.
plot_x = torch.linspace(0, 1, 100, **tkwargs).view(-1, 1)
eval_X = new_transform(plot_x)
eval_Y = evaluate_function(eval_X)
plot_risk_measure = risk_measure(eval_Y)
plt.figure(figsize=(12, 8))
plt.title("True Risk Measure Objective and Solution Found")
plt.plot(plot_x, plot_risk_measure)
plt.scatter(final_candidate, expected_objective, marker="*", color="red", s=500)
plt.xlabel("x")
plt.ylabel("$\\rho[f(x, w)]$")
plt.show()