Source code for pymoo.algorithms.moo.ctaea

import math

import numpy as np
from scipy.spatial.distance import cdist, pdist, squareform

from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.core.population import Population
from pymoo.decomposition.asf import ASF
from pymoo.docs import parse_doc_string
from pymoo.functions import load_function
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.selection.tournament import TournamentSelection
from pymoo.util import default_random_state
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.util.dominator import Dominator
from pymoo.util.misc import has_feasible, random_permutations
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting


# =========================================================================================================
# Implementation
# Following original code by K. Li https://cola-laboratory.github.io/codes/CTAEA.zip
# =========================================================================================================


@default_random_state
def comp_by_cv_dom_then_random(pop, P, random_state=None, **kwargs):
    S = np.full(P.shape[0], np.nan)

    for i in range(P.shape[0]):
        a, b = P[i, 0], P[i, 1]

        if pop[a].CV <= 0.0 and pop[b].CV <= 0.0:
            rel = Dominator.get_relation(pop[a].F, pop[b].F)
            if rel == 1:
                S[i] = a
            elif rel == -1:
                S[i] = b
            else:
                S[i] = random_state.choice([a, b])
        elif pop[a].CV <= 0.0:
            S[i] = a
        elif pop[b].CV <= 0.0:
            S[i] = b
        else:
            S[i] = random_state.choice([a, b])

    return S[:, None].astype(int)


class RestrictedMating(TournamentSelection):
    """Restricted mating approach to balance convergence and diversity archives"""

    @default_random_state
    def _do(self, problem, Hm, n_select, n_parents, random_state=None, **kwargs):
        n_pop = len(Hm) // 2

        _, rank = NonDominatedSorting().do(Hm.get('F'), return_rank=True)

        Pc = (rank[:n_pop] == 0).sum() / len(Hm)
        Pd = (rank[n_pop:] == 0).sum() / len(Hm)

        # number of random individuals needed
        n_random = n_select * n_parents * self.pressure
        n_perms = math.ceil(n_random / n_pop)
        # get random permutations and reshape them
        P = random_permutations(n_perms, n_pop, random_state=random_state)[:n_random]
        P = np.reshape(P, (n_select * n_parents, self.pressure))
        if Pc <= Pd:
            # Choose from DA
            P[::n_parents, :] += n_pop
        pf = random_state.random(n_select)
        P[1::n_parents, :][pf >= Pc] += n_pop

        # compare using tournament function
        S = self.func_comp(Hm, P, random_state=random_state, **kwargs)

        return np.reshape(S, (n_select, n_parents))


