EvoX中的自定义算法和问题#

在本章节中,我们将介绍如何在EvoX中实现您自己的算法和问题。

算法和问题的布局#

在大多数传统的进化计算(EC)库中,算法通常在内部调用目标函数,其布局如下:

Algorithm
|
+--Problem

但在 EvoX 中,我们使用了扁平化布局:

Algorithm.step -- Problem.evaluate

这种布局使得算法和问题更加通用:一个算法可以优化不同的问题,而一个问题也可以适用于许多算法。

Algorithm 类#

Algorithm 类继承自 ModuleBase

总共有5个方法(其中2个方法是可选的)需要我们实现:

方法

签名

使用情况

__init__

(self, ...)

初始化算法实例,例如,种群大小(在迭代过程中保持不变)、超参数(只能通过HPO问题包装器设置或在此初始化)、和/或可变张量(可以动态修改)。

setup(可选)

(self, ...) -> self

初始化算法的可变子模块。请参阅ModuleBase。通常,不需要重写此方法。

步骤

`(self)

执行算法的正常优化迭代步骤。

init_step(可选)

`(self)

执行算法优化的第一步。如果此方法未被覆盖,将调用 step 方法。

备注

静态初始化仍然可以在__init__中编写,而可变子模块的初始化则不能。因此,如果重写的setup方法首先调用ModuleBasesetup(),则可以多次调用setup进行重复初始化。

如果在ModuleBase中的这种setup方法不适合你的算法,你可以在创建自己的算法类时重写setup方法。

问题类#

Problem 类也继承自 ModuleBase

但是,Problem 类非常简单。除了 __init__ 方法,唯一必须的方法是 evaluate 方法。

方法

签名

使用情况

__init__

(self, ...)

为问题初始化设置。

评估

`(self, pop: torch.Tensor) -> torch.Tensor

评估给定种群的适应度。

然而,在重写的方法中,evaluatepop 参数的类型可以更改为其他与 JIT 兼容的类型。

样例#

这里我们给出一个实现一个解决Sphere问题的PSO算法的例子

示例的伪代码#

以下是一段伪代码:

Set hyper-parameters

Generate the initial population
Do
    Compute fitness
    
    Update the local best fitness and the global best fitness
    Update the velocity
    Update the population
    
Until stopping criterion

以下是算法和问题中每个部分在EvoX中的对应关系。

Set hyper-parameters # Algorithm.__init__

Generate the initial population # Algorithm.setup
Do
    # Problem.evaluate (not part of the algorithm)
    Compute fitness
    
    # Algorithm.step
    Update the local best fitness and the global best fitness
    Update the velocity
    Update the population

Until stopping criterion

算法示例:PSO 算法#

粒子群优化(PSO)是一种基于种群的元启发式算法,灵感来自鸟类和鱼类的社会行为。它被广泛用于解决连续和离散优化问题。

以下是EvoX中PSO算法的实现示例:

import torch
from typing import List

from evox.utils import clamp
from evox.core import Parameter, Mutable, Algorithm, jit_class

@jit_class
class PSO(Algorithm):
    #Initialize the PSO algorithm with the given parameters.
    def __init__(
        self,
        pop_size: int,
        lb: torch.Tensor,
        ub: torch.Tensor,
        w: float = 0.6,
        phi_p: float = 2.5,
        phi_g: float = 0.8,
        device: torch.device | None = None,
    ):
        super().__init__()
        assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
        self.pop_size = pop_size
        self.dim = lb.shape[0]
        # Here, Parameter is used to indicate that these values are hyper-parameters
        # so that they can be correctly traced and vector-mapped
        self.w = Parameter(w, device=device)
        self.phi_p = Parameter(phi_p, device=device)
        self.phi_g = Parameter(phi_g, device=device)
        lb = lb[None, :].to(device=device)
        ub = ub[None, :].to(device=device)
        length = ub - lb
        population = torch.rand(self.pop_size, self.dim, device=device)
        population = length * population + lb
        velocity = torch.rand(self.pop_size, self.dim, device=device)
        velocity = 2 * length * velocity - length
        self.lb = lb
        self.ub = ub
        # Mutable parameters
        self.population = Mutable(population)
        self.velocity = Mutable(velocity)
        self.local_best_location = Mutable(population)
        self.local_best_fitness = Mutable(torch.empty(self.pop_size, device=device).fill_(torch.inf))
        self.global_best_location = Mutable(population[0])
        self.global_best_fitness = Mutable(torch.tensor(torch.inf, device=device))

    def step(self):
        # Compute fitness
        fitness = self.evaluate(self.population)
        
        # Update the local best fitness and the global best fitness
        compare = self.local_best_fitness - fitness
        self.local_best_location = torch.where(
            compare[:, None] > 0, self.population, self.local_best_location
        )
        self.local_best_fitness = self.local_best_fitness - torch.relu(compare)
        self.global_best_location, self.global_best_fitness = self._min_by(
            [self.global_best_location.unsqueeze(0), self.population],
            [self.global_best_fitness.unsqueeze(0), fitness],
        )
        
        # Update the velocity
        rg = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        rp = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        velocity = (
            self.w * self.velocity
            + self.phi_p * rp * (self.local_best_location - self.population)
            + self.phi_g * rg * (self.global_best_location - self.population)
        )
        
        # Update the population
        population = self.population + velocity
        self.population = clamp(population, self.lb, self.ub)
        self.velocity = clamp(velocity, self.lb, self.ub)
        
    def _min_by(self, values: List[torch.Tensor],keys: List[torch.Tensor],):
        # Find the value with the minimum key
        values = torch.cat(values, dim=0)
        keys = torch.cat(keys, dim=0)
        min_index = torch.argmin(keys)
        return values[min_index], keys[min_index]

问题示例:Sphere 问题#

Sphere问题是一个简单但基本的基准优化问题,用于测试优化算法。

The Sphere function is defined as:

\[::\]

以下是EvoX中Sphere问题的实现示例:

import torch

from evox.core import Problem

class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, pop: torch.Tensor):
        return (pop**2).sum(-1)

现在,您可以启动一个工作流并运行它。