#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Core methods for building closures in torch and interfacing with numpy."""
from __future__ import annotations
from collections.abc import Sequence
from functools import partial
from typing import Any, Callable, Optional
import torch
from botorch.optim.utils import (
_handle_numerical_errors,
get_tensors_as_ndarray_1d,
set_tensors_from_ndarray_1d,
)
from botorch.optim.utils.numpy_utils import as_ndarray
from botorch.utils.context_managers import zero_grad_ctx
from numpy import float64 as np_float64, full as np_full, ndarray, zeros as np_zeros
from torch import Tensor
[docs]
class ForwardBackwardClosure:
    r"""Callable that fuses a forward pass with its backward pass.

    Each call evaluates `forward`, optionally reduces the result, runs
    `backward` on it, and returns the (reduced) value together with the `grad`
    fields of the tracked parameters.
    """

    def __init__(
        self,
        forward: Callable[[], Tensor],
        parameters: dict[str, Tensor],
        backward: Callable[[Tensor], None] = Tensor.backward,
        reducer: Optional[Callable[[Tensor], Tensor]] = torch.sum,
        callback: Optional[Callable[[Tensor, Sequence[Optional[Tensor]]], None]] = None,
        context_manager: Callable = None,  # pyre-ignore [9]
    ) -> None:
        r"""Initializes a ForwardBackwardClosure instance.

        Args:
            forward: Callable that returns a tensor.
            parameters: A dictionary of tensors whose `grad` fields are to be
                returned.
            backward: Callable that takes the (reduced) output of `forward` and
                sets the `grad` attributes of tensors in `parameters`.
            reducer: Optional callable applied to the output of `forward` before
                it is handed to `backward`. If `None`, the output is used as is.
            callback: Optional callable invoked with the reduced output of
                `forward` and the gradients of `parameters` as positional
                arguments.
            context_manager: A zero-argument callable returning a context
                manager that wraps each forward-backward call. When `None`,
                defaults to `zero_grad_ctx` bound to `parameters`.
        """
        self.forward = forward
        self.backward = backward
        self.parameters = parameters
        self.reducer = reducer
        self.callback = callback
        # Bind the default lazily so that it closes over this instance's parameters.
        self.context_manager = (
            partial(zero_grad_ctx, parameters)
            if context_manager is None
            else context_manager
        )

    def __call__(self, **kwargs: Any) -> tuple[Tensor, tuple[Optional[Tensor], ...]]:
        r"""Runs one forward-backward pass.

        Args:
            **kwargs: Keyword arguments forwarded to `forward`.

        Returns:
            A 2-tuple of the (reduced) output of `forward` and a tuple holding
            the `grad` field of every tensor in `parameters`, in dictionary
            order. Entries are whatever `param.grad` holds, possibly `None`.
        """
        with self.context_manager():
            output = self.forward(**kwargs)
            if self.reducer is not None:
                output = self.reducer(output)
            self.backward(output)

            gradients = tuple(tensor.grad for tensor in self.parameters.values())
            if self.callback:
                self.callback(output, gradients)

            return output, gradients
