Source code for pymoo.algorithms.moo.rvea

import numpy as np

from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.core.survival import Survival
from pymoo.docs import parse_doc_string
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.selection.rnd import RandomSelection
from pymoo.termination.max_eval import MaximumFunctionCallTermination
from pymoo.termination.max_gen import MaximumGenerationTermination
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.util.misc import has_feasible, vectorized_cdist


[docs] class RVEA(GeneticAlgorithm): def __init__(self, ref_dirs, alpha=2.0, adapt_freq=0.1, pop_size=None, sampling=FloatRandomSampling(), selection=RandomSelection(), crossover=SBX(eta=30, prob=1.0), mutation=PM(eta=20), eliminate_duplicates=True, n_offsprings=None, output=MultiObjectiveOutput(), **kwargs): """ Parameters ---------- ref_dirs : {ref_dirs} adapt_freq : float Defines the ratio of generation when the reference directions are updated. pop_size : int (default = None) By default the population size is set to None which means that it will be equal to the number of reference line. However, if desired this can be overwritten by providing a positive number. sampling : {sampling} selection : {selection} crossover : {crossover} mutation : {mutation} eliminate_duplicates : {eliminate_duplicates} n_offsprings : {n_offsprings} """ # set reference directions and pop_size self.ref_dirs = ref_dirs if self.ref_dirs is not None: if pop_size is None: pop_size = len(self.ref_dirs) # the fraction of n_max_gen when the the reference directions are adapted self.adapt_freq = adapt_freq # you can override the survival if necessary survival = kwargs.pop("survival", None) if survival is None: survival = APDSurvival(ref_dirs, alpha=alpha) super().__init__(pop_size=pop_size, sampling=sampling, selection=selection, crossover=crossover, mutation=mutation, survival=survival, eliminate_duplicates=eliminate_duplicates, n_offsprings=n_offsprings, output=output, **kwargs) def _setup(self, problem, **kwargs): # if maximum functions termination convert it to generations if isinstance(self.termination, MaximumFunctionCallTermination): n_gen = np.ceil((self.termination.n_max_evals - self.pop_size) / self.n_offsprings) self.termination = MaximumGenerationTermination(n_gen) # check whether the n_gen termination is used - otherwise this algorithm can be not run if not isinstance(self.termination, MaximumGenerationTermination): raise Exception("Please use the n_gen or n_eval as a termination criterion to run RVEA!") def _advance(self, **kwargs): super()._advance(**kwargs) # get the current generation and maximum of generations n_gen, n_max_gen = self.n_gen, self.termination.n_max_gen # each i-th generation (define by fr and n_max_gen) the reference directions are updated if self.adapt_freq is not None and n_gen % np.ceil(n_max_gen * self.adapt_freq) == 0: self.survival.adapt() def _set_optimum(self, **kwargs): if not has_feasible(self.pop): self.opt = self.pop[[np.argmin(self.pop.get("CV"))]] else: self.opt = self.pop
# --------------------------------------------------------------------------------------------------------- # Survival Selection # --------------------------------------------------------------------------------------------------------- def calc_gamma(V): gamma = np.arccos((- np.sort(-1 * V @ V.T))[:, 1]) gamma = np.maximum(gamma, 1e-64) return gamma def calc_V(ref_dirs): return ref_dirs / np.linalg.norm(ref_dirs, axis=1)[:, None] class APDSurvival(Survival): def __init__(self, ref_dirs, alpha=2.0) -> None: super().__init__(filter_infeasible=True) n_dim = ref_dirs.shape[1] self.alpha = alpha self.niches = None self.V, self.gamma = None, None self.ideal, self.nadir = np.full(n_dim, np.inf), None self.ref_dirs = ref_dirs self.extreme_ref_dirs = np.where(np.any(vectorized_cdist(self.ref_dirs, np.eye(n_dim)) == 0, axis=1))[0] self.V = calc_V(self.ref_dirs) self.gamma = calc_gamma(self.V) def adapt(self): if self.nadir is not None: self.V = calc_V(calc_V(self.ref_dirs) * (self.nadir - self.ideal)) self.gamma = calc_gamma(self.V) def _do(self, problem, pop, n_survive, algorithm=None, n_gen=None, n_max_gen=None, **kwargs): if n_gen is None: n_gen = algorithm.n_gen - 1 if n_max_gen is None: n_max_gen = algorithm.termination.n_max_gen # get the objective space values F = pop.get("F") # store the ideal and nadir point estimation for adapt - (and ideal for transformation) self.ideal = np.minimum(F.min(axis=0), self.ideal) # translate the population to make the ideal point the origin F = F - self.ideal # the distance to the ideal point dist_to_ideal = np.linalg.norm(F, axis=1) dist_to_ideal[dist_to_ideal < 1e-64] = 1e-64 # normalize by distance to ideal F_prime = F / dist_to_ideal[:, None] # calculate for each solution the acute angles to ref dirs acute_angle = np.arccos(F_prime @ self.V.T) niches = acute_angle.argmin(axis=1) # assign to each reference direction the solution niches_to_ind = [[] for _ in range(len(self.V))] for k, i in enumerate(niches): niches_to_ind[i].append(k) # all individuals which will be surviving survivors = [] # for each reference direction for k in range(len(self.V)): # individuals assigned to the niche assigned_to_niche = niches_to_ind[k] # if niche not empty if len(assigned_to_niche) > 0: # the angle of niche to nearest neighboring niche gamma = self.gamma[k] # the angle from the individuals of this niches to the niche itself theta = acute_angle[assigned_to_niche, k] # the penalty which is applied for the metric M = problem.n_obj if problem.n_obj > 2.0 else 1.0 penalty = M * ((n_gen / n_max_gen) ** self.alpha) * (theta / gamma) # calculate the angle-penalized penalized (APD) apd = dist_to_ideal[assigned_to_niche] * (1 + penalty) # the individual which survives survivor = assigned_to_niche[apd.argmin()] # set attributes to the individual pop[assigned_to_niche].set(theta=theta, apd=apd, niche=k, opt=False) pop[survivor].set("opt", True) # select the one with smallest APD value survivors.append(survivor) ret = pop[survivors] self.niches = niches_to_ind self.nadir = ret.get("F").max(axis=0) return ret parse_doc_string(RVEA.__init__)