Source code for pymoo.algorithms.soo.nonconvex.es

import math

import numpy as np

from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.algorithms.soo.nonconvex.ga import FitnessSurvival
from pymoo.core.population import Population
from pymoo.docs import parse_doc_string
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.termination.default import DefaultSingleObjectiveTermination
from pymoo.util import default_random_state
from pymoo.util.display.single import SingleObjectiveOutput
from pymoo.util.optimum import filter_optimum


[docs] class ES(GeneticAlgorithm): def __init__(self, n_offsprings=None, pop_size=None, rule=1.0 / 7.0, phi=1.0, gamma=0.85, sampling=FloatRandomSampling(), survival=FitnessSurvival(), output=SingleObjectiveOutput(), **kwargs): """ Evolutionary Strategy (ES) Parameters ---------- n_offsprings : int The number of individuals created in each iteration. pop_size : int The number of individuals which are surviving from the offspring population (non-elitist) rule : float The rule (ratio) of individuals surviving. This automatically either calculated `n_offsprings` or `pop_size`. phi : float Expected rate of convergence (usually 1.0). gamma : float If not `None`, some individuals are created using the differentials with this as a length scale. sampling : object The sampling method for creating the initial population. """ if n_offsprings is None and pop_size is None: n_offsprings = 200 pop_size = int(math.ceil(n_offsprings * rule)) elif n_offsprings is not None and pop_size is None: pop_size = int(math.ceil(n_offsprings * rule)) elif n_offsprings is None and pop_size is not None: n_offsprings = int(math.floor(pop_size / rule)) assert pop_size is not None and n_offsprings is not None, "You have to at least provivde pop_size of n_offsprings." assert n_offsprings >= 2 * pop_size, "The number of offsprings should be at least double the population size." super().__init__(pop_size=pop_size, n_offsprings=n_offsprings, sampling=sampling, survival=survival, output=output, advance_after_initial_infill=True, eliminate_duplicates=False, **kwargs) self.termination = DefaultSingleObjectiveTermination() self.phi = phi self.gamma = gamma self.tau, self.taup, self.sigma_max = None, None, None def _setup(self, problem, **kwargs): n = problem.n_var self.taup = self.phi / ((2 * n) ** 0.5) self.tau = self.phi / ((2 * (n ** 0.5)) ** 0.5) xl, xu = self.problem.bounds() self.sigma_max = (xu - xl) / (self.problem.n_var ** 0.5) def _initialize_advance(self, infills=None, **kwargs): super()._initialize_advance(infills=infills, **kwargs) # initialize all individuals with the maximum sigma value infills.set("sigma", [self.sigma_max] * len(infills)) def _infill(self): pop, mu, _lambda = self.pop, self.pop_size, self.n_offsprings xl, xu = self.problem.bounds() X, sigma = pop.get("X", "sigma") # cycle through the elites individuals for create the solutions I = np.arange(_lambda) % mu # transform X and sigma to the shape of number of offsprings X, sigma = X[I], sigma[I] # get the sigma only of the elites to be used sigmap = es_intermediate_recomb(sigma, random_state=self.random_state) # calculate the new sigma based on tau and tau prime sigmap = np.minimum(self.sigma_max, es_sigma(sigmap, self.tau, self.taup, random_state=self.random_state)) # execute the evolutionary strategy to calculate the offspring solutions Xp = X + sigmap * self.random_state.normal(size=sigmap.shape) # if gamma is not none do the differential variation overwrite Xp and sigmap for the first mu-1 individuals if self.gamma is not None: Xp[:mu - 1] = X[:mu - 1] + self.gamma * (X[0] - X[1:mu]) sigmap[:mu - 1] = sigma[:mu - 1] # if we have bounds to consider -> repair the individuals which are out of bounds if self.problem.has_bounds(): Xp = es_mut_repair(Xp, X, sigmap, xl, xu, 10, random_state=self.random_state) # create the population to proceed further off = Population.new(X=Xp, sigma=sigmap) return off def _advance(self, infills=None, **kwargs): # if not all solutions suggested by infill() are evaluated we create a more semi (mu+lambda) algorithm if len(infills) < self.pop_size: infills = Population.merge(infills, self.pop) self.pop = self.survival.do(self.problem, infills, n_survive=self.pop_size) def _set_optimum(self): pop = self.pop if self.opt is None else Population.merge(self.opt, self.pop) self.opt = filter_optimum(pop, least_infeasible=True)
@default_random_state def es_sigma(sigma, tau, taup, random_state=None): _lambda, _n = sigma.shape return sigma * np.exp(taup * random_state.normal(size=(_lambda, 1)) + tau * random_state.normal(size=(_lambda, _n))) @default_random_state def es_intermediate_recomb(sigma, random_state=None): _lambda, _n = sigma.shape sigma_hat = np.zeros_like(sigma) for i in range(_lambda): for j in range(_n): k = random_state.integers(_lambda) sigma_hat[i, j] = (sigma[i, j] + sigma[k, j]) / 2.0 return sigma_hat @default_random_state def es_mut_repair(Xp, X, sigma, xl, xu, n_trials, random_state=None): # reshape xl and xu to be the same shape as the input XL = xl[None, :].repeat(len(Xp), axis=0) XU = xu[None, :].repeat(len(Xp), axis=0) all_in_bounds = False # for the given number of trials for k in range(n_trials): # find all indices which are out of bounds i, j = np.where(np.logical_or(Xp < XL, Xp > XU)) if len(i) == 0: all_in_bounds = True break else: # do the mutation again vectored for all values not in bound Xp[i, j] = X[i, j] + sigma[i, j] * random_state.normal(size=len(i)) # if there are still solutions which boundaries are violated, set them to the original X if not all_in_bounds: i, j = np.where(np.logical_or(Xp < XL, Xp > XU)) Xp[i, j] = X[i, j] return Xp @default_random_state def es_mut_loop(X, sigmap, xl, xu, n_trials=10, random_state=None): _lambda, _n = sigmap.shape # X prime which will be returned by the algorithm (set the default value to the same as parent) Xp = np.zeros_like(sigmap) # for each of the new offsprings for i in range(_lambda): # for each variable of it for j in range(_n): # by default just copy the value if no value is in bounds this will stay Xp[i, j] = X[i, j] # try to set the value a few time and be done if in bounds for _ in range(n_trials): # calculate the mutated value x = X[i, j] + sigmap[i, j] * random_state.normal() # if it is inside the bounds accept it - otherwise try again if xl[j] <= x <= xu[j]: Xp[i, j] = x break return Xp parse_doc_string(ES.__init__)