Deploy HPO with Custom Algorithms#
In this chapter, we focus on deploying HPO with custom algorithms, emphasizing the details rather than the overall workflow. A brief introduction to HPO deployment is given in the previous chapter, Efficient HPO with EvoX, and reading it first is strongly recommended.
Making the Algorithm Parallelizable#
Since we need to transform the inner algorithm into a problem, it is crucial to ensure that the inner algorithm is parallelizable, so some modifications to the algorithm may be required.
To make a function JIT-compilable, it must satisfy the conditions listed in JIT components. Beyond those requirements, the algorithm must also satisfy the following two constraints:
1. The algorithm must not have methods that perform in-place operations on its own attributes:
class ExampleAlgorithm(Algorithm):
    def __init__(self):
        super().__init__()
        self.pop = torch.rand(10, 10)  # attribute of the algorithm itself

    def step_in_place(self, pop):  # method with in-place operations
        self.pop.copy_(pop)

    def step_out_of_place(self, pop):  # method without in-place operations
        self.pop = pop
2. The code logic must not rely on Python control flow:
class ExampleAlgorithm(Algorithm):
    def __init__(self):
        super().__init__()
        self.pop = torch.rand(10, 10)  # attribute of the algorithm itself

    def plus(self, y):
        self.pop += y

    def minus(self, y):
        self.pop -= y

    def step_with_python_control_flow(self, y):  # method with Python control flow
        x = torch.rand(())
        if x > 0.5:
            self.plus(y)
        else:
            self.minus(y)

    def step_without_python_control_flow(self, y):  # method without Python control flow
        x = torch.rand(())
        cond = x > 0.5
        _if_else_ = TracingCond(self.plus, self.minus)
        _if_else_.cond(cond, y)
In EvoX, we can easily make an algorithm parallelizable with the @trace_impl decorator. The decorator's argument is the non-parallelizable function, and the decorated function is a rewrite of that original function. A detailed introduction to @trace_impl can be found in JIT Components.
With this mechanism, we keep the original function available for use outside HPO tasks while enabling efficient computation inside them, and the modification itself is very convenient.
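Schematically, the pattern looks like the sketch below. The class and its contents are placeholders chosen for illustration, not part of the EvoX API; the imports are the same ones used in the full example later in this chapter.

import torch
from evox.core import Algorithm, Mutable, jit_class, trace_impl


@jit_class
class MyAlgorithm(Algorithm):
    def __init__(self):
        super().__init__()
        self.pop = Mutable(torch.zeros(10, 4))

    def step(self):
        # Original implementation: free to use in-place updates (and Python control flow),
        # so it remains usable on its own, but it is not parallelizable.
        self.pop.copy_(torch.rand(10, 4))

    @trace_impl(step)  # register a traced rewrite of `step`
    def trace_step(self):
        # Rewritten implementation: out-of-place update only; this version is used
        # when the algorithm is traced/vmapped, e.g. inside an HPO task.
        self.pop = torch.rand(10, 4)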
Utilizing the HPOMonitor#
In HPO tasks, we should use an HPOMonitor to track the metrics of each inner algorithm. Compared with a standard monitor, the HPOMonitor adds only one extra method, tell_fitness. This addition provides greater flexibility in evaluating metrics, since HPO tasks often involve multi-dimensional and complex metrics.
Users only need to create a subclass of HPOMonitor and override the tell_fitness method to define a custom evaluation metric.
We also provide a simple HPOFitnessMonitor, which supports computing the IGD and HV metrics for multi-objective problems and the minimum value for single-objective problems.
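As an illustration only, the sketch below outlines what such a subclass could look like. It assumes, as HPOFitnessMonitor does, that the monitor receives each generation's fitness through the pre_tell hook and that tell_fitness takes no arguments and returns a single tensor per instance; check the HPOMonitor source of your EvoX version for the exact hook names and signatures.

import torch

from evox.core import Mutable, jit_class
from evox.problems.hpo_wrapper import HPOMonitor


@jit_class
class AverageBestMonitor(HPOMonitor):
    """Hypothetical monitor: scores a hyperparameter set by the average of the
    per-generation best fitness (lower is better)."""

    def __init__(self):
        super().__init__()
        self.sum_best = Mutable(torch.tensor(0.0))
        self.num_gens = Mutable(torch.tensor(0.0))

    def pre_tell(self, fitness: torch.Tensor):
        # assumed hook: called with the population fitness of each generation
        self.sum_best = self.sum_best + fitness.min()
        self.num_gens = self.num_gens + 1.0

    def tell_fitness(self) -> torch.Tensor:
        # the single metric reported to the outer HPO algorithm
        return self.sum_best / self.num_gens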
A Simple Example#
Here we demonstrate a simple example of using EvoX for HPO. We will use the PSO algorithm to search for the optimal hyperparameters of a basic algorithm that solves the Sphere problem.
First, let's import the necessary modules.
import torch
from evox.algorithms.pso_variants.pso import PSO
from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class, trace_impl
from evox.problems.hpo_wrapper import HPOFitnessMonitor, HPOProblemWrapper
from evox.utils import TracingCond
from evox.workflows import EvalMonitor, StdWorkflow
Next, we define a simple Sphere problem. Note that it is no different from ordinary problems.
@jit_class
class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, x: torch.Tensor):
        return (x * x).sum(-1)
Next, we define the algorithm. The original step function is not parallelizable, so we rewrite it with the @trace_impl decorator to make it parallelizable. Specifically, we replace the in-place operations and adapt the Python control flow.
@jit_class
class ExampleAlgorithm(Algorithm):
    def __init__(self, pop_size: int, lb: torch.Tensor, ub: torch.Tensor):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}"
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have the same shape, got {lb.shape} and {ub.shape}"
        self.pop_size = pop_size
        self.hp = Parameter([1.0, 2.0, 3.0, 4.0])  # the hyperparameters to be optimized
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(self.pop_size, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(self.pop_size, dtype=lb.dtype, device=lb.device))

    def strategy_1(self, pop):  # one update strategy
        pop = pop * (self.hp[0] + self.hp[1])
        self.pop = pop

    def strategy_2(self, pop):  # the other update strategy
        pop = pop * (self.hp[2] + self.hp[3])
        self.pop = pop

    def step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)  # simply random sampling
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        control_number = torch.rand()
        if control_number < 0.5:  # conditional control: each strategy updates self.pop
            self.strategy_1(pop)
        else:
            self.strategy_2(pop)
        self.fit.copy_(self.evaluate(self.pop))  # in-place update

    # The rewrite below uses class methods for the control flow
    @trace_impl(step)  # rewrite the step function to support vmap
    def trace_step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        control_number = torch.rand()
        cond = control_number < 0.5
        # Deal with the conditional control flow equivalent in tracing
        branches = (self.strategy_1, self.strategy_2)
        state, names = self.prepare_control_flow(*branches)
        _if_else_ = TracingCond(*branches)
        state = _if_else_.cond(state, cond, pop)
        self.after_control_flow(state, *names)
        # Evaluate
        self.fit = self.evaluate(self.pop)
To handle Python control flow, we use TracingCond, TracingWhile, and TracingSwitch. Since variables defined outside a method may be incorrectly interpreted as static variables in tracing mode, we need to use state to track them. A detailed introduction to TracingCond, TracingWhile, and TracingSwitch can be found in JIT Components. Below, we provide two equivalent implementations of the trace_step method.
# Equivalent to the following code (local function style)
@trace_impl(step)  # rewrite the step function to support vmap
def trace_step(self):
    pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
    pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
    pop = pop * self.hp[0]
    control_number = torch.rand()
    cond = control_number < 0.5
    # Deal with the conditional control flow equivalent in tracing
    branches = (lambda: pop * self.hp[1], lambda: pop * self.hp[2])
    state, names = self.prepare_control_flow(*branches)
    _if_else_ = TracingCond(*branches, stateful_functions=True)
    state, pop = _if_else_.cond(state, cond)
    self.after_control_flow(state, *names)
    # Evaluate
    self.pop = pop
    self.fit = self.evaluate(pop)


# Equivalent to the following code (pure function style)
@trace_impl(step)  # rewrite the step function to support vmap
def trace_step(self):
    pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
    pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
    pop = pop * self.hp[0]
    control_number = torch.rand()
    cond = control_number < 0.5
    # Deal with the conditional control flow equivalent in tracing
    branches = (lambda p, hp: p * hp[1], lambda p, hp: p * hp[2])
    _if_else_ = TracingCond(*branches, stateful_functions=False)  # stateful_functions defaults to False when no member function is used
    pop = _if_else_.cond(cond, pop, self.hp)
    # Evaluate
    self.pop = pop
    self.fit = self.evaluate(pop)
Next, we can use StdWorkflow to wrap the problem, algorithm, and monitor. Then we use HPOProblemWrapper to transform the StdWorkflow into an HPO problem.
torch.set_default_device("cuda" if torch.cuda.is_available() else "cpu")
inner_algo = ExampleAlgorithm(10, -10 * torch.ones(8), 10 * torch.ones(8))
inner_prob = Sphere()
inner_monitor = HPOFitnessMonitor()
inner_monitor.setup()
inner_workflow = StdWorkflow()
inner_workflow.setup(inner_algo, inner_prob, monitor=inner_monitor)
# Transform the inner workflow to an HPO problem
hpo_prob = HPOProblemWrapper(iterations=9, num_instances=7, workflow=inner_workflow, copy_init_state=True)
We can check whether the HPOProblemWrapper correctly recognizes the hyperparameters we defined. Since we have not modified the hyperparameters of the 7 instances, they should be identical across all instances.
params = hpo_prob.get_init_params()
print("init params:\n", params)
init params:
{'self.algorithm.hp': Parameter containing:
tensor([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]], device='cuda:0')}
We can also specify our own set of hyperparameter values. Note that the number of hyperparameter sets must match the number of instances in the HPOProblemWrapper. The custom hyperparameters should be provided as a dictionary whose values are wrapped in Parameter.
params = hpo_prob.get_init_params()
# since we have 7 instances, we need to pass 7 sets of hyperparameters
params["self.algorithm.hp"] = torch.nn.Parameter(torch.rand(7, 4), requires_grad=False)
result = hpo_prob.evaluate(params)
print("params:\n", params, "\n")
print("result:\n", result)
params:
{'self.algorithm.hp': Parameter containing:
tensor([[0.8108, 0.7703, 0.8577, 0.0708],
[0.3465, 0.7551, 0.0136, 0.5634],
[0.9978, 0.8935, 0.7606, 0.9789],
[0.9837, 0.4787, 0.5919, 0.2196],
[0.9336, 0.8979, 0.8039, 0.0677],
[0.7770, 0.4149, 0.8965, 0.6570],
[0.1422, 0.5341, 0.6108, 0.5978]], device='cuda:0')}
result:
tensor([77.0704, 15.8463, 21.6154, 40.8018, 43.6397, 55.0446, 2.4755],
device='cuda:0')
Now we use the PSO algorithm to optimize the hyperparameters of ExampleAlgorithm. Note that the population size of PSO must match the number of instances, otherwise unexpected errors may occur. In this case, we also need to transform the solutions in the outer workflow, since the HPOProblemWrapper expects a dictionary as input.
class solution_transform(torch.nn.Module):
    def forward(self, x: torch.Tensor):
        return {"self.algorithm.hp": x}
outer_algo = PSO(7, -3 * torch.ones(4), 3 * torch.ones(4))
monitor = EvalMonitor(full_sol_history=False)
outer_workflow = StdWorkflow()
outer_workflow.setup(outer_algo, hpo_prob, monitor=monitor, solution_transform=solution_transform())
outer_workflow.init_step()
for _ in range(20):
    outer_workflow.step()
monitor = outer_workflow.get_submodule("monitor")
print("params:\n", monitor.topk_solutions, "\n")
print("result:\n", monitor.topk_fitness)
params:
tensor([[0.0031, 0.4910, 1.8519, 1.2221]], device='cuda:0')
result:
tensor([0.0012], device='cuda:0')
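Once the outer loop finishes, the tuned hyperparameters can be written back into a fresh inner workflow and run on the Sphere problem directly. The snippet below is only a minimal sketch of that step, not part of the tutorial above; it assumes that copying new values into the hp Parameter in place is enough to apply them.

# Sketch: rerun the inner algorithm with the best hyperparameters found by PSO.
best_hp = monitor.topk_solutions[0]  # shape (4,), matching ExampleAlgorithm.hp

tuned_algo = ExampleAlgorithm(10, -10 * torch.ones(8), 10 * torch.ones(8))
with torch.no_grad():
    tuned_algo.hp.copy_(best_hp)  # assumption: an in-place copy is sufficient to apply the values

tuned_workflow = StdWorkflow()
tuned_workflow.setup(tuned_algo, Sphere(), monitor=EvalMonitor(full_sol_history=False))
tuned_workflow.init_step()
for _ in range(9):  # roughly the same budget as used inside HPOProblemWrapper (iterations=9)
    tuned_workflow.step()
print("tuned fitness:\n", tuned_workflow.get_submodule("monitor").topk_fitness)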