Custom Algorithm and Problem#
In this notebook, we will show how to use the Algorithm
and Problem
to create a custom algorithm and problem. Here we will give an example of implementing a PSO algorithm that solves the Sphere problem.
from typing import List
import torch
from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class
from evox.utils import clamp
from evox.workflows import EvalMonitor, StdWorkflow
Algorithm example: PSO algorithm#
Particle Swarm Optimization (PSO) is a population-based metaheuristic algorithm inspired by the social behavior of birds and fish. It is widely used for solving continuous and discrete optimization problems.
Here is an implementation example of PSO algorithm in EvoX:
@jit_class
class PSO(Algorithm):
# Initialize the PSO algorithm with the given parameters.
def __init__(
self,
pop_size: int,
lb: torch.Tensor,
ub: torch.Tensor,
w: float = 0.6,
phi_p: float = 2.5,
phi_g: float = 0.8,
device: torch.device | None = None,
):
super().__init__()
assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
self.pop_size = pop_size
self.dim = lb.shape[0]
# Here, Parameter is used to indicate that these values are hyper-parameters
# so that they can be correctly traced and vector-mapped
self.w = Parameter(w, device=device)
self.phi_p = Parameter(phi_p, device=device)
self.phi_g = Parameter(phi_g, device=device)
lb = lb[None, :].to(device=device)
ub = ub[None, :].to(device=device)
length = ub - lb
population = torch.rand(self.pop_size, self.dim, device=device)
population = length * population + lb
velocity = torch.rand(self.pop_size, self.dim, device=device)
velocity = 2 * length * velocity - length
self.lb = lb
self.ub = ub
# Mutable parameters
self.population = Mutable(population)
self.velocity = Mutable(velocity)
self.local_best_location = Mutable(population)
self.local_best_fitness = Mutable(torch.empty(self.pop_size, device=device).fill_(torch.inf))
self.global_best_location = Mutable(population[0])
self.global_best_fitness = Mutable(torch.tensor(torch.inf, device=device))
def step(self):
# Compute fitness
fitness = self.evaluate(self.population)
# Update the local best fitness and the global best fitness
compare = self.local_best_fitness - fitness
self.local_best_location = torch.where(compare[:, None] > 0, self.population, self.local_best_location)
self.local_best_fitness = self.local_best_fitness - torch.relu(compare)
self.global_best_location, self.global_best_fitness = self._min_by(
[self.global_best_location.unsqueeze(0), self.population],
[self.global_best_fitness.unsqueeze(0), fitness],
)
# Update the velocity
rg = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
rp = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
velocity = (
self.w * self.velocity
+ self.phi_p * rp * (self.local_best_location - self.population)
+ self.phi_g * rg * (self.global_best_location - self.population)
)
# Update the population
population = self.population + velocity
self.population = clamp(population, self.lb, self.ub)
self.velocity = clamp(velocity, self.lb, self.ub)
def _min_by(
self,
values: List[torch.Tensor],
keys: List[torch.Tensor],
):
# Find the value with the minimum key
values = torch.cat(values, dim=0)
keys = torch.cat(keys, dim=0)
min_index = torch.argmin(keys)
return values[min_index], keys[min_index]
Problem example: Sphere problem#
The Sphere problem is a simple, yet fundamental benchmark optimization problem used to test optimization algorithms.
The Sphere function is defined as:
\[
\min f(x)= \sum_{i=1}^{n} x_{i}^{2}
\]
Here is an implementation example of Sphere problem in EvoX:
class Sphere(Problem):
def __init__(self):
super().__init__()
def evaluate(self, pop: torch.Tensor):
return (pop**2).sum(-1)
Use the algorithm to solve the problem#
Initiate the algorithm, problem and monitor#
algorithm = PSO(
pop_size=100,
lb=torch.tensor([-10.0]),
ub=torch.tensor([10.0]),
w=0.6,
phi_p=2.5,
phi_g=0.8,
)
problem = Sphere()
monitor = EvalMonitor()
Initiate the workflow and run it#
workflow = StdWorkflow()
workflow.setup(algorithm=algorithm, problem=problem, monitor=monitor)
for _ in range(100):
workflow.step()
workflow.get_submodule("monitor").plot()