Source code for botorch.acquisition.input_constructors

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
A registry of helpers for generating inputs to acquisition function
constructors programmatically from a consistent input format.
"""

from __future__ import annotations

import warnings
from typing import (
    Any,
    Callable,
    Dict,
    Hashable,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.analytic import (
    ConstrainedExpectedImprovement,
    ExpectedImprovement,
    NoisyExpectedImprovement,
    PosteriorMean,
    ProbabilityOfImprovement,
    UpperConfidenceBound,
)
from botorch.acquisition.cost_aware import InverseCostWeightedUtility
from botorch.acquisition.fixed_feature import FixedFeatureAcquisitionFunction
from botorch.acquisition.knowledge_gradient import (
    qKnowledgeGradient,
    qMultiFidelityKnowledgeGradient,
)
from botorch.acquisition.max_value_entropy_search import (
    qMaxValueEntropy,
    qMultiFidelityMaxValueEntropy,
)
from botorch.acquisition.monte_carlo import (
    qExpectedImprovement,
    qNoisyExpectedImprovement,
    qProbabilityOfImprovement,
    qSimpleRegret,
    qUpperConfidenceBound,
)
from botorch.acquisition.multi_objective import (
    ExpectedHypervolumeImprovement,
    MCMultiOutputObjective,
    qExpectedHypervolumeImprovement,
    qNoisyExpectedHypervolumeImprovement,
)
from botorch.acquisition.multi_objective.objective import (
    AnalyticMultiOutputObjective,
    IdentityAnalyticMultiOutputObjective,
    IdentityMCMultiOutputObjective,
)
from botorch.acquisition.multi_objective.utils import get_default_partitioning_alpha
from botorch.acquisition.objective import (
    AcquisitionObjective,
    IdentityMCObjective,
    MCAcquisitionObjective,
    PosteriorTransform,
    ScalarizedObjective,
    ScalarizedPosteriorTransform,
)
from botorch.acquisition.preference import AnalyticExpectedUtilityOfBestOption
from botorch.acquisition.risk_measures import RiskMeasureMCObjective
from botorch.acquisition.utils import (
    expand_trace_observations,
    project_to_target_fidelity,
)
from botorch.exceptions.errors import UnsupportedError
from botorch.models.cost import AffineFidelityCostModel
from botorch.models.deterministic import FixedSingleSampleModel
from botorch.models.model import Model
from botorch.optim.optimize import optimize_acqf
from botorch.sampling.samplers import IIDNormalSampler, MCSampler, SobolQMCNormalSampler
from botorch.utils.constraints import get_outcome_constraint_transforms
from botorch.utils.containers import BotorchContainer
from botorch.utils.datasets import BotorchDataset, SupervisedDataset
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
    FastNondominatedPartitioning,
    NondominatedPartitioning,
)
from torch import Tensor


ACQF_INPUT_CONSTRUCTOR_REGISTRY = {}

T = TypeVar("T")
MaybeDict = Union[T, Dict[Hashable, T]]


def _field_is_shared(
    datasets: Union[Iterable[BotorchDataset], Dict[Hashable, BotorchDataset]],
    fieldname: Hashable,
) -> bool:
    r"""Determines whether or not a given field is shared by all datasets."""
    if isinstance(datasets, dict):
        datasets = datasets.values()

    base = None
    for dataset in datasets:
        if not hasattr(dataset, fieldname):
            raise AttributeError(f"{type(dataset)} object has no field `{fieldname}`.")

        obj = getattr(dataset, fieldname)
        if base is None:
            base = obj
        elif base != obj:
            return False

    return True


def _get_dataset_field(
    dataset: MaybeDict[BotorchDataset],
    fieldname: str,
    transform: Optional[Callable[[BotorchContainer], Any]] = None,
    join_rule: Optional[Callable[[Sequence[Any]], Any]] = None,
    first_only: bool = False,
    assert_shared: bool = False,
) -> Any:
    r"""Convenience method for extracting a given field from one or more datasets."""
    if isinstance(dataset, dict):
        if assert_shared and not _field_is_shared(dataset, fieldname):
            raise ValueError(f"Field `{fieldname}` must be shared.")

        if not first_only:
            fields = (
                _get_dataset_field(d, fieldname, transform) for d in dataset.values()
            )
            return join_rule(tuple(fields)) if join_rule else tuple(fields)

        dataset = next(iter(dataset.values()))

    field = getattr(dataset, fieldname)
    return transform(field) if transform else field


[docs]def get_acqf_input_constructor( acqf_cls: Type[AcquisitionFunction], ) -> Callable[..., Dict[str, Any]]: r"""Get acqusition function input constructor from registry. Args: acqf_cls: The AcquisitionFunction class (not instance) for which to retrieve the input constructor. Returns: The input constructor associated with `acqf_cls`. """ if acqf_cls not in ACQF_INPUT_CONSTRUCTOR_REGISTRY: raise RuntimeError( f"Input constructor for acquisition class `{acqf_cls.__name__}` not " "registered. Use the `@acqf_input_constructor` decorator to register " "a new method." ) return ACQF_INPUT_CONSTRUCTOR_REGISTRY[acqf_cls]
[docs]def acqf_input_constructor( *acqf_cls: Type[AcquisitionFunction], ) -> Callable[..., AcquisitionFunction]: r"""Decorator for registering acquisition function input constructors. Args: acqf_cls: The AcquisitionFunction classes (not instances) for which to register the input constructor. """ for acqf_cls_ in acqf_cls: if acqf_cls_ in ACQF_INPUT_CONSTRUCTOR_REGISTRY: raise ValueError( "Cannot register duplicate arg constructor for acquisition " f"class `{acqf_cls_.__name__}`" ) def decorator(method): for acqf_cls_ in acqf_cls: _register_acqf_input_constructor( acqf_cls=acqf_cls_, input_constructor=method ) ACQF_INPUT_CONSTRUCTOR_REGISTRY[acqf_cls_] = method return method return decorator
def _register_acqf_input_constructor( acqf_cls: Type[AcquisitionFunction], input_constructor: Callable[..., Dict[str, Any]], ) -> None: ACQF_INPUT_CONSTRUCTOR_REGISTRY[acqf_cls] = input_constructor # ------------------------- Deprecation Helpers ------------------------- # def _deprecate_objective_arg( posterior_transform: Optional[PosteriorTransform] = None, objective: Optional[AcquisitionObjective] = None, ) -> Optional[PosteriorTransform]: if posterior_transform is not None: if objective is None: return posterior_transform else: raise RuntimeError( "Got both a non-MC objective (DEPRECATED) and a posterior " "transform. Use only a posterior transform instead." ) elif objective is not None: warnings.warn( "The `objective` argument to AnalyticAcquisitionFunctions is deprecated " "and will be removed in the next version. Use `posterior_transform` " "instead.", DeprecationWarning, ) if not isinstance(objective, ScalarizedObjective): raise UnsupportedError( "Analytic acquisition functions only support ScalarizedObjective " "(DEPRECATED) type objectives." ) return ScalarizedPosteriorTransform( weights=objective.weights, offset=objective.offset ) else: return None # --------------------- Input argument constructors --------------------- #
[docs]@acqf_input_constructor(PosteriorMean) def construct_inputs_analytic_base( model: Model, training_data: MaybeDict[SupervisedDataset], posterior_transform: Optional[PosteriorTransform] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for basic analytic acquisition functions. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. posterior_transform: The posterior transform to be used in the acquisition function. Returns: A dict mapping kwarg names of the constructor to values. """ return { "model": model, "posterior_transform": _deprecate_objective_arg( posterior_transform=posterior_transform, objective=kwargs.get("objective"), ), }
[docs]@acqf_input_constructor(ExpectedImprovement, ProbabilityOfImprovement) def construct_inputs_best_f( model: Model, training_data: MaybeDict[SupervisedDataset], posterior_transform: Optional[PosteriorTransform] = None, best_f: Optional[Union[float, Tensor]] = None, maximize: bool = True, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the acquisition functions requiring `best_f`. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. Used to determine default value for `best_f`. best_f: Threshold above (or below) which improvement is defined. posterior_transform: The posterior transform to be used in the acquisition function. maximize: If True, consider the problem a maximization problem. Returns: A dict mapping kwarg names of the constructor to values. """ base_inputs = construct_inputs_analytic_base( model=model, training_data=training_data, posterior_transform=posterior_transform, **kwargs, ) if best_f is None: best_f = get_best_f_analytic( training_data=training_data, objective=kwargs.get("objective"), posterior_transform=posterior_transform, ) return {**base_inputs, "best_f": best_f, "maximize": maximize}
[docs]@acqf_input_constructor(UpperConfidenceBound) def construct_inputs_ucb( model: Model, training_data: MaybeDict[SupervisedDataset], posterior_transform: Optional[PosteriorTransform] = None, beta: Union[float, Tensor] = 0.2, maximize: bool = True, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `UpperConfidenceBound`. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. posterior_transform: The posterior transform to be used in the acquisition function. beta: Either a scalar or a one-dim tensor with `b` elements (batch mode) representing the trade-off parameter between mean and covariance maximize: If True, consider the problem a maximization problem. Returns: A dict mapping kwarg names of the constructor to values. """ base_inputs = construct_inputs_analytic_base( model=model, training_data=training_data, posterior_transform=posterior_transform, **kwargs, ) return {**base_inputs, "beta": beta, "maximize": maximize}
[docs]@acqf_input_constructor(ConstrainedExpectedImprovement) def construct_inputs_constrained_ei( model: Model, training_data: MaybeDict[SupervisedDataset], objective_index: int, constraints: Dict[int, Tuple[Optional[float], Optional[float]]], maximize: bool = True, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `ConstrainedExpectedImprovement`. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective_index: The index of the objective. constraints: A dictionary of the form `{i: [lower, upper]}`, where `i` is the output index, and `lower` and `upper` are lower and upper bounds on that output (resp. interpreted as -Inf / Inf if None) maximize: If True, consider the problem a maximization problem. Returns: A dict mapping kwarg names of the constructor to values. """ # TODO: Implement best point computation from training data # best_f = # return { # "model": model, # "best_f": best_f, # "objective_index": objective_index, # "constraints": constraints, # "maximize": maximize, # } raise NotImplementedError # pragma: nocover
[docs]@acqf_input_constructor(NoisyExpectedImprovement) def construct_inputs_noisy_ei( model: Model, training_data: MaybeDict[SupervisedDataset], num_fantasies: int = 20, maximize: bool = True, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `NoisyExpectedImprovement`. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. num_fantasies: The number of fantasies to generate. The higher this number the more accurate the model (at the expense of model complexity and performance). maximize: If True, consider the problem a maximization problem. Returns: A dict mapping kwarg names of the constructor to values. """ # TODO: Add prune_baseline functionality as for qNEI X = _get_dataset_field(training_data, "X", first_only=True, assert_shared=True) return { "model": model, "X_observed": X(), "num_fantasies": num_fantasies, "maximize": maximize, }
[docs]@acqf_input_constructor(qSimpleRegret) def construct_inputs_mc_base( model: Model, training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for basic MC acquisition functions. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective: The objective to be used in the acquisition function. posterior_transform: The posterior transform to be used in the acquisition function. X_pending: A `batch_shape, m x d`-dim Tensor of `m` design points that have points that have been submitted for function evaluation but have not yet been evaluated. sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. Returns: A dict mapping kwarg names of the constructor to values. """ return { "model": model, "objective": objective, "posterior_transform": posterior_transform, "X_pending": X_pending, "sampler": sampler, }
[docs]@acqf_input_constructor(qExpectedImprovement) def construct_inputs_qEI( model: Model, training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, best_f: Optional[Union[float, Tensor]] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qExpectedImprovement` constructor. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective: The objective to be used in the acquisition function. posterior_transform: The posterior transform to be used in the acquisition function. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into X upon forward call. sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. best_f: Threshold above (or below) which improvement is defined. Returns: A dict mapping kwarg names of the constructor to values. """ base_inputs = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, posterior_transform=posterior_transform, sampler=sampler, X_pending=X_pending, ) if best_f is None: best_f = get_best_f_mc( training_data=training_data, objective=objective, posterior_transform=posterior_transform, ) return {**base_inputs, "best_f": best_f}
[docs]@acqf_input_constructor(qNoisyExpectedImprovement) def construct_inputs_qNEI( model: Model, training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, X_baseline: Optional[Tensor] = None, prune_baseline: Optional[bool] = False, cache_root: Optional[bool] = True, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qNoisyExpectedImprovement` constructor. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective: The objective to be used in the acquisition function. posterior_transform: The posterior transform to be used in the acquisition function. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into X upon forward call. sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. X_baseline: A `batch_shape x r x d`-dim Tensor of `r` design points that have already been observed. These points are considered as the potential best design point. If omitted, checks that all training_data have the same input features and take the first `X`. prune_baseline: If True, remove points in `X_baseline` that are highly unlikely to be the best point. This can significantly improve performance and is generally recommended. Returns: A dict mapping kwarg names of the constructor to values. """ base_inputs = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, posterior_transform=posterior_transform, sampler=sampler, X_pending=X_pending, ) if X_baseline is None: X_baseline = _get_dataset_field( training_data, fieldname="X", transform=lambda field: field(), assert_shared=True, first_only=True, ) return { **base_inputs, "X_baseline": X_baseline, "prune_baseline": prune_baseline, "cache_root": cache_root, }
[docs]@acqf_input_constructor(qProbabilityOfImprovement) def construct_inputs_qPI( model: Model, training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, tau: float = 1e-3, best_f: Optional[Union[float, Tensor]] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qProbabilityOfImprovement` constructor. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective: The objective to be used in the acquisition function. posterior_transform: The posterior transform to be used in the acquisition function. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into X upon forward call. sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. tau: The temperature parameter used in the sigmoid approximation of the step function. Smaller values yield more accurate approximations of the function, but result in gradients estimates with higher variance. best_f: The best objective value observed so far (assumed noiseless). Can be a `batch_shape`-shaped tensor, which in case of a batched model specifies potentially different values for each element of the batch. Returns: A dict mapping kwarg names of the constructor to values. """ if best_f is None: best_f = get_best_f_mc(training_data=training_data, objective=objective) base_inputs = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, posterior_transform=posterior_transform, sampler=sampler, X_pending=X_pending, ) return {**base_inputs, "tau": tau, "best_f": best_f}
[docs]@acqf_input_constructor(qUpperConfidenceBound) def construct_inputs_qUCB( model: Model, training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, beta: float = 0.2, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qUpperConfidenceBound` constructor. Args: model: The model to be used in the acquisition function. training_data: Dataset(s) used to train the model. objective: The objective to be used in the acquisition function. posterior_transform: The posterior transform to be used in the acquisition function. X_pending: A `m x d`-dim Tensor of `m` design points that have been submitted for function evaluation but have not yet been evaluated. Concatenated into X upon forward call. sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. beta: Controls tradeoff between mean and standard deviation in UCB. Returns: A dict mapping kwarg names of the constructor to values. """ base_inputs = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, posterior_transform=posterior_transform, sampler=sampler, X_pending=X_pending, ) return {**base_inputs, "beta": beta}
def _get_sampler(mc_samples: int, qmc: bool) -> MCSampler: """Set up MC sampler for q(N)EHVI.""" # initialize the sampler seed = int(torch.randint(1, 10000, (1,)).item()) if qmc: return SobolQMCNormalSampler(num_samples=mc_samples, seed=seed) return IIDNormalSampler(num_samples=mc_samples, seed=seed)
[docs]@acqf_input_constructor(ExpectedHypervolumeImprovement) def construct_inputs_EHVI( model: Model, training_data: MaybeDict[SupervisedDataset], objective_thresholds: Tensor, objective: Optional[AnalyticMultiOutputObjective] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `ExpectedHypervolumeImprovement` constructor.""" num_objectives = objective_thresholds.shape[0] if kwargs.get("outcome_constraints") is not None: raise NotImplementedError("EHVI does not yet support outcome constraints.") X = _get_dataset_field( training_data, fieldname="X", transform=lambda field: field(), first_only=True, assert_shared=True, ) alpha = kwargs.get( "alpha", get_default_partitioning_alpha(num_objectives=num_objectives), ) # This selects the objectives (a subset of the outcomes) and set each # objective threhsold to have the proper optimization direction. if objective is None: objective = IdentityAnalyticMultiOutputObjective() if isinstance(objective, RiskMeasureMCObjective): pre_obj = objective.preprocessing_function else: pre_obj = objective ref_point = pre_obj(objective_thresholds) # Compute posterior mean (for ref point computation ref pareto frontier) # if one is not provided among arguments. Y_pmean = kwargs.get("Y_pmean") if Y_pmean is None: with torch.no_grad(): Y_pmean = model.posterior(X).mean if alpha > 0: partitioning = NondominatedPartitioning( ref_point=ref_point, Y=pre_obj(Y_pmean), alpha=alpha, ) else: partitioning = FastNondominatedPartitioning( ref_point=ref_point, Y=pre_obj(Y_pmean), ) return { "model": model, "ref_point": ref_point, "partitioning": partitioning, "objective": objective, }
[docs]@acqf_input_constructor(qExpectedHypervolumeImprovement) def construct_inputs_qEHVI( model: Model, training_data: MaybeDict[SupervisedDataset], objective_thresholds: Tensor, objective: Optional[MCMultiOutputObjective] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qExpectedHypervolumeImprovement` constructor.""" X = _get_dataset_field( training_data, fieldname="X", transform=lambda field: field(), first_only=True, assert_shared=True, ) # compute posterior mean (for ref point computation ref pareto frontier) with torch.no_grad(): Y_pmean = model.posterior(X).mean outcome_constraints = kwargs.pop("outcome_constraints", None) # For HV-based acquisition functions we pass the constraint transform directly if outcome_constraints is None: cons_tfs = None else: cons_tfs = get_outcome_constraint_transforms(outcome_constraints) # Adjust `Y_pmean` to contrain feasible points only. feas = torch.stack([c(Y_pmean) <= 0 for c in cons_tfs], dim=-1).all(dim=-1) Y_pmean = Y_pmean[feas] if objective is None: objective = IdentityMCMultiOutputObjective() ehvi_kwargs = construct_inputs_EHVI( model=model, training_data=training_data, objective_thresholds=objective_thresholds, objective=objective, # Pass `Y_pmean` that accounts for constraints to `construct_inputs_EHVI` # to ensure that correct non-dominated partitioning is produced. Y_pmean=Y_pmean, **kwargs, ) sampler = kwargs.get("sampler") if sampler is None: sampler = _get_sampler( mc_samples=kwargs.get("mc_samples", 128), qmc=kwargs.get("qmc", True) ) add_qehvi_kwargs = { "sampler": sampler, "X_pending": kwargs.get("X_pending"), "constraints": cons_tfs, "eta": kwargs.get("eta", 1e-3), } return {**ehvi_kwargs, **add_qehvi_kwargs}
[docs]@acqf_input_constructor(qNoisyExpectedHypervolumeImprovement) def construct_inputs_qNEHVI( model: Model, training_data: MaybeDict[SupervisedDataset], objective_thresholds: Tensor, objective: Optional[MCMultiOutputObjective] = None, X_baseline: Optional[Tensor] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qNoisyExpectedHypervolumeImprovement` constructor.""" if X_baseline is None: X_baseline = _get_dataset_field( training_data, fieldname="X", transform=lambda field: field(), first_only=True, assert_shared=True, ) # This selects the objectives (a subset of the outcomes) and set each # objective threhsold to have the proper optimization direction. if objective is None: objective = IdentityMCMultiOutputObjective() outcome_constraints = kwargs.pop("outcome_constraints", None) if outcome_constraints is None: cons_tfs = None else: if isinstance(objective, RiskMeasureMCObjective): raise UnsupportedError( "Outcome constraints are not supported with risk measures. " "Use a feasibility-weighted risk measure instead." ) cons_tfs = get_outcome_constraint_transforms(outcome_constraints) sampler = kwargs.get("sampler") if sampler is None: sampler = _get_sampler( mc_samples=kwargs.get("mc_samples", 128), qmc=kwargs.get("qmc", True) ) if isinstance(objective, RiskMeasureMCObjective): ref_point = objective.preprocessing_function(objective_thresholds) else: ref_point = objective(objective_thresholds) return { "model": model, "ref_point": ref_point, "X_baseline": X_baseline, "sampler": sampler, "objective": objective, "constraints": cons_tfs, "X_pending": kwargs.get("X_pending"), "eta": kwargs.get("eta", 1e-3), "prune_baseline": kwargs.get("prune_baseline", True), "alpha": kwargs.get("alpha", get_default_partitioning_alpha(model.num_outputs)), "cache_pending": kwargs.get("cache_pending", True), "max_iep": kwargs.get("max_iep", 0), "incremental_nehvi": kwargs.get("incremental_nehvi", True), }
[docs]@acqf_input_constructor(qMaxValueEntropy) def construct_inputs_qMES( model: Model, training_data: MaybeDict[SupervisedDataset], bounds: List[Tuple[float, float]], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, candidate_size: int = 1000, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qMaxValueEntropy` constructor.""" inputs_mc = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, **kwargs, ) X = _get_dataset_field(training_data, "X", first_only=True) _kw = {"device": X.device, "dtype": X.dtype} _rvs = torch.rand(candidate_size, len(bounds), **_kw) _bounds = torch.tensor(bounds, **_kw).transpose(0, 1) return { **inputs_mc, "candidate_set": _bounds[0] + (_bounds[1] - _bounds[0]) * _rvs, "maximize": kwargs.get("maximize", True), }
[docs]def construct_inputs_mf_base( model: Model, training_data: MaybeDict[SupervisedDataset], target_fidelities: Dict[int, Union[int, float]], fidelity_weights: Optional[Dict[int, float]] = None, cost_intercept: float = 1.0, num_trace_observations: int = 0, **ignore: Any, ) -> Dict[str, Any]: r"""Construct kwargs for a multifidelity acquisition function's constructor.""" if fidelity_weights is None: fidelity_weights = {f: 1.0 for f in target_fidelities} if set(target_fidelities) != set(fidelity_weights): raise RuntimeError( "Must provide the same indices for target_fidelities " f"({set(target_fidelities)}) and fidelity_weights " f" ({set(fidelity_weights)})." ) cost_aware_utility = InverseCostWeightedUtility( cost_model=AffineFidelityCostModel( fidelity_weights=fidelity_weights, fixed_cost=cost_intercept ) ) return { "target_fidelities": target_fidelities, "cost_aware_utility": cost_aware_utility, "expand": lambda X: expand_trace_observations( X=X, fidelity_dims=sorted(target_fidelities), num_trace_obs=num_trace_observations, ), "project": lambda X: project_to_target_fidelity( X=X, target_fidelities=target_fidelities ), }
[docs]@acqf_input_constructor(qKnowledgeGradient) def construct_inputs_qKG( model: Model, training_data: MaybeDict[SupervisedDataset], bounds: List[Tuple[float, float]], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, target_fidelities: Optional[Dict[int, float]] = None, num_fantasies: int = 64, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qKnowledgeGradient` constructor.""" inputs_mc = construct_inputs_mc_base( model=model, training_data=training_data, objective=objective, posterior_transform=posterior_transform, **kwargs, ) X = _get_dataset_field(training_data, "X", first_only=True) _bounds = torch.tensor(bounds, dtype=X.dtype, device=X.device) _, current_value = optimize_objective( model=model, bounds=_bounds.t(), q=1, target_fidelities=target_fidelities, objective=objective, posterior_transform=posterior_transform, **kwargs, ) return { **inputs_mc, "num_fantasies": num_fantasies, "current_value": current_value.detach().cpu().max(), }
[docs]@acqf_input_constructor(qMultiFidelityKnowledgeGradient) def construct_inputs_qMFKG( model: Model, training_data: MaybeDict[SupervisedDataset], bounds: List[Tuple[float, float]], target_fidelities: Dict[int, Union[int, float]], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qMultiFidelityKnowledgeGradient` constructor.""" inputs_mf = construct_inputs_mf_base( model=model, training_data=training_data, target_fidelities=target_fidelities, **kwargs, ) inputs_kg = construct_inputs_qKG( model=model, training_data=training_data, bounds=bounds, objective=objective, posterior_transform=posterior_transform, **kwargs, ) return {**inputs_mf, **inputs_kg}
[docs]@acqf_input_constructor(qMultiFidelityMaxValueEntropy) def construct_inputs_qMFMES( model: Model, training_data: MaybeDict[SupervisedDataset], bounds: List[Tuple[float, float]], target_fidelities: Dict[int, Union[int, float]], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for `qMultiFidelityMaxValueEntropy` constructor.""" inputs_mf = construct_inputs_mf_base( model=model, training_data=training_data, target_fidelities=target_fidelities, **kwargs, ) inputs_qmes = construct_inputs_qMES( model=model, training_data=training_data, bounds=bounds, objective=objective, posterior_transform=posterior_transform, **kwargs, ) X = _get_dataset_field(training_data, "X", first_only=True) _bounds = torch.tensor(bounds, dtype=X.dtype, device=X.device) _, current_value = optimize_objective( model=model, bounds=_bounds.t(), q=1, objective=objective, posterior_transform=posterior_transform, target_fidelities=target_fidelities, **kwargs, ) return { **inputs_mf, **inputs_qmes, "current_value": current_value.detach().cpu().max(), }
[docs]@acqf_input_constructor(AnalyticExpectedUtilityOfBestOption) def construct_inputs_analytic_eubo( model: Model, pref_model: Model, previous_winner: Optional[Tensor] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `AnalyticExpectedUtilityOfBestOption` constructor. Args: model: The outcome model to be used in the acquisition function. pref_model: The preference model to be used in preference exploration Returns: A dict mapping kwarg names of the constructor to values. """ # construct a deterministic fixed single sample model from `model` # i.e., performing EUBO-zeta by default as described # in https://arxiv.org/abs/2203.11382 one_sample_outcome_model = FixedSingleSampleModel(model=model) return { "pref_model": pref_model, "outcome_model": one_sample_outcome_model, "previous_winner": previous_winner, }
[docs]def get_best_f_analytic( training_data: MaybeDict[SupervisedDataset], posterior_transform: Optional[PosteriorTransform] = None, **kwargs, ) -> Tensor: if isinstance(training_data, dict) and not _field_is_shared( training_data, fieldname="X" ): raise NotImplementedError("Currently only block designs are supported.") Y = _get_dataset_field( training_data, fieldname="Y", transform=lambda field: field(), join_rule=lambda field_tensors: torch.cat(field_tensors, dim=-1), ) posterior_transform = _deprecate_objective_arg( posterior_transform=posterior_transform, objective=kwargs.get("objective", None) ) if posterior_transform is not None: return posterior_transform.evaluate(Y).max(-1).values if Y.shape[-1] > 1: raise NotImplementedError( "Analytic acquisition functions currently only work with " "multi-output models if provided with a `ScalarizedObjective`." ) return Y.max(-2).values.squeeze(-1)
[docs]def get_best_f_mc( training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, ) -> Tensor: if isinstance(training_data, dict) and not _field_is_shared( training_data, fieldname="X" ): raise NotImplementedError("Currently only block designs are supported.") Y = _get_dataset_field( training_data, fieldname="Y", transform=lambda field: field(), join_rule=lambda field_tensors: torch.cat(field_tensors, dim=-1), ) posterior_transform = _deprecate_objective_arg( posterior_transform=posterior_transform, objective=objective if not isinstance(objective, MCAcquisitionObjective) else None, ) if posterior_transform is not None: # retain the original tensor dimension since objective expects explicit # output dimension. Y_dim = Y.dim() Y = posterior_transform.evaluate(Y) if Y.dim() < Y_dim: Y = Y.unsqueeze(-1) if objective is None: if Y.shape[-1] > 1: raise UnsupportedError( "Acquisition functions require an objective when " "used with multi-output models (execpt for multi-objective" "acquisition functions)." ) objective = IdentityMCObjective() return objective(Y).max(-1).values
[docs]def optimize_objective( model: Model, bounds: Tensor, q: int, objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, linear_constraints: Optional[Tuple[Tensor, Tensor]] = None, fixed_features: Optional[Dict[int, float]] = None, target_fidelities: Optional[Dict[int, float]] = None, qmc: bool = True, mc_samples: int = 512, seed_inner: Optional[int] = None, optimizer_options: Dict[str, Any] = None, post_processing_func: Optional[Callable[[Tensor], Tensor]] = None, batch_initial_conditions: Optional[Tensor] = None, sequential: bool = False, **ignore, ) -> Tuple[Tensor, Tensor]: r"""Optimize an objective under the given model. Args: model: The model to be used in the objective. bounds: A `2 x d` tensor of lower and upper bounds for each column of `X`. q: The cardinality of input sets on which the objective is to be evaluated. objective: The objective to optimize. posterior_transform: The posterior transform to be used in the acquisition function. linear_constraints: A tuple of (A, b). Given `k` linear constraints on a `d`-dimensional space, `A` is `k x d` and `b` is `k x 1` such that `A x <= b`. (Not used by single task models). fixed_features: A dictionary of feature assignments `{feature_index: value}` to hold fixed during generation. target_fidelities: A dictionary mapping input feature indices to fidelity values. Defaults to `{-1: 1.0}`. qmc: Toggle for enabling (qmc=1) or disabling (qmc=0) use of Quasi Monte Carlo. mc_samples: Integer number of samples used to estimate Monte Carlo objectives. seed_inner: Integer seed used to initialize the sampler passed to MCObjective. optimizer_options: Table used to lookup keyword arguments for the optimizer. post_processing_func: A function that post-processes an optimization result appropriately (i.e. according to `round-trip` transformations). batch_initial_conditions: A Tensor of initial values for the optimizer. sequential: If False, uses joint optimization, otherwise uses sequential optimization. Returns: A tuple containing the best input locations and corresponding objective values. """ if optimizer_options is None: optimizer_options = {} if objective is not None: sampler_cls = SobolQMCNormalSampler if qmc else IIDNormalSampler acq_function = qSimpleRegret( model=model, objective=objective, posterior_transform=posterior_transform, sampler=sampler_cls(num_samples=mc_samples, seed=seed_inner), ) else: acq_function = PosteriorMean( model=model, posterior_transform=posterior_transform ) if fixed_features: acq_function = FixedFeatureAcquisitionFunction( acq_function=acq_function, d=bounds.shape[-1], columns=list(fixed_features.keys()), values=list(fixed_features.values()), ) free_feature_dims = list(range(len(bounds)) - fixed_features.keys()) free_feature_bounds = bounds[:, free_feature_dims] # (2, d' <= d) else: free_feature_bounds = bounds if linear_constraints is None: inequality_constraints = None else: A, b = linear_constraints inequality_constraints = [] k, d = A.shape for i in range(k): indicies = A[i, :].nonzero(as_tuple=False).squeeze() coefficients = -A[i, indicies] rhs = -b[i, 0] inequality_constraints.append((indicies, coefficients, rhs)) return optimize_acqf( acq_function=acq_function, bounds=free_feature_bounds, q=q, num_restarts=optimizer_options.get("num_restarts", 60), raw_samples=optimizer_options.get("raw_samples", 1024), options={ "batch_limit": optimizer_options.get("batch_limit", 8), "maxiter": optimizer_options.get("maxiter", 200), "nonnegative": optimizer_options.get("nonnegative", False), "method": optimizer_options.get("method", "L-BFGS-B"), }, inequality_constraints=inequality_constraints, fixed_features=None, # handled inside the acquisition function post_processing_func=post_processing_func, batch_initial_conditions=batch_initial_conditions, return_best_only=True, sequential=sequential, )