Particle Swarm Optimization in Python
Jan 16, 2023
In this post, we'll implement particle swarm optimization (PSO) in Python. Here's a good definition of PSO from Wikipedia:
In computational science, particle swarm optimization (PSO) is a computational method that optimizes a problem by iteratively trying to improve a candidate solution with regard to a given measure of quality. It solves a problem by having a population of candidate solutions, here dubbed particles, and moving these particles around in the search-space according to simple mathematical formula over the particle's position and velocity. Each particle's movement is influenced by its local best known position, but is also guided toward the best known positions in the search-space, which are updated as better positions are found by other particles. This is expected to move the swarm toward the best solutions.
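Before diving into the code, here is the overall shape of the algorithm as a rough pseudocode sketch (not runnable; the concrete functions follow below):

# pseudocode outline of PSO (the real implementation follows)
# swarm = generate a population of particles from an initial guess
# repeat for a fixed number of iterations:
#     for each particle:
#         evaluate the cost function at the particle's position
#         update the particle's local best and the swarm's global best
#         update the velocity: inertia + pull toward local best + pull toward global best
#         move the particle by its velocity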
The swarm is initialized from a starting guess: each particle begins at the initial position, with a random velocity in each dimension.
import random

def generate_swarm(x_0, n_par):
    # dimensions (number of variables)
    dimensions = len(x_0)
    swarm = []
    # generate particles
    for _ in range(n_par):
        position = []
        # particle velocity
        velocity = []
        # best position; -1 means "not set yet"
        position_best = -1
        # particle error (cost); -1 means "not evaluated yet"
        error = -1
        # best error (cost)
        error_best = error
        # start at the initial guess with a random velocity in each dimension
        for i in range(dimensions):
            position.append(x_0[i])
            velocity.append(random.uniform(-1, 1))
        # append particle
        swarm.append({
            "dimensions": dimensions,
            "position": position,
            "position_best": position_best,
            "velocity": velocity,
            "error": error,
            "error_best": error_best
        })
    return swarm
This function generates a swarm of particles. It takes two arguments: x_0, the starting position of the swarm (initial guess), and n_par, the number of particles in the swarm. Each particle is a dictionary holding its position, velocity, and the best position and error it has seen so far.
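For example, assuming the function above is defined, generating a small swarm and inspecting one particle shows the structure (the velocity values below are illustrative, since they are drawn at random):

swarm = generate_swarm(x_0=[5, 5], n_par=3)
print(swarm[0]["position"])    # [5, 5] -- every particle starts at the initial guess
print(swarm[0]["velocity"])    # e.g. [0.42, -0.89] -- random in [-1, 1] per dimension
print(swarm[0]["error_best"])  # -1 -- no error has been evaluated yet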
The velocity of each particle is updated based on its current position, the local best position that the particle has encountered so far, and the global best position found by any particle in the swarm.
def update_velocity(velocity, position, position_best, global_pos):
    # random bias
    r_1 = random.random()
    r_2 = random.random()
    # pull toward the particle's own best and the swarm's best
    velocity_cognitive = c_1 * r_1 * (position_best - position)
    velocity_social = c_2 * r_2 * (global_pos - position)
    # combine inertia with the cognitive and social components
    velocity = weight * velocity + velocity_cognitive + velocity_social
    return velocity
The variables r_1 and r_2 introduce randomness into the movement, which helps prevent the swarm from getting stuck in local optima.
The configuration constants weight, c_1, and c_2 represent the inertia weight, the cognitive constant, and the social constant, which control the relative movement of particles through the solution space.
# constant inertia weight
weight = 0.5
# cognitive constant
c_1 = 1
# social constant
c_2 = 2
The weight constant controls how much of the particle's previous velocity carries over into the new one, c_1 controls the influence of the local best position on movement (a higher value makes the particle more likely to move toward its local best), and c_2 controls the influence of the global best position (a higher value makes it more likely to move toward the global best).
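To see how these constants interact, here is a hand-worked single velocity update in one dimension, with the random draws pinned at r_1 = r_2 = 0.5 (hypothetical values chosen for illustration):

# one velocity update worked by hand, with the random draws fixed at 0.5
weight, c_1, c_2 = 0.5, 1, 2
velocity, position = 1.0, 3.0
position_best, global_pos = 2.5, 2.0
velocity_cognitive = c_1 * 0.5 * (position_best - position)  # 0.5 * -0.5 = -0.25
velocity_social = c_2 * 0.5 * (global_pos - position)        # 1.0 * -1.0 = -1.0
new_velocity = weight * velocity + velocity_cognitive + velocity_social
print(new_velocity)  # 0.5 - 0.25 - 1.0 = -0.75

Both the cognitive and social terms pull the particle toward positions that have scored better, while the inertia term preserves part of its previous momentum.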
Nothing fancy here: the new position is obtained by adding the velocity to the current position.
def update_position(position, velocity):
    position = position + velocity
    return position
The local and global bests are updated whenever the current position yields a lower error (cost) than the previous best; the code uses -1 as a sentinel meaning "no best recorded yet".
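In isolation, that update condition looks like this (is_new_best is a hypothetical helper name for illustration, not part of the original code):

def is_new_best(error, error_best):
    # -1 is the sentinel for "no best recorded yet"
    return error < error_best or error_best == -1

assert is_new_best(2.0, -1)       # first evaluation always wins
assert not is_new_best(3.0, 2.0)  # worse than the recorded best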
The function below moves the swarm of particles through one iteration. It takes five arguments: f, the cost function to be minimized; swarm, the list of particles; bounds, which constrains the search space; and global_best and global_pos, the best error and position found so far across all particles.
def iterate_swarm(f, swarm, bounds=None, global_best=-1, global_pos=-1):
    # iterate particles and evaluate cost function
    for j in range(len(swarm)):
        dimensions = swarm[j]["dimensions"]
        position = swarm[j]["position"]
        error_best = swarm[j]["error_best"]
        # evaluate new error (cost)
        error = swarm[j]["error"] = f(position)
        # update local best position if current position gives better local error
        # (copy the list so later in-place updates don't overwrite the stored best)
        if error < error_best or error_best == -1:
            swarm[j]["position_best"] = list(position)
            swarm[j]["error_best"] = error
        position_best = swarm[j]["position_best"]
        velocity = swarm[j]["velocity"]
        # update global best if position of current particle gives best global error
        if error < global_best or global_best == -1:
            global_pos = list(position)
            global_best = float(error)
        # update particle velocity and position
        for i in range(dimensions):
            velocity[i] = update_velocity(
                velocity[i],
                position[i],
                position_best[i],
                global_pos[i]
            )
            position[i] = update_position(
                position[i],
                velocity[i]
            )
            # clamp position to the search space, if bounds are given
            if bounds:
                # max value for position
                if position[i] > bounds[i][1]:
                    position[i] = bounds[i][1]
                # min value for position
                if position[i] < bounds[i][0]:
                    position[i] = bounds[i][0]
    return swarm, round(global_best, 2), [round(pos, 2) for pos in global_pos]
This function iterates over each particle in the swarm, updating the particle's local best position and error whenever the current position beats its previous best, and updating the global best position and error whenever the particle's position yields a better result than the current global best.
The velocity and position are updated using the update_velocity and update_position functions defined earlier. If bounds are specified, each coordinate is clamped to stay within them. Finally, the function returns the updated swarm, the global best error, and the global best position.
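Putting the pieces together, the whole optimization could be wrapped in a small driver. This is just a convenience sketch; minimize and its defaults are my own naming, not part of the original code:

def minimize(f, x_0, n_par=15, bounds=None, iterations=50):
    # build the swarm, then run a fixed number of iterations
    swarm = generate_swarm(x_0=x_0, n_par=n_par)
    global_best, global_pos = -1, -1
    for _ in range(iterations):
        swarm, global_best, global_pos = iterate_swarm(
            f, swarm, bounds=bounds,
            global_best=global_best, global_pos=global_pos
        )
    return global_best, global_pos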
In the examples below, the maximum number of iterations is set to 50, which should be enough in our case, and the random seed is set to 1234 so that the same configuration gives a consistent result.
MAX_ITERATIONS = 50
random.seed(1234)
In this example, the cost function is x[0] ** 5 - 3 * x[0] ** 4 + 5, where the x-range is [0, 4] (note that x = [x_1, x_2, ..., x_n] represents x, y, z, and so on).
# minimize x^5 - 3x^4 + 5 over [0, 4]
def f(x): return x[0] ** 5 - 3 * x[0] ** 4 + 5

# reset global
global_best = -1
global_pos = -1
# initial swarm
swarm = generate_swarm(x_0=[5], n_par=15)
# iterate swarm
for i in range(MAX_ITERATIONS):
    swarm, global_best, global_pos = iterate_swarm(
        f,
        swarm,
        bounds=[(0, 4)],
        global_best=global_best,
        global_pos=global_pos
    )
assert (global_best, global_pos) == (-14.91, [2.39])
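As a sanity check, the result agrees with calculus: the derivative 5x^4 - 12x^3 = x^3(5x - 12) vanishes at x = 12/5 = 2.4 inside [0, 4]:

# analytic check of the PSO result
x = 12 / 5
print(x ** 5 - 3 * x ** 4 + 5)  # -14.90656..., which rounds to -14.91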
In this example, the cost function is -(5 + 3 * x[0] - 4 * x[1] - x[0] ** 2 + x[0] * x[1] - x[1] ** 2) with no bounds; minimizing the negation is the same as maximizing 5 + 3x - 4y - x^2 + xy - y^2.
# minimize -(5 + 3x - 4y - x^2 + x y - y^2)
def f(x): return -(5 + 3 * x[0] - 4 * x[1] - x[0] ** 2 + x[0] * x[1] - x[1] ** 2)

# reset global
global_best = -1
global_pos = -1
# initial swarm
swarm = generate_swarm(x_0=[5, 5], n_par=15)
# iterate swarm
for i in range(MAX_ITERATIONS):
    swarm, global_best, global_pos = iterate_swarm(
        f,
        swarm,
        global_best=global_best,
        global_pos=global_pos
    )
assert (global_best, global_pos) == (-9.33, [0.67, -1.67])
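Again, this matches the analytic optimum: setting both partial derivatives of 5 + 3x - 4y - x^2 + xy - y^2 to zero gives 3 - 2x + y = 0 and -4 + x - 2y = 0, i.e. x = 2/3 and y = -5/3:

# analytic check of the PSO result
x, y = 2 / 3, -5 / 3
print(-(5 + 3 * x - 4 * y - x ** 2 + x * y - y ** 2))  # -9.333..., rounds to -9.33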