部署具有自定义算法的HPO

部署具有自定义算法的HPO#

在本章中,我们将专注于使用自定义算法部署HPO,强调细节而不是整体工作流。上一章Efficient HPO with EvoX中提供了HPO部署的简要介绍,强烈建议事先阅读。

使算法可并行化#

由于我们需要将内部算法转化为问题,确保内部算法是可并行化的至关重要。因此,可能需要对算法进行一些修改。

为了确保函数是 JIT 可编译的,它必须满足 JIT components 中列出的条件。除了这些要求外,算法还必须满足以下两个约束:

  1. 算法不应有对其自身属性进行就地操作的方法。

class ExampleAlgorithm(Algorithm):
    def __init__(self,...): 
        self.pop = torch.rand(10,10) #attribute of the algorithm itself
        pass

    def step_in_place(self): # method with in-place operations
        self.pop.copy_(pop)
        pass

    def step_out_of_place(self): # method without in-place operations
        self.pop = pop
        pass
  1. 代码逻辑不依赖于 python 控制流。

class ExampleAlgorithm(Algorithm):
    def __init__(self,...): 
        self.pop = rand(10,10) #attribute of the algotirhm itself
        pass

    def plus(self, y):
        self.pop += y
        pass

    def minus(self, y):
        self.pop -= y
        pass      

    def step_with_python_control_flow(self, y): # function with python control flow
        x = rand()
        if x>0.5:
            self.plus(y)
        else:
            self.minus(y)
        pass

    def step_without_python_control_flow(self, y): # function without python control flow
        x = rand()
        cond = x > 0.5
        _if_else_ = TracingCond(self.plus, self.minus)
        _if_else_.cond(cond,y)
        self.pop = pop
        pass

在 EvoX 中,我们可以通过 @trace_impl 装饰器轻松地使算法并行化。

此装饰器的参数是一个不可并行化的函数,装饰后的函数是对原始函数的重写。关于@trace_impl的详细介绍可以在JIT Components中找到。

在这种机制下,我们可以保留原始函数以便在 HPO 任务之外使用,同时在 HPO 任务中实现高效计算。此外,这种修改非常方便。

利用 HPOMonitor#

在HPO任务中,我们应该使用HPOMonitor来跟踪每个内部算法的指标。与标准的monitor相比,HPOMonitor仅增加了一个方法,tell_fitness。此添加旨在提供更大的灵活性来评估指标,因为HPO任务通常涉及多维和复杂的指标。

用户只需创建一个HPOMonitor的子类,并重写tell_fitness方法以定义自定义评估指标。

我们还提供了一个简单的HPOFitnessMonitor,支持计算多目标问题的“IGD”和“HV”指标,以及单目标问题的最小值。

一个简单的示例#

在这里,我们将演示如何使用 EvoX 进行 HPO 的简单示例。我们将使用 PSO 算法来搜索基本算法的最佳超参数,以解决球体问题。

首先,让我们导入必要的模块。

import torch

from evox.algorithms.pso_variants.pso import PSO
from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class, trace_impl
from evox.problems.hpo_wrapper import HPOFitnessMonitor, HPOProblemWrapper
from evox.utils import TracingCond
from evox.workflows import EvalMonitor, StdWorkflow

接下来,我们定义一个简单的球体问题。请注意,这与常见的problems没有区别。

@jit_class
class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, x: torch.Tensor):
        return (x * x).sum(-1)

接下来,我们定义算法。原始的step函数是不可并行化的,因此我们使用@trace_impl装饰器重写它以实现并行化。具体来说,我们修改了就地操作并调整了Python控制流。

@jit_class
class ExampleAlgorithm(Algorithm):
    def __init__(self, pop_size: int, lb: torch.Tensor, ub: torch.Tensor):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}"
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}"
        self.pop_size = pop_size
        self.hp = Parameter([1.0, 2.0, 3.0, 4.0])  # the hyperparameters to be optimized
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(self.pop_size, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(self.pop_size, dtype=lb.dtype, device=lb.device))

    def strategy_1(self, pop):  # one update strategy
        pop = pop * (self.hp[0] + self.hp[1])
        self.pop = pop

    def strategy_2(self, pop):  #  the other update strategy
        pop = pop * (self.hp[2] + self.hp[3])
        self.pop = pop

    def step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)  # simply random sampling
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        control_number = torch.rand()
        if control_number < 0.5:  # conditional control
            pop = self.strategy_1(pop)
        else:
            pop = self.strategy_2(pop)
        self.pop.copy_(pop)  # in-place update
        self.fit.copy_(self.evaluate(pop))

    # (using class methods for control flow)
    @trace_impl(step)  # rewrite the step function to support vmap
    def trace_step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        control_number = torch.rand()
        cond = control_number < 0.5
        # Deal with the conditional control flow equivalent in tracing
        branches = (self.strategy_1, self.strategy_2)
        state, names = self.prepare_control_flow(*branches)
        _if_else_ = TracingCond(*branches)
        state = _if_else_.cond(state, cond, pop)
        self.after_control_flow(state, *names)
        # Evaluate
        self.fit = self.evaluate(pop)