[docs]
class NdarrayOptimizationClosure:
    r"""Adds stateful behavior and a numpy.ndarray-typed API to a closure with an
    expected return type Tuple[Tensor, Union[Tensor, Sequence[Optional[Tensor]]]]."""

    def __init__(
        self,
        closure: Callable[[], tuple[Tensor, Sequence[Optional[Tensor]]]],
        parameters: dict[str, Tensor],
        as_array: Callable[[Tensor], ndarray] = None,  # pyre-ignore [9]
        as_tensor: Callable[[ndarray], Tensor] = torch.as_tensor,
        get_state: Callable[[], ndarray] = None,  # pyre-ignore [9]
        set_state: Callable[[ndarray], None] = None,  # pyre-ignore [9]
        fill_value: float = 0.0,
        persistent: bool = True,
    ) -> None:
        r"""Initializes a NdarrayOptimizationClosure instance.

        Args:
            closure: A ForwardBackwardClosure instance (or any callable returning
                a value tensor and a sequence of optional gradient tensors).
            parameters: A dictionary of tensors representing the closure's state.
                Expected to correspond with the first `len(parameters)` optional
                gradient tensors returned by `closure`.
            as_array: Callable used to convert tensors to ndarrays. When passed
                as `None`, defaults to `as_ndarray` with `dtype=np_float64`.
            as_tensor: Callable used to convert ndarrays to tensors.
            get_state: Callable that returns the closure's state as an ndarray.
                When passed as `None`, defaults to calling
                `get_tensors_as_ndarray_1d` on `parameters` while passing
                `as_array` (if given by the user).
            set_state: Callable that takes a 1-dimensional ndarray and sets the
                closure's state. When passed as `None`, `set_state` defaults to
                calling `set_tensors_from_ndarray_1d` with `parameters` and a
                given ndarray while passing `as_tensor`.
            fill_value: Fill value for parameters whose gradients are None. In
                most cases, `fill_value` should either be zero or NaN.
            persistent: Boolean specifying whether an ndarray should be retained
                as a persistent buffer for gradients.
        """
        if get_state is None:
            # Note: Numpy supports copying data between ndarrays with different dtypes.
            # Hence, our default behavior need not coerce the ndarray representations
            # of tensors in `parameters` to float64 when copying over data.
            _as_array = as_ndarray if as_array is None else as_array
            get_state = partial(
                get_tensors_as_ndarray_1d,
                tensors=parameters,
                dtype=np_float64,
                as_array=_as_array,
            )

        if as_array is None:  # per the note, do this after resolving `get_state`
            as_array = partial(as_ndarray, dtype=np_float64)

        if set_state is None:
            set_state = partial(
                set_tensors_from_ndarray_1d, parameters, as_tensor=as_tensor
            )

        self.closure = closure
        self.parameters = parameters

        # Store the resolved converter. Storing the bare `as_ndarray` helper here
        # would silently discard a user-supplied `as_array` as well as the
        # float64 default resolved above.
        self.as_array = as_array
        self.as_tensor = as_tensor
        self._get_state = get_state
        self._set_state = set_state

        self.fill_value = fill_value
        self.persistent = persistent
        self._gradient_ndarray: Optional[ndarray] = None

    def __call__(
        self, state: Optional[ndarray] = None, **kwargs: Any
    ) -> tuple[ndarray, ndarray]:
        r"""Evaluates the wrapped closure and returns ndarray-typed results.

        Args:
            state: Optional ndarray. When not `None`, it is assigned to
                `self.state` (i.e. passed to `set_state`) before evaluation.
            **kwargs: Keyword arguments forwarded to `closure`.

        Returns:
            A 2-tuple `(value, grads)`: the closure's value converted with
            `as_array`, and a 1-dimensional float64 ndarray holding the flattened
            gradients of `parameters` in dictionary order. Slots belonging to
            parameters whose gradient is `None` hold `fill_value`. When
            `persistent=True`, `grads` is a buffer that is reused and
            overwritten by subsequent calls.
        """
        if state is not None:
            self.state = state

        try:
            value_tensor, grad_tensors = self.closure(**kwargs)
            value = self.as_array(value_tensor)
            grads = self._get_gradient_ndarray(fill_value=self.fill_value)
            index = 0
            for param, grad in zip(self.parameters.values(), grad_tensors):
                size = param.numel()
                if grad is not None:
                    grads[index : index + size] = self.as_array(grad.view(-1))
                # Advance even when `grad` is None so later slots stay aligned.
                index += size
        except RuntimeError as e:
            # Delegated to the shared helper; presumably it either supplies
            # substitute values or re-raises -- confirm against its definition.
            value, grads = _handle_numerical_errors(e, x=self.state, dtype=np_float64)

        return value, grads

    @property
    def state(self) -> ndarray:
        r"""The closure's current state, as returned by `get_state`."""
        return self._get_state()

    @state.setter
    def state(self, state: ndarray) -> None:
        self._set_state(state)

    def _get_gradient_ndarray(self, fill_value: Optional[float] = None) -> ndarray:
        r"""Returns a 1-dimensional float64 ndarray sized to hold all gradients.

        Args:
            fill_value: Value used to initialize a newly allocated array. For an
                existing persistent buffer, the buffer is refilled only when
                `fill_value` is not `None`.

        Returns:
            The persistent buffer when `persistent=True` (allocated on first
            use), otherwise a freshly allocated array.
        """
        if self.persistent and self._gradient_ndarray is not None:
            if fill_value is not None:
                self._gradient_ndarray.fill(fill_value)
            return self._gradient_ndarray

        size = sum(param.numel() for param in self.parameters.values())
        array = (
            np_zeros(size, dtype=np_float64)
            if fill_value is None or fill_value == 0.0
            else np_full(size, fill_value, dtype=np_float64)
        )
        if self.persistent:
            self._gradient_ndarray = array

        return array