"""Created on Sep 30, 2013 modified on March 3-7, 2020.
@authors: J. Akeret, S. Birrer, A. Shajib
"""
from copy import copy
from math import floor
import math
import numpy as np
from tqdm import tqdm
__all__ = ["ParticleSwarmOptimizer"]
[docs]
class ParticleSwarmOptimizer(object):
"""Optimizer using a swarm of particles.
:param func:
A function that takes a vector in the parameter space as input and
returns the natural logarithm of the posterior probability for that
position.
:param low: array of the lower bound of the parameter space
:param high: array of the upper bound of the parameter space
:param particle_count: the number of particles to use.
:param pool: (optional)
An alternative method of using the parallelized algorithm. If
provided, the value of ``threads`` is ignored and the
object provided by ``pool`` is used for all parallelization. It
can be any object with a ``map`` method that follows the same
calling sequence as the built-in ``map`` function.
"""
[docs]
def __init__(
self, func, low, high, particle_count=25, pool=None, args=None, kwargs=None
):
"""
:param func: function to call to return log likelihood
:type func: python definition
:param low: lower bound of the parameters
:type low: numpy array
:param high: upper bound of the parameters
:type high: numpy array
:param particle_count: number of particles in each iteration of the PSO
:type particle_count: int
:param pool: MPI pool for mapping different processes
:type pool: None or MPI pool
:param args: positional arguments to send to `func`. The function
will be called as `func(x, *args, **kwargs)`.
:type args: `list`
:param kwargs: keyword arguments to send to `func`. The function
will be called as `func(x, *args, **kwargs)`
:type kwargs: `dict`
"""
self.low = [l for l in low]
self.high = [h for h in high]
self.particleCount = particle_count
self.pool = pool
self.param_count = len(self.low)
self.swarm = self._init_swarm()
self.global_best = Particle.create(self.param_count)
self.func = _FunctionWrapper(func, args, kwargs)
def __getstate__(self):
"""In order to be generally pickleable, we need to discard the pool object
before trying."""
d = self.__dict__
d["pool"] = None
return d
def __setstate__(self, state):
self.__dict__ = state
[docs]
def set_global_best(self, position, velocity, fitness):
"""Set the global best particle.
:param position: position of the new global best
:type position: `list` or `ndarray`
:param velocity: velocity of the new global best
:type velocity: `list` or `ndarray`
:param fitness: fitness of the new global best
:type fitness: `float`
:return: `None`
:rtype:
"""
self.global_best.position = [p for p in position]
self.global_best.velocity = [v for v in velocity]
self.global_best.fitness = fitness
def _init_swarm(self):
"""Initiate the swarm.
:return:
:rtype:
"""
swarm = []
for _ in range(self.particleCount):
swarm.append(
Particle(
np.random.uniform(self.low, self.high, size=self.param_count),
np.zeros(self.param_count),
)
)
return swarm
[docs]
def sample(
self,
max_iter=1000,
c1=1.193,
c2=1.193,
p=0.7,
m=1e-3,
n=1e-2,
early_stop_tolerance=None,
verbose=True,
):
"""Launches the PSO. Yields the complete swarm per iteration.
:param max_iter: maximum iterations
:param c1: cognitive weight
:param c2: social weight
:param p: stop criterion, percentage of particles to use
:param m: stop criterion, difference between mean fitness and global best
:param n: stop criterion, difference between norm of the particle vector and
norm of the global best
:param early_stop_tolerance: will terminate at the given value (should be
specified as a chi^2)
:param verbose: prints when it stopped
:type verbose: boolean
"""
self._get_fitness(self.swarm)
i = 0
while True:
for particle in self.swarm:
if self.global_best.fitness < particle.fitness:
self.global_best = particle.copy()
# if(self.isMaster()):
# print("new global best found %i %s"%(i,
# self.global_best.__str__()))
if particle.fitness > particle.personal_best.fitness:
particle.update_personal_best()
if i >= max_iter:
if self.is_master():
if verbose:
print("Max iteration reached! Stopping.")
return
if self._converged(i, p=p, m=m, n=n):
if self.is_master():
if verbose:
print("Converged after {} iterations!".format(i))
print(
"Best fit found: ",
self.global_best.fitness,
self.global_best.position,
)
return
if early_stop_tolerance is not None:
if self._acceptable_convergence(early_stop_tolerance):
return
for particle in self.swarm:
w = 0.5 + np.random.uniform(0, 1, size=self.param_count) / 2
# w=0.72
part_vel = w * np.array(particle.velocity)
cog_vel = (
c1
* np.random.uniform(0, 1, size=self.param_count)
* (
np.array(particle.personal_best.position)
- np.array(particle.position)
)
)
soc_vel = (
c2
* np.random.uniform(0, 1, size=self.param_count)
* (
np.array(self.global_best.position)
- np.array(particle.position)
)
)
particle.velocity = (part_vel + cog_vel + soc_vel).tolist()
particle.position = (
np.array(particle.position) + np.array(particle.velocity)
).tolist()
self._get_fitness(self.swarm)
swarm = []
for particle in self.swarm:
swarm.append(particle.copy())
yield swarm
i += 1
[docs]
def optimize(
self,
max_iter=1000,
verbose=True,
c1=1.193,
c2=1.193,
p=0.7,
m=1e-3,
n=1e-2,
early_stop_tolerance=None,
):
"""Run the optimization and return a full list of optimization outputs.
:param max_iter: maximum iterations
:param verbose: if `True`, print a message every 10 iterations
:param c1: cognitive weight
:param c2: social weight
:param p: stop criterion, percentage of particles to use
:param m: stop criterion, difference between mean fitness and global best
:param n: stop criterion, difference between norm of the particle
vector and norm of the global best
:param early_stop_tolerance: will terminate at the given value (should be specified as a chi^2)
"""
log_likelihood_list = []
vel_list = []
pos_list = []
num_iter = 0
if verbose:
disable_tqdm = False
else:
disable_tqdm = True
with tqdm(total=max_iter, disable=disable_tqdm) as pbar:
for _ in self.sample(
max_iter, c1, c2, p, m, n, early_stop_tolerance, verbose
):
log_likelihood_list.append(self.global_best.fitness)
vel_list.append(self.global_best.velocity)
pos_list.append(self.global_best.position)
num_iter += 1
if verbose and self.is_master():
pbar.update(1)
return self.global_best.position, [log_likelihood_list, pos_list, vel_list]
def _get_fitness(self, swarm):
"""Set fitness (probability) of the particles in swarm.
:param swarm: PSO state
:type swarm: list of Particle() instances of the swarm
:return:
:rtype:
"""
position = [particle.position for particle in swarm]
if self.pool is None:
map_func = map
else:
map_func = self.pool.map
ln_probability = list(map_func(self.func, position))
for i, particle in enumerate(swarm):
particle.fitness = ln_probability[i]
particle.position = position[i]
def _converged(self, it, p, m, n):
"""Check for convergence.
:param it:
:type it:
:param p:
:type p:
:param m:
:type m:
:param n:
:type n:
:return:
:rtype:
"""
# test = self._converged_space2(p=p)
# print(test)
fit = self._converged_fit(it=it, p=p, m=m)
if fit:
space = self._converged_space(it=it, p=p, m=n)
return space
else:
return False
def _converged_fit(self, it, p, m):
"""
:param it:
:type it:
:param p:
:type p:
:param m:
:type m:
:return:
:rtype:
"""
best_sort = np.sort(
[particle.personal_best.fitness for particle in self.swarm]
)[::-1]
mean_fit = np.mean(best_sort[1 : int(math.floor(self.particleCount * p))])
# print( "best %f, mean_fit %f, ration %f"%( self.global_best[0],
# mean_fit, abs((self.global_best[0]-mean_fit))))
return abs(self.global_best.fitness - mean_fit) < m
def _converged_space(self, it, p, m):
"""
:param it:
:type it:
:param p:
:type p:
:param m:
:type m:
:return:
:rtype:
"""
sorted_swarm = [particle for particle in self.swarm]
sorted_swarm.sort()
best_of_best = sorted_swarm[0 : int(floor(self.particleCount * p))]
diffs = []
for particle in best_of_best:
diffs.append(
np.array(self.global_best.position) - np.array(particle.position)
)
max_norm = max(list(map(np.linalg.norm, diffs)))
return abs(max_norm) < m
def _converged_space2(self, p):
"""
:param p:
:type p:
:return:
:rtype:
"""
# Andres N. Ruiz et al.
sorted_swarm = [particle for particle in self.swarm]
sorted_swarm.sort()
best_of_best = sorted_swarm[0 : int(floor(self.particleCount * p))]
positions = [particle.position for particle in best_of_best]
means = np.mean(positions, axis=0)
delta = np.mean(
(means - np.array(self.global_best.position))
/ np.array(self.global_best.position)
)
return np.log10(delta) < -3.0
[docs]
def is_master(self):
"""Check if the current processor is the master.
:return:
:rtype:
"""
if self.pool is None:
return True
else:
return self.pool.is_master()
def _acceptable_convergence(self, chi_square_tolerance):
chi_square = -2 * self.global_best.fitness
if np.min(chi_square) < chi_square_tolerance:
return True
else:
return False
class Particle(object):
"""Implementation of a single particle.
:param position: the position of the particle in the parameter space
:param velocity: the velocity of the particle
:param fitness: the current fitness of the particle
"""
def __init__(self, position, velocity, fitness=0):
"""
:param position: parameter positions
:param velocity: parameter velocity
:param fitness:
"""
self.position = [p for p in position]
self.velocity = [v for v in velocity]
self.fitness = fitness
self.param_count = len(self.position)
self._personal_best = None
@property
def personal_best(self):
"""
:return:
:rtype:
"""
if self._personal_best is None:
return self
else:
return self._personal_best
@classmethod
def create(cls, param_count):
"""Creates a new particle without position, velocity and -inf as fitness."""
return Particle(
np.array([[]] * param_count), np.array([[]] * param_count), -np.inf
)
def update_personal_best(self):
"""Sets the current particle representation as personal best."""
self._personal_best = self.copy()
def copy(self):
"""Creates a copy of itself."""
return Particle(copy(self.position), copy(self.velocity), self.fitness)
def __str__(self):
"""Get a `str` object for the particle state.
:return:
:rtype:
"""
return "{:f}, pos: {:s} velocity: {:s}".format(
self.fitness, self.position, self.velocity
)
def __lt__(self, other):
return self.fitness > other.fitness
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__ = state
def __unicode__(self):
return self.__str__()
class _FunctionWrapper(object):
"""This is a hack to make the likelihood function pickleable when ``args`` or
``kwargs`` are also included.
This hack is copied from
emcee: https://github.com/dfm/emcee/.
"""
def __init__(self, f, args, kwargs):
self.f = f
self.args = [] if args is None else args
self.kwargs = {} if kwargs is None else kwargs
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__ = state
def __call__(self, x):
try:
return self.f(x, *self.args, **self.kwargs)
except: # pragma: no cover
import traceback
print("PSO: Exception while calling your likelihood function:")
print(" params:", x)
print(" args:", self.args)
print(" kwargs:", self.kwargs)
print(" exception:")
traceback.print_exc()
raise