要处理 Python 控制流,我们使用 TracingCondTracingWhileTracingSwitch。由于在追踪模式中,方法外的变量可能会被错误地解释为静态变量,我们需要使用状态来跟踪它们。关于 TracingCondTracingWhileTracingSwitch 的详细介绍可以在 JIT Components 中找到。下面,我们提供了 trace_step 方法的两个等效实现。

# Equivalent to the following code (Local function style)

    @trace_impl(step)  # rewrite the step function to support vmap
    def trace_step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        control_number = torch.rand()
        cond = control_number < 0.5
        # Deal with the conditional control flow equivalent in tracing
        branches = (lambda: pop * self.hp[1], lambda: pop * self.hp[2])
        state, names = self.prepare_control_flow(*branches)
        _if_else_ = TracingCond(*branches, stateful_functions=True)
        state, pop = _if_else_.cond(state, cond)
        self.after_control_flow(state, *names)
        # Evaluate
        self.pop = pop
        self.fit = self.evaluate(pop)


# Equivalent to the following code (Pure function style)

    @trace_impl(step)  # rewrite the step function to support vmap
    def trace_step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        control_number = torch.rand()
        cond = control_number < 0.5
        # Deal with the conditional control flow equivalent in tracing
        branches = (lambda p, hp: p * hp[1], lambda p, hp: p * hp[2])
        _if_else_ = TracingCond(*branches, stateful_functions=False) # defaults to False for no member function
        pop = _if_else_.cond(cond, pop, self.hp)
        # Evaluate
        self.pop = pop
        self.fit = self.evaluate(pop)

接下来,我们可以使用 StdWorkflow 来包装 problemalgorithmmonitor。然后我们使用 HPOProblemWrapperStdWorkflow 转换为 HPO 问题。

torch.set_default_device("cuda" if torch.cuda.is_available() else "cpu")
inner_algo = ExampleAlgorithm(10, -10 * torch.ones(8), 10 * torch.ones(8))
inner_prob = Sphere()
inner_monitor = HPOFitnessMonitor()
inner_monitor.setup()
inner_workflow = StdWorkflow()
inner_workflow.setup(inner_algo, inner_prob, monitor=inner_monitor)
# Transform the inner workflow to an HPO problem
hpo_prob = HPOProblemWrapper(iterations=9, num_instances=7, workflow=inner_workflow, copy_init_state=True)

我们可以测试 HPOProblemWrapper 是否正确识别了我们定义的超参数。由于我们没有对7个实例的超参数进行修改,它们在所有实例中应该是相同的。

params = hpo_prob.get_init_params()
print("init params:\n", params)
init params:
 {'self.algorithm.hp': Parameter containing:
tensor([[1., 2., 3., 4.],
        [1., 2., 3., 4.],
        [1., 2., 3., 4.],
        [1., 2., 3., 4.],
        [1., 2., 3., 4.],
        [1., 2., 3., 4.],
        [1., 2., 3., 4.]], device='cuda:0')}

我们也可以指定自己的一组超参数值。请注意,超参数集的数量必须与HPOProblemWrapper中的实例数量相匹配。自定义超参数应作为字典提供,其值被包装在Parameter中。

params = hpo_prob.get_init_params()
# since we have 7 instances, we need to pass 7 sets of hyperparameters
params["self.algorithm.hp"] = torch.nn.Parameter(torch.rand(7, 4), requires_grad=False)
result = hpo_prob.evaluate(params)
print("params:\n", params, "\n")
print("result:\n", result)
params:
 {'self.algorithm.hp': Parameter containing:
tensor([[0.8108, 0.7703, 0.8577, 0.0708],
        [0.3465, 0.7551, 0.0136, 0.5634],
        [0.9978, 0.8935, 0.7606, 0.9789],
        [0.9837, 0.4787, 0.5919, 0.2196],
        [0.9336, 0.8979, 0.8039, 0.0677],
        [0.7770, 0.4149, 0.8965, 0.6570],
        [0.1422, 0.5341, 0.6108, 0.5978]], device='cuda:0')} 

result:
 tensor([77.0704, 15.8463, 21.6154, 40.8018, 43.6397, 55.0446,  2.4755],
       device='cuda:0')

现在,我们使用PSO算法来优化ExampleAlgorithm的超参数。请注意,PSO的种群大小必须与实例数量匹配,否则可能会发生意外错误。在这种情况下,我们需要在外部工作流中转换解决方案,因为HPOProblemWrapper需要一个字典作为输入。

class solution_transform(torch.nn.Module):
    def forward(self, x: torch.Tensor):
        return {"self.algorithm.hp": x}


outer_algo = PSO(7, -3 * torch.ones(4), 3 * torch.ones(4))
monitor = EvalMonitor(full_sol_history=False)
outer_workflow = StdWorkflow()
outer_workflow.setup(outer_algo, hpo_prob, monitor=monitor, solution_transform=solution_transform())
outer_workflow.init_step()
for _ in range(20):
    outer_workflow.step()
monitor = outer_workflow.get_submodule("monitor")
print("params:\n", monitor.topk_solutions, "\n")
print("result:\n", monitor.topk_fitness)
params:
 tensor([[0.0031, 0.4910, 1.8519, 1.2221]], device='cuda:0') 

result:
 tensor([0.0012], device='cuda:0')