class CADASurvival:

    def __init__(self, ref_dirs):
        self.ref_dirs = ref_dirs
        self.opt = None
        self.ideal_point = np.full(ref_dirs.shape[1], np.inf)
        self._decomposition = ASF()
        self._calc_perpendicular_distance = load_function("calc_perpendicular_distance")

    @default_random_state
    def do(self, _, pop, da, n_survive=None, random_state=None, **kwargs):
        # Store random_state for use in methods
        self.random_state = random_state
        # Offspring are last of merged population
        off = pop[-n_survive:]
        # Update ideal point
        self.ideal_point = np.min(np.vstack((self.ideal_point, off.get("F"))), axis=0)
        # Update CA
        pop = self._updateCA(pop, n_survive, random_state=random_state)
        # Update DA
        Hd = Population.merge(da, off)
        da = self._updateDA(pop, Hd, n_survive)
        return pop, da

    def _associate(self, pop):
        """Associate each individual with a F vector and calculate decomposed fitness"""
        F = pop.get("F")
        dist_matrix = self._calc_perpendicular_distance(F - self.ideal_point, self.ref_dirs)
        niche_of_individuals = np.argmin(dist_matrix, axis=1)
        FV = self._decomposition.do(F, weights=self.ref_dirs[niche_of_individuals, :],
                                    ideal_point=self.ideal_point, weight_0=1e-4)
        pop.set("niche", niche_of_individuals)
        pop.set("FV", FV)
        return niche_of_individuals, FV

    @default_random_state
    def _updateCA(self, pop, n_survive, random_state=None):
        """Update the Convergence archive (CA)"""
        CV = pop.get("CV").flatten()

        Sc = pop[CV == 0]  # ConstraintsAsObjective population
        if len(Sc) == n_survive:  # Exactly n_survive feasible individuals
            F = Sc.get("F")
            fronts, rank = NonDominatedSorting().do(F, return_rank=True)
            Sc.set('rank', rank)
            self.opt = Sc[fronts[0]]
            return Sc
        elif len(Sc) < n_survive:  # Not enough feasible individuals
            remainder = n_survive - len(Sc)
            # Solve sub-problem CV, tche
            SI = pop[CV > 0]
            f1 = SI.get("CV")
            _, f2 = self._associate(SI)
            sub_F = np.column_stack([f1, f2])
            fronts = NonDominatedSorting().do(sub_F, n_stop_if_ranked=remainder)
            I = []
            for front in fronts:
                if len(I) + len(front) <= remainder:
                    I.extend(front)
                else:
                    n_missing = remainder - len(I)
                    last_front_CV = np.argsort(f1.flatten()[front])
                    I.extend(front[last_front_CV[:n_missing]])
            SI = SI[I]
            S = Population.merge(Sc, SI)
            F = S.get("F")
            fronts, rank = NonDominatedSorting().do(F, return_rank=True)
            S.set('rank', rank)
            self.opt = S[fronts[0]]
            return S
        else:  # Too many feasible individuals
            F = Sc.get("F")
            # Filter by non-dominated sorting
            fronts, rank = NonDominatedSorting().do(F, return_rank=True, n_stop_if_ranked=n_survive)
            I = np.concatenate(fronts)
            S, rank, F = Sc[I], rank[I], F[I]
            if len(S) > n_survive:
                # Remove individual in most crowded niche and with worst fitness
                niche_of_individuals, FV = self._associate(S)
                index, count = np.unique(niche_of_individuals, return_counts=True)
                survivors = np.full(S.shape[0], True)
                while survivors.sum() > n_survive:
                    crowdest_niches, = np.where(count == count.max())
                    worst_idx = None
                    worst_niche = None
                    worst_fit = -1
                    for crowdest_niche in crowdest_niches:
                        crowdest, = np.where((niche_of_individuals == index[crowdest_niche]) & survivors)
                        niche_worst = crowdest[FV[crowdest].argmax()]
                        dist_to_max_fit = cdist(F[[niche_worst], :], F).flatten()
                        dist_to_max_fit[niche_worst] = np.inf
                        dist_to_max_fit[~survivors] = np.inf
                        min_d_to_max_fit = dist_to_max_fit.min()

                        dist_in_niche = squareform(pdist(F[crowdest]))
                        np.fill_diagonal(dist_in_niche, np.inf)

                        delta_d = dist_in_niche - min_d_to_max_fit
                        min_d_i = np.unravel_index(np.argmin(delta_d, axis=None), dist_in_niche.shape)
                        if (delta_d[min_d_i] < 0) or (
                                delta_d[min_d_i] == 0 and (FV[crowdest[list(min_d_i)]] > niche_worst).any()):
                            min_d_i = list(min_d_i)
                            random_state.shuffle(min_d_i)
                            closest = crowdest[min_d_i]
                            niche_worst = closest[np.argmax(FV[closest])]
                        if (FV[niche_worst] > worst_fit).all():
                            worst_fit = FV[niche_worst]
                            worst_idx = niche_worst
                            worst_niche = crowdest_niche
                    survivors[worst_idx] = False
                    count[worst_niche] -= 1
                S, rank = S[survivors], rank[survivors]
            S.set('rank', rank)
            self.opt = S[rank == 0]
            return S

    def _updateDA(self, pop, Hd, n_survive):
        """Update the Diversity archive (DA)"""
        niche_Hd, FV = self._associate(Hd)
        niche_CA, _ = self._associate(pop)

        itr = 1
        S = []
        while len(S) < n_survive:
            for i in range(n_survive):
                current_ca, = np.where(niche_CA == i)
                if len(current_ca) < itr:
                    for _ in range(itr - len(current_ca)):
                        current_da = np.where(niche_Hd == i)[0]
                        if current_da.size > 0:
                            F = Hd[current_da].get('F')
                            nd = NonDominatedSorting().do(F, only_non_dominated_front=True, n_stop_if_ranked=0)
                            i_best = current_da[nd[np.argmin(FV[current_da[nd]])]]
                            niche_Hd[i_best] = -1
                            if len(S) < n_survive:
                                S.append(i_best)
                        else:
                            break
                if len(S) == n_survive:
                    break
            itr += 1
        return Hd[S]


[docs] class CTAEA(GeneticAlgorithm): def __init__(self, ref_dirs, sampling=FloatRandomSampling(), selection=RestrictedMating(func_comp=comp_by_cv_dom_then_random), crossover=SBX(n_offsprings=1, eta=30, prob=1.0), mutation=PM(prob_var=None, eta=20), eliminate_duplicates=True, output=MultiObjectiveOutput(), **kwargs): """ CTAEA Parameters ---------- ref_dirs : {ref_dirs} sampling : {sampling} selection : {selection} crossover : {crossover} mutation : {mutation} eliminate_duplicates : {eliminate_duplicates} """ self.ref_dirs = ref_dirs pop_size = len(ref_dirs) if 'survival' in kwargs: survival = kwargs['survival'] del kwargs['survival'] else: survival = CADASurvival(ref_dirs) # Initialize diversity archives self.da = None super().__init__(pop_size=pop_size, sampling=sampling, selection=selection, crossover=crossover, mutation=mutation, survival=survival, eliminate_duplicates=eliminate_duplicates, n_offsprings=pop_size, output=output, **kwargs) def _setup(self, problem, **kwargs): if self.ref_dirs is not None and self.ref_dirs.shape[1] != problem.n_obj: raise Exception( "Dimensionality of reference points must be equal to the number of objectives: %s != %s" % (self.ref_dirs.shape[1], problem.n_obj)) def _initialize_infill(self): return self.initialization.do(self.problem, self.pop_size, algorithm=self, random_state=self.random_state) def _initialize_advance(self, infills=None, **kwargs): super()._initialize_advance(infills, **kwargs) self.pop, self.da = self.survival.do(self.problem, self.pop, Population(), n_survive=len(self.pop), algorithm=self, random_state=self.random_state) def _infill(self): Hm = Population.merge(self.pop, self.da) return self.mating.do(self.problem, Hm, n_offsprings=self.n_offsprings, algorithm=self, random_state=self.random_state) def _advance(self, infills=None, **kwargs): assert infills is not None, "This algorithms uses the AskAndTell interface thus infills must to be provided." pop = Population.merge(self.pop, infills) self.pop, self.da = self.survival.do(self.problem, pop, self.da, self.pop_size, algorithm=self, random_state=self.random_state) def _set_optimum(self, **kwargs): if not has_feasible(self.pop): self.opt = self.pop[[np.argmin(self.pop.get("CV"))]] else: self.opt = self.survival.opt
parse_doc_string(CTAEA.__init__)