Custom Algorithm and Problem#

In this notebook, we will show how to use the Algorithm and Problem to create a custom algorithm and problem. Here we will give an example of implementing a PSO algorithm that solves the Sphere problem.

from typing import List

import torch

from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class
from evox.utils import clamp
from evox.workflows import EvalMonitor, StdWorkflow

Algorithm example: PSO algorithm#

Particle Swarm Optimization (PSO) is a population-based metaheuristic algorithm inspired by the social behavior of birds and fish. It is widely used for solving continuous and discrete optimization problems.

Here is an implementation example of PSO algorithm in EvoX:

@jit_class
class PSO(Algorithm):
    # Initialize the PSO algorithm with the given parameters.
    def __init__(
        self,
        pop_size: int,
        lb: torch.Tensor,
        ub: torch.Tensor,
        w: float = 0.6,
        phi_p: float = 2.5,
        phi_g: float = 0.8,
        device: torch.device | None = None,
    ):
        super().__init__()
        assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
        self.pop_size = pop_size
        self.dim = lb.shape[0]
        # Here, Parameter is used to indicate that these values are hyper-parameters
        # so that they can be correctly traced and vector-mapped
        self.w = Parameter(w, device=device)
        self.phi_p = Parameter(phi_p, device=device)
        self.phi_g = Parameter(phi_g, device=device)
        lb = lb[None, :].to(device=device)
        ub = ub[None, :].to(device=device)
        length = ub - lb
        population = torch.rand(self.pop_size, self.dim, device=device)
        population = length * population + lb
        velocity = torch.rand(self.pop_size, self.dim, device=device)
        velocity = 2 * length * velocity - length
        self.lb = lb
        self.ub = ub
        # Mutable parameters
        self.population = Mutable(population)
        self.velocity = Mutable(velocity)
        self.local_best_location = Mutable(population)
        self.local_best_fitness = Mutable(torch.empty(self.pop_size, device=device).fill_(torch.inf))
        self.global_best_location = Mutable(population[0])
        self.global_best_fitness = Mutable(torch.tensor(torch.inf, device=device))

    def step(self):
        # Compute fitness
        fitness = self.evaluate(self.population)

        # Update the local best fitness and the global best fitness
        compare = self.local_best_fitness - fitness
        self.local_best_location = torch.where(compare[:, None] > 0, self.population, self.local_best_location)
        self.local_best_fitness = self.local_best_fitness - torch.relu(compare)
        self.global_best_location, self.global_best_fitness = self._min_by(
            [self.global_best_location.unsqueeze(0), self.population],
            [self.global_best_fitness.unsqueeze(0), fitness],
        )

        # Update the velocity
        rg = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        rp = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        velocity = (
            self.w * self.velocity
            + self.phi_p * rp * (self.local_best_location - self.population)
            + self.phi_g * rg * (self.global_best_location - self.population)
        )

        # Update the population
        population = self.population + velocity
        self.population = clamp(population, self.lb, self.ub)
        self.velocity = clamp(velocity, self.lb, self.ub)

    def _min_by(
        self,
        values: List[torch.Tensor],
        keys: List[torch.Tensor],
    ):
        # Find the value with the minimum key
        values = torch.cat(values, dim=0)
        keys = torch.cat(keys, dim=0)
        min_index = torch.argmin(keys)
        return values[min_index], keys[min_index]

Problem example: Sphere problem#

The Sphere problem is a simple, yet fundamental benchmark optimization problem used to test optimization algorithms.

The Sphere function is defined as:

\[ \min f(x)= \sum_{i=1}^{n} x_{i}^{2} \]

Here is an implementation example of Sphere problem in EvoX:

class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, pop: torch.Tensor):
        return (pop**2).sum(-1)

Use the algorithm to solve the problem#

Initiate the algorithm, problem and monitor#

algorithm = PSO(
    pop_size=100,
    lb=torch.tensor([-10.0]),
    ub=torch.tensor([10.0]),
    w=0.6,
    phi_p=2.5,
    phi_g=0.8,
)
problem = Sphere()
monitor = EvalMonitor()

Initiate the workflow and run it#

workflow = StdWorkflow()
workflow.setup(algorithm=algorithm, problem=problem, monitor=monitor)

for _ in range(100):
    workflow.step()
workflow.get_submodule("monitor").plot()