Deploy HPO with Custom Algorithms#
In this chapter, we focus on deploying HPO with custom algorithms, emphasizing the details rather than the overall workflow. A brief introduction to HPO deployment is given in the previous chapter, Efficient HPO with EvoX, and reading it first is highly recommended.
Making Algorithms Parallelizable#
Since we need to transform the inner algorithm into a problem, it is crucial that the inner algorithm be parallelizable. Therefore, some modifications to the algorithm may be necessary.
To ensure the function is JIT-compilable, it must meet the conditions outlined in JIT Components. In addition to those requirements, the algorithm must satisfy the following two constraints:
1. The algorithm should have no methods that perform in-place operations on the attributes of the algorithm itself. For example:
class ExampleAlgorithm(Algorithm):
    def __init__(self):
        self.pop = torch.rand(10, 10)  # attribute of the algorithm itself

    def step_in_place(self, pop):  # method with in-place operations
        self.pop.copy_(pop)

    def step_out_of_place(self, pop):  # method without in-place operations
        self.pop = pop
2. The code logic should not rely on Python control flow. For example:
class ExampleAlgorithm(Algorithm):
    def __init__(self):
        self.pop = torch.rand(10, 10)  # attribute of the algorithm itself

    def plus(self, y):
        self.pop = self.pop + y

    def minus(self, y):
        self.pop = self.pop - y

    def step_with_python_control_flow(self, y):  # method with Python control flow
        x = torch.rand(())
        if x > 0.5:
            self.plus(y)
        else:
            self.minus(y)

    def step_without_python_control_flow(self, y):  # method without Python control flow
        x = torch.rand(())
        cond = x > 0.5
        _if_else_ = TracingCond(self.plus, self.minus)
        _if_else_.cond(cond, y)
In EvoX, we can easily make an algorithm parallelizable with the @trace_impl decorator. The decorator takes the non-parallelizable function as its parameter, and the decorated function is a rewrite of that original function. A detailed introduction to @trace_impl can be found in JIT Components.

Under this mechanism, we can retain the original function for use outside HPO tasks while enabling efficient computation within HPO tasks, and the modification is very convenient.
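To make this concrete, here is a minimal sketch of the pattern (the class name, its attributes, and its update rule are invented purely for illustration; the complete runnable example appears later in this chapter). The original step keeps its natural Python if/else, while the traced rewrite replaces it with a TracingCond:

import torch
from evox.core import Algorithm, Mutable, jit_class, trace_impl
from evox.utils import TracingCond

@jit_class
class SketchAlgorithm(Algorithm):
    def __init__(self, y: torch.Tensor):
        super().__init__()
        self.y = y
        self.pop = Mutable(torch.rand(10, 10))

    def step(self):  # original version, used outside HPO tasks
        if torch.rand(()) > 0.5:
            self.pop = self.pop + self.y
        else:
            self.pop = self.pop - self.y

    @trace_impl(step)  # parallelizable rewrite, used inside HPO tasks
    def trace_step(self):
        cond = torch.rand(()) > 0.5
        # Replace the Python if/else with a traceable conditional;
        # both branches are pure functions taking explicit arguments.
        _if_else_ = TracingCond(lambda p, y: p + y, lambda p, y: p - y)
        self.pop = _if_else_.cond(cond, self.pop, self.y)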
Utilizing the HPOMonitor#
In the HPO task, we should use the HPOMonitor to track the metrics of each inner algorithm. Compared to the standard monitor, the HPOMonitor adds only one method, tell_fitness. This addition is designed to offer greater flexibility in evaluating metrics, as HPO tasks often involve multi-dimensional and complex metrics.

Users only need to create a subclass of HPOMonitor and override the tell_fitness method to define custom evaluation metrics.

We also provide a simple HPOFitnessMonitor, which supports calculating the ‘IGD’ and ‘HV’ metrics for multi-objective problems and the minimum value for single-objective problems.
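As an illustration, a custom monitor might look roughly like the sketch below. The class name BestSoFarMonitor is hypothetical, and the sketch assumes that HPOMonitor lives in evox.problems.hpo_wrapper alongside HPOFitnessMonitor, that the base monitor exposes a pre_tell hook receiving each generation's fitness, and that tell_fitness takes no arguments and returns a scalar tensor; please check the HPOMonitor source for the exact interface.

import torch
from evox.core import Mutable
from evox.problems.hpo_wrapper import HPOMonitor

class BestSoFarMonitor(HPOMonitor):
    """Hypothetical monitor: scores an inner run by the best fitness seen so far."""

    def __init__(self):
        super().__init__()
        self.best = Mutable(torch.tensor(torch.inf))

    def pre_tell(self, fitness: torch.Tensor):
        # Assumed hook: called with the fitness of the current generation
        # (a single-objective inner problem is assumed here).
        self.best = torch.minimum(self.best, fitness.min())

    def tell_fitness(self) -> torch.Tensor:
        # Assumed signature: returns the scalar metric collected by the HPO problem wrapper.
        return self.best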
A Simple Example#
Here, we’ll demonstrate a simple example of how to use HPO with EvoX. We will use the PSO algorithm to search for the optimal hyper-parameters of a basic algorithm for solving the Sphere problem.
First, let’s import the necessary modules.
import torch
from evox.algorithms.pso_variants.pso import PSO
from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class, trace_impl
from evox.problems.hpo_wrapper import HPOFitnessMonitor, HPOProblemWrapper
from evox.utils import TracingCond
from evox.workflows import EvalMonitor, StdWorkflow
Next, we define a simple Sphere problem. Note that it is no different from ordinary problems.
@jit_class
class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, x: torch.Tensor):
        return (x * x).sum(-1)
Next, we define the algorithm. The original step function is non-parallelizable, so we rewrite it with the @trace_impl decorator to make it parallelizable. Specifically, we replace the in-place operations and rewrite the Python control flow.
@jit_class
class ExampleAlgorithm(Algorithm):
    def __init__(self, pop_size: int, lb: torch.Tensor, ub: torch.Tensor):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}"
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.shape} and {ub.shape}"
        self.pop_size = pop_size
        self.hp = Parameter([1.0, 2.0, 3.0, 4.0])  # the hyper-parameters to be optimized
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(self.pop_size, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(self.pop_size, dtype=lb.dtype, device=lb.device))

    def strategy_1(self, pop):  # one update strategy
        pop = pop * (self.hp[0] + self.hp[1])
        self.pop = pop

    def strategy_2(self, pop):  # the other update strategy
        pop = pop * (self.hp[2] + self.hp[3])
        self.pop = pop

    def step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)  # simple random sampling
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        control_number = torch.rand()
        if control_number < 0.5:  # conditional control
            self.strategy_1(pop)  # updates self.pop
        else:
            self.strategy_2(pop)
        self.fit.copy_(self.evaluate(self.pop))  # in-place update

    @trace_impl(step)  # rewrite the step function to support vmap
    def trace_step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        control_number = torch.rand()
        cond = control_number < 0.5
        # Deal with the conditional control flow equivalent in tracing
        # (using class methods as the control-flow branches)
        branches = (self.strategy_1, self.strategy_2)
        state, names = self.prepare_control_flow(*branches)
        _if_else_ = TracingCond(*branches)
        state = _if_else_.cond(state, cond, pop)
        self.after_control_flow(state, *names)
        # Evaluate
        self.fit = self.evaluate(pop)
To handle the Python control flow, we use TracingCond, TracingWhile and TracingSwitch. Since, in tracing mode, variables outside the method may be incorrectly interpreted as static variables, we need to use the state to track them. A detailed introduction to TracingCond, TracingWhile and TracingSwitch can be found in JIT Components. Below, we provide two equivalent implementations of the trace_step method.
# Equivalent to the following code (Local function style)
@trace_impl(step)  # rewrite the step function to support vmap
def trace_step(self):
    pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
    pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
    pop = pop * self.hp[0]
    control_number = torch.rand()
    cond = control_number < 0.5
    # Deal with the conditional control flow equivalent in tracing
    branches = (lambda: pop * self.hp[1], lambda: pop * self.hp[2])
    state, names = self.prepare_control_flow(*branches)
    _if_else_ = TracingCond(*branches, stateful_functions=True)
    state, pop = _if_else_.cond(state, cond)
    self.after_control_flow(state, *names)
    # Evaluate
    self.pop = pop
    self.fit = self.evaluate(pop)


# Equivalent to the following code (Pure function style)
@trace_impl(step)  # rewrite the step function to support vmap
def trace_step(self):
    pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
    pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
    pop = pop * self.hp[0]
    control_number = torch.rand()
    cond = control_number < 0.5
    # Deal with the conditional control flow equivalent in tracing
    branches = (lambda p, hp: p * hp[1], lambda p, hp: p * hp[2])
    _if_else_ = TracingCond(*branches, stateful_functions=False)  # stateful_functions defaults to False for non-member functions
    pop = _if_else_.cond(cond, pop, self.hp)
    # Evaluate
    self.pop = pop
    self.fit = self.evaluate(pop)
Next, we can use the StdWorkflow to wrap the problem, algorithm and monitor. Then we use the HPOProblemWrapper to transform the StdWorkflow into an HPO problem.
torch.set_default_device("cuda" if torch.cuda.is_available() else "cpu")
inner_algo = ExampleAlgorithm(10, -10 * torch.ones(8), 10 * torch.ones(8))
inner_prob = Sphere()
inner_monitor = HPOFitnessMonitor()
inner_monitor.setup()
inner_workflow = StdWorkflow()
inner_workflow.setup(inner_algo, inner_prob, monitor=inner_monitor)
# Transform the inner workflow to an HPO problem
hpo_prob = HPOProblemWrapper(iterations=9, num_instances=7, workflow=inner_workflow, copy_init_state=True)
We can test whether the HPOProblemWrapper correctly recognizes the hyper-parameters we defined. Since we have made no modifications to the hyper-parameters of the 7 instances, they should be identical across all instances.
params = hpo_prob.get_init_params()
print("init params:\n", params)
init params:
{'self.algorithm.hp': Parameter containing:
tensor([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]], device='cuda:0')}
We can also specify our own set of hyper-parameter values. Note that the number of hyper-parameter sets must match the number of instances in the HPOProblemWrapper. The custom hyper-parameters should be provided as a dictionary whose values are wrapped in Parameter.
params = hpo_prob.get_init_params()
# since we have 7 instances, we need to pass 7 sets of hyperparameters
params["self.algorithm.hp"] = torch.nn.Parameter(torch.rand(7, 4), requires_grad=False)
result = hpo_prob.evaluate(params)
print("params:\n", params, "\n")
print("result:\n", result)
params:
{'self.algorithm.hp': Parameter containing:
tensor([[0.8108, 0.7703, 0.8577, 0.0708],
[0.3465, 0.7551, 0.0136, 0.5634],
[0.9978, 0.8935, 0.7606, 0.9789],
[0.9837, 0.4787, 0.5919, 0.2196],
[0.9336, 0.8979, 0.8039, 0.0677],
[0.7770, 0.4149, 0.8965, 0.6570],
[0.1422, 0.5341, 0.6108, 0.5978]], device='cuda:0')}
result:
tensor([77.0704, 15.8463, 21.6154, 40.8018, 43.6397, 55.0446, 2.4755],
device='cuda:0')
Now, we use the PSO algorithm to optimize the hyper-parameters of ExampleAlgorithm. Note that the population size of PSO must match the number of instances; otherwise, unexpected errors may occur. In this case, we also need to transform the solution in the outer workflow, since the HPOProblemWrapper expects a dictionary as input.
class solution_transform(torch.nn.Module):
    def forward(self, x: torch.Tensor):
        return {"self.algorithm.hp": x}


outer_algo = PSO(7, -3 * torch.ones(4), 3 * torch.ones(4))
monitor = EvalMonitor(full_sol_history=False)
outer_workflow = StdWorkflow()
outer_workflow.setup(outer_algo, hpo_prob, monitor=monitor, solution_transform=solution_transform())
outer_workflow.init_step()
for _ in range(20):
    outer_workflow.step()

monitor = outer_workflow.get_submodule("monitor")
print("params:\n", monitor.topk_solutions, "\n")
print("result:\n", monitor.topk_fitness)
params:
tensor([[0.0031, 0.4910, 1.8519, 1.2221]], device='cuda:0')
result:
tensor([0.0012], device='cuda:0')