Commit 947d23c2 authored by fonhorst

replace make_individual with IndividualBuilder

add FitnessEstimator interface to ga
fix the interface of surrogate based fitness estimator mixin
add computable and surrogate-based computable fitness estimators
add extended set of examples for autotm_fit_predict
fix bayes opt
add integration of local and distributed (old) modes
1 merge request: !25 Feature/fitness refactoring
Showing with 537 additions and 294 deletions
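For context, a minimal sketch of how the refactored pieces are meant to compose (a sketch only: the constructor and method signatures below are taken from this diff, while the population itself is assumed to come from the caller):

from autotm.algorithms_for_tuning.individuals import IndividualBuilder
from autotm.fitness.estimator import ComputableFitnessEstimator

# Builds individuals of the chosen type ("regular" or "sparse").
ibuilder = IndividualBuilder("regular")
# Caps the number of real fitness evaluations across the whole run.
fitness_estimator = ComputableFitnessEstimator(ibuilder, num_fitness_evaluations=150)

# population is a List[Individual] prepared by the caller;
# by convention, iteration -1 marks the initial population.
population = fitness_estimator.estimate(-1, population)
fitness_estimator.fit(iter_num=-1)  # a no-op for the purely computable estimator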
#!/usr/bin/env python3
import copy
import logging
import logging.config
import os
import random
import sys
@@ -8,13 +9,15 @@ import uuid
from multiprocessing.pool import AsyncResult
from typing import List, Optional, Union
import click
import yaml
from hyperopt import STATUS_OK, fmin, hp, tpe
from tqdm import tqdm
from yaml import Loader
from autotm.algorithms_for_tuning.individuals import IndividualDTO
from autotm.algorithms_for_tuning.individuals import IndividualDTO, IndividualBuilder
from autotm.fitness.estimator import FitnessEstimator, ComputableFitnessEstimator
from autotm.fitness.tm import fit_tm, TopicModel
from autotm.params import FixedListParams
from autotm.utils import TqdmToLogger, make_log_config_dict
ALG_ID = "bo"
@@ -80,10 +83,19 @@ NUM_FITNESS_EVALUATIONS = config["boAlgoParams"]["numEvals"]
class BigartmFitness:
def __init__(self, dataset: str, exp_id: Optional[int] = None):
def __init__(self,
data_path: str,
topic_count: int,
ibuilder: IndividualBuilder,
fitness_estimator: FitnessEstimator,
dataset: str,
exp_id: Optional[int] = None):
self.data_path = data_path
self.topic_count = topic_count
self.ibuilder = ibuilder
self.fitness_estimator = fitness_estimator
self.dataset = dataset
self.exp_id = exp_id
# self.best_solution: Optional[IndividualDTO] = None
def parse_kwargs(self, **kwargs):
params = []
@@ -102,54 +114,56 @@ class BigartmFitness:
params.append(kwargs.get("decor_2", 1))
return params
def make_individ(self, **kwargs):
def make_ind_dto(self, **kwargs):
        # TODO: adapt this function to work with bayesian optimization
params = [float(i) for i in self.parse_kwargs(**kwargs)]
params = params[:-1] + [0.0, 0.0, 0.0] + [params[-1]]
return IndividualDTO(
id=str(uuid.uuid4()),
data_path=self.data_path,
dataset=self.dataset,
params=params,
topic_count=self.topic_count,
params=FixedListParams(params=params),
exp_id=self.exp_id,
alg_id=ALG_ID,
)
def __call__(self, kwargs):
population = [self.make_individ(**kwargs)]
population = estimate_fitness(population)
population = [self.ibuilder.make_individual(self.make_ind_dto(**kwargs))]
population = self.fitness_estimator.estimate(-1, population)
individ = population[0]
# if self.best_solution is None or individ.fitness_value > self.best_solution.fitness_value:
# self.best_solution = copy.deepcopy(individ)
return {"loss": -1 * individ.fitness_value, "status": STATUS_OK}
@click.command(context_settings=dict(allow_extra_args=True))
@click.option("--dataset", required=True, type=str, help="dataset name in the config")
@click.option(
"--log-file",
type=str,
default="/var/log/tm-alg-bo.log",
help="a log file to write logs of the algorithm execution to",
)
@click.option("--exp-id", required=True, type=int, help="mlflow experiment id")
def run_algorithm(dataset, log_file, exp_id):
def run_algorithm(dataset,
data_path,
topic_count,
log_file,
exp_id,
num_evaluations,
individual_type: str = "regular",
train_option: str = "offline") -> TopicModel:
run_uid = uuid.uuid4() if not config["testMode"] else None
logging_config = make_log_config_dict(filename=log_file, uid=run_uid)
logging.config.dictConfig(logging_config)
fitness = BigartmFitness(dataset, exp_id)
ibuilder = IndividualBuilder(individual_type)
fitness_estimator = ComputableFitnessEstimator(ibuilder, num_evaluations)
fitness = BigartmFitness(data_path, topic_count, ibuilder, fitness_estimator, dataset, exp_id)
best_params = fmin(
fitness, SPACE, algo=tpe.suggest, max_evals=NUM_FITNESS_EVALUATIONS
fitness, SPACE, algo=tpe.suggest, max_evals=num_evaluations
)
best_solution = fitness.make_individ(**best_params)
best_solution = log_best_solution(
best_solution, wait_for_result_timeout=-1, alg_args=" ".join(sys.argv)
best_solution_dto = fitness.make_ind_dto(**best_params)
best_solution_dto = log_best_solution(
best_solution_dto, wait_for_result_timeout=-1, alg_args=" ".join(sys.argv)
)
print(best_solution.fitness_value * -1)
best_topic_model = fit_tm(
preproc_data_path=data_path,
topic_count=topic_count,
params=best_solution_dto.params,
train_option=train_option
)
if __name__ == "__main__":
run_algorithm()
return best_topic_model
import copy
import gc
import logging
import math
import operator
import random
import sys
import time
import uuid
from typing import Optional, Tuple, Callable
from typing import Optional, Callable, List
import numpy as np
from autotm.abstract_params import AbstractParams
from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector
from autotm.algorithms_for_tuning.genetic_algorithm.selection import selection
from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import set_surrogate_fitness, Surrogate, \
get_prediction_uncertanty
from autotm.algorithms_for_tuning.individuals import make_individual, IndividualDTO, Individual
from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector
from autotm.algorithms_for_tuning.individuals import IndividualDTO, Individual, IndividualBuilder
from autotm.algorithms_for_tuning.nelder_mead_optimization.nelder_mead import (
NelderMeadOptimization,
)
from autotm.fitness.tasks import estimate_fitness, log_best_solution
from autotm.params import create_individual, FixedListParams
from autotm.fitness.estimator import FitnessEstimator
from autotm.params import create_individual
from autotm.utils import AVG_COHERENCE_SCORE
from autotm.visualization.dynamic_tracker import MetricsCollector
ALG_ID = "ga"
SPEEDUP = True
logger = logging.getLogger("GA_algo")
@@ -60,17 +55,16 @@ class GA:
data_path,
num_individuals,
num_iterations,
ibuilder: IndividualBuilder,
fitness_estimator: FitnessEstimator,
mutation_type="mutation_one_param",
crossover_type="blend_crossover",
selection_type="fitness_prop",
elem_cross_prob=0.2,
num_fitness_evaluations: Optional[int] = 500,
early_stopping_iterations: Optional[int] = 500,
best_proc=0.3,
alpha=None,
exp_id: Optional[int] = None,
surrogate_name=None,
calc_scheme="type2",
topic_count: Optional[int] = None,
fitness_obj_type="single_objective",
tag: Optional[str] = None,
@@ -116,24 +110,17 @@ class GA:
self.data_path = data_path
self.num_individuals = num_individuals
self.num_iterations = num_iterations
self.ibuilder = ibuilder
self.fitness_estimator = fitness_estimator
self.mutation_type = mutation_type
self.crossover_type = crossover_type
self.selection = selection(selection_type)
self.elem_cross_prob = elem_cross_prob
self.alpha = alpha
self.evaluations_counter = 0
self.num_fitness_evaluations = num_fitness_evaluations
self.early_stopping_iterations = early_stopping_iterations
self.fitness_obj_type = fitness_obj_type
self.best_proc = best_proc
self.all_params = []
self.all_fitness = []
if surrogate_name:
self.surrogate = Surrogate(surrogate_name, **kwargs)
else:
self.surrogate = None
self.exp_id = exp_id
self.calc_scheme = calc_scheme
self.topic_count = topic_count
self.tag = tag
self.use_pipeline = use_pipeline
@@ -151,20 +138,6 @@ class GA:
{}
) # generation, parent_1_params, parent_2_params, ...
def estimate_fitness(self, population):
evaluated = [individual for individual in population if individual.dto.fitness_value is not None]
not_evaluated = [individual for individual in population if individual.dto.fitness_value is None]
evaluations_limit = max(0, self.num_fitness_evaluations - self.evaluations_counter) \
if self.num_fitness_evaluations else len(not_evaluated)
if len(not_evaluated) > evaluations_limit:
not_evaluated = not_evaluated[:evaluations_limit]
self.evaluations_counter += len(not_evaluated)
new_evaluated = estimate_fitness(not_evaluated)
if self.statistics_collector:
for individual in new_evaluated:
self.statistics_collector.log_individual(individual)
return evaluated + new_evaluated
def init_population(self):
list_of_individuals = []
for i in range(self.num_individuals):
@@ -181,122 +154,24 @@ class GA:
train_option=self.train_option,
)
# TODO: improve heuristic on search space
list_of_individuals.append(make_individual(dto=dto))
population_with_fitness = self.estimate_fitness(list_of_individuals)
list_of_individuals.append(self.ibuilder.make_individual(dto=dto))
population_with_fitness = self.run_fitness(list_of_individuals, -1)
# self.save_params(population_with_fitness)
# if self.surrogate is not None and self.calc_scheme == "type2":
# self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness))
# logger.info("Surrogate is initialized!")
self.fitness_estimator.fit(iter_num=-1)
self.save_params(population_with_fitness)
if self.surrogate is not None and self.calc_scheme == "type2":
self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness))
logger.info("Surrogate is initialized!")
return population_with_fitness
@staticmethod
def _sort_population(population):
population.sort(key=operator.attrgetter("fitness_value"), reverse=True)
def _calculate_uncertain_res(self, generation, iteration_num: int, proc=0.3):
if len(generation) == 0:
return []
X = np.array([individ.dto.params.to_vector() for individ in generation])
certanty = get_prediction_uncertanty(
self.surrogate.surrogate, X, self.surrogate.name
)
recalculate_num = int(np.floor(len(certanty) * proc))
logger.info(f"Certanty values: {certanty}")
certanty, X = (
list(t) for t in zip(*sorted(zip(certanty, X.tolist()), reverse=True))
) # check
calculated = []
for individual in generation[:recalculate_num]:
# copy
individual_json = individual.dto.model_dump_json()
individual = make_individual(dto=IndividualDTO.model_validate_json(individual_json))
individual.dto.fitness_value = None
calculated.append(individual)
calculated = self.estimate_fitness(calculated)
self.all_params += [individ.dto.params.to_vector() for individ in calculated]
self.all_fitness += [
individ.dto.fitness_value["avg_coherence_score"] for individ in calculated
]
pred_y = self.surrogate.predict(X[recalculate_num:])
for ix, individual in enumerate(generation[recalculate_num:]):
dto = IndividualDTO(
id=str(uuid.uuid4()),
data_path=self.data_path,
params=individual.dto.params,
dataset=self.dataset,
fitness_value=set_surrogate_fitness(pred_y[ix]),
exp_id=self.exp_id,
alg_id=ALG_ID,
topic_count=self.topic_count,
tag=self.tag,
train_option=self.train_option,
)
calculated.append(make_individual(dto=dto))
return calculated
def save_params(self, population):
params_and_f = [
(copy.deepcopy(individ.params.to_vector()), individ.fitness_value)
for individ in population
if individ.fitness_value not in self.all_fitness
]
def check_val(fval):
return not (fval is None or math.isnan(fval) or math.isinf(fval))
def check_params(p):
return all(check_val(el) for el in p)
clean_params_and_f = []
for p, f in params_and_f:
if not check_params(p) or not check_val(f):
logger.warning(f"Bad params or fitness found. Fitness: {f}. Params: {p}.")
else:
clean_params_and_f.append((p, f))
pops = [p for p, _ in clean_params_and_f]
fs = [f for _, f in clean_params_and_f]
self.all_params += pops
self.all_fitness += fs
def surrogate_calculation(self, population):
X_val = np.array([copy.deepcopy(individ.params.to_vector()) for individ in population])
y_pred = self.surrogate.predict(X_val)
if not SPEEDUP:
y_val = np.array([individ.fitness_value for individ in population])
def check_val(fval):
return not (fval is None or math.isnan(fval) or math.isinf(fval))
def check_params(p):
return all(check_val(el) for el in p)
clean_params_and_f = []
for i in range(len(y_val)):
if not check_params(X_val[i]) or not check_val(y_val[i]):
logger.warning(
f"Bad params or fitness found. Fitness: {y_val[i]}. Params: {X_val[i]}."
)
else:
clean_params_and_f.append((X_val[i], y_val[i]))
X_val = clean_params_and_f[0]
y_val = clean_params_and_f[1]
r_2, mse, rmse = self.surrogate.score(X_val, y_val)
logger.info(f"Real values: {list(y_val)}")
logger.info(f"Predicted values: {list(y_pred)}")
logger.info(f"R^2: {r_2}, MSE: {mse}, RMSE: {rmse}")
for ix, individ in enumerate(population):
individ.dto.fitness_value = set_surrogate_fitness(y_pred[ix])
return population
def run_crossover(self, pairs_generator, surrogate_iteration, iteration_num: int):
def run_crossover(self, pairs_generator, iteration_num: int):
new_generation = []
crossover_changes = {
@@ -331,7 +206,7 @@ class GA:
train_option=self.train_option,
) for child in children]
individuals = [make_individual(child) for child in children_dto]
individuals = [self.ibuilder.make_individual(child) for child in children_dto]
new_generation += individuals
crossover_changes["parent_1_params"].append(i.params)
@@ -341,7 +216,7 @@ class GA:
crossover_changes["child_id"].append(len(new_generation) - 1)
if len(new_generation) > 0:
new_generation = self.run_fitness(new_generation, surrogate_iteration, iteration_num)
new_generation = self.run_fitness(new_generation, iteration_num)
logger.info(f"size of the new generation is {len(new_generation)}")
for i in range(len(crossover_changes["parent_1_params"])):
@@ -391,11 +266,13 @@ class GA:
train_option=self.train_option,
)
new_population.append(make_individual(dto=solution_dto))
new_population.append(self.ibuilder.make_individual(dto=solution_dto))
return new_population
def run(self, verbose=False, visualize_results=False) -> Individual:
self.evaluations_counter = 0
assert self.fitness_estimator.evaluations_counter == 0, \
"Fitness estimator has non-zero evaluations count and cannot be reused"
ftime = str(int(time.time()))
# os.makedirs(LOG_FILE_PATH, exist_ok=True)
@@ -405,7 +282,7 @@ class GA:
logger.info(
f"ALGORITHM PARAMS number of individuals {self.num_individuals}; "
f"number of fitness evals "
f"{self.num_fitness_evaluations if self.num_fitness_evaluations else 'unlimited'}; "
f"{self.fitness_estimator.num_fitness_evaluations if self.fitness_estimator.num_fitness_evaluations else 'unlimited'}; "
f"number of early stopping iterations "
f"{self.early_stopping_iterations if self.early_stopping_iterations else 'unlimited'}; "
f"crossover prob {self.elem_cross_prob}"
@@ -416,7 +293,6 @@ class GA:
x, y = [], []
high_fitness = 0
surrogate_iteration = False
best_val_so_far = -10
early_stopping_counter = 0
@@ -426,12 +302,12 @@ class GA:
logger.info(f"ENTERING GENERATION {ii}")
if self.surrogate is not None:
surrogate_iteration = ii % 2 != 0
self._sort_population(population)
if self.statistics_collector is not None:
self.statistics_collector.log_iteration(self.evaluations_counter, population[0].fitness_value)
self.statistics_collector.log_iteration(
self.fitness_estimator.evaluations_counter,
population[0].fitness_value
)
pairs_generator = self.selection(
population=population,
best_proc=self.best_proc,
@@ -442,7 +318,7 @@ class GA:
# Crossover
new_generation = self.run_crossover(
pairs_generator, surrogate_iteration, iteration_num=ii
pairs_generator, iteration_num=ii
)
self._sort_population(new_generation)
@@ -484,7 +360,7 @@ class GA:
for p in population:
p.dto.iteration_id = ii
population = self.run_fitness(population, surrogate_iteration, ii)
population = self.run_fitness(population, ii)
# TODO (pipeline) Mutations collection is disabled
# before_mutation = [] # individual
@@ -522,7 +398,7 @@ class GA:
)
population[i] = elem
if self.num_fitness_evaluations and self.evaluations_counter >= self.num_fitness_evaluations:
if self.fitness_estimator.num_fitness_evaluations and self.fitness_estimator.evaluations_counter >= self.fitness_estimator.num_fitness_evaluations:
self.metric_collector.save_fitness(
generation=ii,
params=[i.params for i in population],
@@ -544,15 +420,17 @@ class GA:
if (current_fitness > high_fitness) or (ii == 0):
high_fitness = current_fitness
if self.surrogate:
if self.calc_scheme == "type1" and not surrogate_iteration:
self.surrogate.fit(
np.array(self.all_params), np.array(self.all_fitness)
)
elif self.calc_scheme == "type2":
self.surrogate.fit(
np.array(self.all_params), np.array(self.all_fitness)
)
# if self.surrogate:
# if self.calc_scheme == "type1" and not surrogate_iteration:
# self.surrogate.fit(
# np.array(self.all_params), np.array(self.all_fitness)
# )
# elif self.calc_scheme == "type2":
# self.surrogate.fit(
# np.array(self.all_params), np.array(self.all_fitness)
# )
self.fitness_estimator.fit(ii)
if self.early_stopping_iterations:
if population[0].fitness_value > best_val_so_far:
@@ -594,7 +472,7 @@ class GA:
f"RUN ID {run_id}."
)
best_solution = population[0]
log_best_solution(best_solution, alg_args=" ".join(sys.argv), is_tmp=True)
self.fitness_estimator.log_best_solution(best_solution, alg_args=" ".join(sys.argv), is_tmp=True)
if visualize_results:
self.metric_collector.save_and_visualise_trace()
@@ -602,29 +480,19 @@ class GA:
self.metric_collector.save_trace()
if self.statistics_collector is not None:
self.statistics_collector.log_iteration(self.evaluations_counter, population[0].fitness_value)
self.statistics_collector.log_iteration(
self.fitness_estimator.evaluations_counter,
population[0].fitness_value
)
logger.info(f"Y: {y}")
best_individual = population[0]
ind = log_best_solution(best_individual, alg_args=" ".join(sys.argv))
ind = self.fitness_estimator.log_best_solution(best_individual, alg_args=" ".join(sys.argv))
logger.info(f"Logged the best solution. Obtained fitness is {ind.fitness_value}")
return ind
def run_fitness(self, population, surrogate_iteration, ii):
fitness_calc_time_start = time.time()
if not SPEEDUP or not self.surrogate or not surrogate_iteration:
population = self.estimate_fitness(population)
self.save_params(population)
if self.surrogate:
if self.calc_scheme == "type1" and surrogate_iteration:
population = self.surrogate_calculation(population)
elif self.calc_scheme == "type2":
population = self._calculate_uncertain_res(population, iteration_num=ii)
self.save_params(population)
else:
raise ValueError(f"Unexpected surrogate scheme! {self.calc_scheme}")
logger.info(f"TIME OF THE FITNESS FUNCTION: {time.time() - fitness_calc_time_start}")
return population
def run_fitness(self, population: List[Individual], ii: int):
        return self.fitness_estimator.estimate(ii, population)
def run_mutation(self, population):
for i in range(1, len(population)):
@@ -647,7 +515,7 @@ class GA:
tag=self.tag,
train_option=self.train_option,
)
population[i] = make_individual(dto=dto)
population[i] = self.ibuilder.make_individual(dto=dto)
# multistage bag of regularizers approach
...
#!/usr/bin/env python3
import logging
import logging.config
import sys
import uuid
from typing import Union, Optional
from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector
from autotm.algorithms_for_tuning.genetic_algorithm.ga import GA
from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import Surrogate
from autotm.algorithms_for_tuning.individuals import IndividualBuilder
from autotm.fitness.estimator import ComputableFitnessEstimator, DistributedComputableFitnessEstimator, \
    SurrogateEnabledComputableFitnessEstimator, DistributedSurrogateEnabledComputableFitnessEstimator
from autotm.fitness.tm import fit_tm, TopicModel
from autotm.utils import make_log_config_dict
logger = logging.getLogger(__name__)
NUM_FITNESS_EVALUATIONS = 150
SPEEDUP = True
def get_best_individual(
@@ -41,6 +47,8 @@ def get_best_individual(
train_option: str = "offline",
quiet_log: bool = False,
statistics_collector: Optional[StatisticsCollector] = None,
individual_type: str = "regular",
fitness_estimator_type: str = "local", # distributed
**kwargs
):
"""
@@ -96,11 +104,40 @@ def get_best_individual(
if cross_alpha is not None:
cross_alpha = float(cross_alpha)
ibuilder = IndividualBuilder(individual_type)
if fitness_estimator_type == "local" and surrogate_name:
fitness_estimator = SurrogateEnabledComputableFitnessEstimator(
ibuilder,
Surrogate(surrogate_name),
"type1",
SPEEDUP,
num_fitness_evaluations,
statistics_collector
)
elif fitness_estimator_type == "local":
fitness_estimator = ComputableFitnessEstimator(ibuilder, num_fitness_evaluations, statistics_collector)
elif fitness_estimator_type == "distributed" and surrogate_name:
fitness_estimator = DistributedSurrogateEnabledComputableFitnessEstimator(
ibuilder,
Surrogate(surrogate_name),
"type1",
SPEEDUP,
num_fitness_evaluations,
statistics_collector
)
elif fitness_estimator_type == "distributed":
fitness_estimator = ComputableFitnessEstimator(ibuilder, num_fitness_evaluations, statistics_collector)
else:
raise ValueError("Incorrect settings")
g = GA(
dataset=dataset,
data_path=data_path,
num_individuals=num_individuals,
num_iterations=num_iterations,
ibuilder=ibuilder,
fitness_estimator=fitness_estimator,
mutation_type=mutation_type,
crossover_type=crossover_type,
selection_type=selection_type,
@@ -149,19 +186,23 @@ def run_algorithm(
gpr_kernel: str = None,
gpr_alpha: float = None,
gpr_normalize_y: float = None,
use_pipeline: bool = False,
use_pipeline: bool = True,
use_nelder_mead_in_mutation: bool = False,
use_nelder_mead_in_crossover: bool = False,
use_nelder_mead_in_selector: bool = False,
train_option: str = "offline",
quiet_log: bool = False,
individual_type: str = "regular",
fitness_estimator_type: str = "local"
) -> TopicModel:
best_individual = get_best_individual(dataset, data_path, exp_id, topic_count, num_individuals, num_iterations,
num_fitness_evaluations, mutation_type, crossover_type, selection_type,
elem_cross_prob, cross_alpha, best_proc, log_file, tag, surrogate_name,
gpr_kernel, gpr_alpha, gpr_normalize_y, use_pipeline,
use_nelder_mead_in_mutation, use_nelder_mead_in_crossover,
use_nelder_mead_in_selector, train_option, quiet_log)
use_nelder_mead_in_selector, train_option, quiet_log,
individual_type=individual_type,
fitness_estimator_type=fitness_estimator_type)
best_topic_model = fit_tm(
preproc_data_path=data_path,
...
@@ -142,7 +142,17 @@ class SparsityScalerBasedFitnessIndividual(BaseIndividual):
return alpha * self.dto.fitness_value[AVG_COHERENCE_SCORE]
def make_individual(dto: IndividualDTO) -> Individual:
# TODO: choose fitness by ENV var
return RegularFitnessIndividual(dto=dto)
# return SparsityScalerBasedFitnessIndividual(dto=dto)
class IndividualBuilder:
SUPPORTED_IND_TYPES = ["regular", "sparse"]
def __init__(self, ind_type: str = "regular"):
self.ind_type = ind_type
if self.ind_type not in self.SUPPORTED_IND_TYPES:
raise ValueError(f"Unsupported ind type: {self.ind_type}")
def make_individual(self, dto: IndividualDTO) -> Individual:
if self.ind_type == "regular":
return RegularFitnessIndividual(dto=dto)
else:
return SparsityScalerBasedFitnessIndividual(dto=dto)
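A small usage sketch for the builder (hypothetical values: the DTO fields mirror the call sites elsewhere in this diff, and the params vector length is a placeholder):

import uuid

from autotm.algorithms_for_tuning.individuals import IndividualBuilder, IndividualDTO
from autotm.params import FixedListParams

ibuilder = IndividualBuilder("sparse")  # anything outside SUPPORTED_IND_TYPES raises ValueError
dto = IndividualDTO(
    id=str(uuid.uuid4()),
    dataset="lenta_ru",
    params=FixedListParams(params=[0.0] * 16),
)
individual = ibuilder.make_individual(dto)  # -> SparsityScalerBasedFitnessIndividual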
@@ -167,7 +167,9 @@ class AutoTM(BaseEstimator):
else:
# TODO: refactor this function
best_topic_model = bayes_opt.run_algorithm(
dataset=processed_dataset_path,
dataset=self.exp_dataset_name or "__noname__",
data_path=processed_dataset_path,
topic_count=self.topic_count,
log_file=self.log_file_path,
exp_id=self.exp_id or "0",
**self.alg_params
...
import logging
import os
import time
import uuid
from multiprocessing.process import current_process
from typing import List, Optional, Union, cast
from typing import List, Optional, cast
import celery
from billiard.exceptions import SoftTimeLimitExceeded
@@ -13,11 +12,11 @@ from celery.result import GroupResult, AsyncResult
from celery.utils.log import get_task_logger
from tqdm import tqdm
from autotm.algorithms_for_tuning.individuals import Individual, make_individual
from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder
from autotm.fitness.tm import fit_tm_of_individual
from autotm.params_logging_utils import model_files, log_params_and_artifacts, log_stats
from autotm.schemas import IndividualDTO
from autotm.utils import TqdmToLogger, AVG_COHERENCE_SCORE
from autotm.utils import TqdmToLogger
logger = logging.getLogger("root")
task_logger = get_task_logger(__name__)
@@ -109,7 +108,8 @@ def calculate_fitness(self: Task,
self.retry(max_retries=1, countdown=5)
def parallel_fitness(population: List[Individual],
def parallel_fitness(ibuilder: IndividualBuilder,
population: List[Individual],
use_tqdm: bool = False,
tqdm_check_period: int = 2,
app: Optional[celery.Celery] = None) -> List[Individual]:
@@ -170,17 +170,18 @@ def parallel_fitness(population: List[Individual],
# restoring the order in the resulting population according to the initial population
# results_by_id = {ind.id: ind for ind in (fitness_from_json(r) for r in results)}
results_by_id = {ind.id: ind for ind in (IndividualDTO.parse_raw(r) for r in results)}
return [make_individual(results_by_id[ind.dto.id]) for ind in population]
return [ibuilder.make_individual(results_by_id[ind.dto.id]) for ind in population]
def log_best_solution(individual: Individual,
def log_best_solution(ibuilder: IndividualBuilder,
individual: Individual,
wait_for_result_timeout: Optional[float] = None,
alg_args: Optional[str] = None,
is_tmp: bool = False,
app: Optional[celery.Celery] = None) \
-> Individual:
if is_tmp:
return make_individual(individual.dto)
return ibuilder.make_individual(individual.dto)
# ind = fitness_to_json(individual)
ind = individual.dto.json()
logger.info(f"Sending a best individual to be logged: {ind}")
@@ -207,28 +208,6 @@ def log_best_solution(individual: Individual,
r = result.get(timeout=wait_for_result_timeout)
r = IndividualDTO.parse_raw(r)
ind = make_individual(r)
ind = ibuilder.make_individual(r)
return ind
class FitnessCalculatorWrapper:
def __init__(self, dataset, data_path, topic_count, train_option):
self.dataset = dataset
self.data_path = data_path
self.topic_count = topic_count
self.train_option = train_option
def run(self, params):
params = list(params)
params = params[:-1] + [0, 0, 0] + [params[-1]]
solution_dto = IndividualDTO(id=str(uuid.uuid4()),
dataset=self.dataset,
params=params,
alg_id="ga",
topic_count=self.topic_count, train_option=self.train_option)
dto = parallel_fitness([solution_dto])[0]
result = dto.fitness_value[AVG_COHERENCE_SCORE]
return -result
import copy
import logging
import math
import time
import uuid
from abc import ABC, abstractmethod
from typing import List, Optional
import numpy as np
from autotm.abstract_params import AbstractParams
from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector
from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import Surrogate, set_surrogate_fitness, \
get_prediction_uncertanty
from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder
from autotm.fitness import local_tasks, cluster_tasks
from autotm.schemas import IndividualDTO
logger = logging.getLogger(__name__)
class FitnessEstimator(ABC):
def __init__(self, num_fitness_evaluations: Optional[int] = None, statistics_collector: Optional[StatisticsCollector] = None):
self._num_fitness_evaluations = num_fitness_evaluations
self._evaluations_counter = 0
self._statistics_collector = statistics_collector
super().__init__()
@property
def num_fitness_evaluations(self) -> Optional[int]:
return self._num_fitness_evaluations
@property
def evaluations_counter(self) -> int:
return self._evaluations_counter
@abstractmethod
def fit(self, iter_num: int) -> None:
...
@abstractmethod
def log_best_solution(self,
individual: Individual,
wait_for_result_timeout: Optional[float] = None,
alg_args: Optional[str] = None,
is_tmp: bool = False) -> Individual:
...
def estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
evaluated = [individual for individual in population if individual.dto.fitness_value is not None]
not_evaluated = [individual for individual in population if individual.dto.fitness_value is None]
evaluations_limit = max(0, self._num_fitness_evaluations - self._evaluations_counter) \
if self._num_fitness_evaluations else len(not_evaluated)
if len(not_evaluated) > evaluations_limit:
not_evaluated = not_evaluated[:evaluations_limit]
self._evaluations_counter += len(not_evaluated)
new_evaluated = self._estimate(iter_num, not_evaluated)
if self._statistics_collector:
for individual in new_evaluated:
self._statistics_collector.log_individual(individual)
return evaluated + new_evaluated
@abstractmethod
def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
...
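To illustrate the contract, a toy subclass (hypothetical, for illustration only): the public estimate() enforces the evaluation budget and statistics logging, while subclasses implement only the actual computation in _estimate():

class ConstantFitnessEstimator(FitnessEstimator):
    """Toy estimator that marks every individual with a fixed fitness."""

    def fit(self, iter_num: int) -> None:
        pass  # nothing to train in this sketch

    def log_best_solution(self,
                          individual: Individual,
                          wait_for_result_timeout: Optional[float] = None,
                          alg_args: Optional[str] = None,
                          is_tmp: bool = False) -> Individual:
        return individual  # no external logging in this sketch

    def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
        for individual in population:
            # the real fitness_value schema is defined by IndividualDTO;
            # a plain float is used here purely for illustration
            individual.dto.fitness_value = 0.0
        return population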
class SurrogateEnabledFitnessEstimatorMixin(FitnessEstimator, ABC):
SUPPORTED_CALC_SCHEMES = ["type1", "type2"]
ibuilder: IndividualBuilder
surrogate: Surrogate
calc_scheme: str
speedup: bool
all_params: List[AbstractParams]
all_fitness: List[float]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@staticmethod
def surrogate_iteration(iter_num: int) -> bool:
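        # odd iterations (1, 3, 5, ...) are surrogate iterations; the initial
        # population (iter_num == -1) and iteration 0 are always computed for real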
return (iter_num % 2 != 0) if iter_num > 0 else False
def fit(self, iter_num: int) -> None:
surrogate_iteration = self.surrogate_iteration(iter_num)
if (self.calc_scheme == "type1" and not surrogate_iteration) or (self.calc_scheme == "type2"):
self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness))
def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
fitness_calc_time_start = time.time()
surrogate_iteration = self.surrogate_iteration(iter_num)
if not self.speedup or not surrogate_iteration or iter_num == -1:
            # delegate the real evaluations to the computing estimator next in the MRO;
            # the public estimate() has already applied the evaluation budget
            population = super()._estimate(iter_num, population)
self.save_params(population)
if self.calc_scheme == "type1" and surrogate_iteration:
population = self.surrogate_calculation(population)
elif self.calc_scheme == "type2" and iter_num != -1:
population = self._calculate_uncertain_res(iter_num, population)
self.save_params(population)
logger.info(f"TIME OF THE SURROGATE-BASED FITNESS FUNCTION: {time.time() - fitness_calc_time_start}")
return population
def surrogate_calculation(self, population: List[Individual]):
x_val = np.array([copy.deepcopy(individ.params.to_vector()) for individ in population])
y_pred = self.surrogate.predict(x_val)
if not self.speedup:
y_val = np.array([individ.fitness_value for individ in population])
def check_val(fval):
return not (fval is None or math.isnan(fval) or math.isinf(fval))
def check_params(p):
return all(check_val(el) for el in p)
clean_params_and_f = []
for i in range(len(y_val)):
if not check_params(x_val[i]) or not check_val(y_val[i]):
logger.warning(
f"Bad params or fitness found. Fitness: {y_val[i]}. Params: {x_val[i]}."
)
else:
clean_params_and_f.append((x_val[i], y_val[i]))
            x_val = np.array([p for p, _ in clean_params_and_f])
            y_val = np.array([f for _, f in clean_params_and_f])
r_2, mse, rmse = self.surrogate.score(x_val, y_val)
logger.info(f"Real values: {list(y_val)}")
logger.info(f"Predicted values: {list(y_pred)}")
logger.info(f"R^2: {r_2}, MSE: {mse}, RMSE: {rmse}")
for ix, individ in enumerate(population):
individ.dto.fitness_value = set_surrogate_fitness(y_pred[ix])
return population
    def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], proc: float = 0.3):
if len(population) == 0:
return []
x = np.array([individ.dto.params.to_vector() for individ in population])
certanty = get_prediction_uncertanty(
self.surrogate.surrogate, x, self.surrogate.name
)
recalculate_num = int(np.floor(len(certanty) * proc))
logger.info(f"Certanty values: {certanty}")
certanty, x = (
list(t) for t in zip(*sorted(zip(certanty, x.tolist()), reverse=True))
) # check
calculated = []
for individual in population[:recalculate_num]:
# copy
individual_json = individual.dto.model_dump_json()
individual = self.ibuilder.make_individual(dto=IndividualDTO.model_validate_json(individual_json))
individual.dto.fitness_value = None
calculated.append(individual)
        calculated = super()._estimate(iter_num, calculated)  # delegate to the computing estimator
self.all_params += [individ.dto.params.to_vector() for individ in calculated]
self.all_fitness += [
individ.dto.fitness_value["avg_coherence_score"] for individ in calculated
]
pred_y = self.surrogate.predict(x[recalculate_num:])
for ix, individual in enumerate(population[recalculate_num:]):
dto = individual.dto
dto = IndividualDTO(
id=str(uuid.uuid4()),
data_path=dto.data_path,
params=dto.params,
dataset=dto.dataset,
fitness_value=set_surrogate_fitness(pred_y[ix]),
exp_id=dto.exp_id,
alg_id=dto.alg_id,
topic_count=dto.topic_count,
tag=dto.tag,
train_option=dto.train_option,
)
calculated.append(self.ibuilder.make_individual(dto=dto))
return calculated
def save_params(self, population):
params_and_f = [
(copy.deepcopy(individ.params.to_vector()), individ.fitness_value)
for individ in population
if individ.fitness_value not in self.all_fitness
]
def check_val(fval):
return not (fval is None or math.isnan(fval) or math.isinf(fval))
def check_params(pp):
return all(check_val(el) for el in pp)
clean_params_and_f = []
for p, f in params_and_f:
if not check_params(p) or not check_val(f):
logger.warning(f"Bad params or fitness found. Fitness: {f}. Params: {p}.")
else:
clean_params_and_f.append((p, f))
pops = [p for p, _ in clean_params_and_f]
fs = [f for _, f in clean_params_and_f]
self.all_params += pops
self.all_fitness += fs
class ComputableFitnessEstimator(FitnessEstimator):
def __init__(self,
ibuilder: IndividualBuilder,
num_fitness_evaluations: Optional[int] = None,
statistics_collector: Optional[StatisticsCollector] = None):
self.ibuilder = ibuilder
super().__init__(num_fitness_evaluations, statistics_collector)
def fit(self, iter_num: int) -> None:
pass
def log_best_solution(self,
individual: Individual,
wait_for_result_timeout: Optional[float] = None,
alg_args: Optional[str] = None,
is_tmp: bool = False) -> Individual:
return local_tasks.log_best_solution(self.ibuilder, individual, wait_for_result_timeout, alg_args, is_tmp)
def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
return local_tasks.estimate_fitness(self.ibuilder, population)
class DistributedComputableFitnessEstimator(FitnessEstimator):
def __init__(self,
ibuilder: IndividualBuilder,
num_fitness_evaluations: Optional[int] = None,
statistics_collector: Optional[StatisticsCollector] = None):
self.app = cluster_tasks.make_celery_app()
self.ibuilder = ibuilder
super().__init__(num_fitness_evaluations, statistics_collector)
def fit(self, iter_num: int) -> None:
pass
def log_best_solution(self,
individual: Individual,
wait_for_result_timeout: Optional[float] = None,
alg_args: Optional[str] = None,
is_tmp: bool = False) -> Individual:
return cluster_tasks.log_best_solution(self.ibuilder, individual,
wait_for_result_timeout, alg_args, is_tmp, app=self.app)
def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
return cluster_tasks.parallel_fitness(self.ibuilder, population, app=self.app)
# NOTE: the mixin must precede the computable base so that its fit/_estimate overrides come first in the MRO
class SurrogateEnabledComputableFitnessEstimator(SurrogateEnabledFitnessEstimatorMixin, ComputableFitnessEstimator):
def __init__(self,
ibuilder: IndividualBuilder,
surrogate: Surrogate,
calc_scheme: str,
speedup: bool = True,
num_fitness_evaluations: Optional[int] = None,
statistics_collector: Optional[StatisticsCollector] = None):
self.ibuilder = ibuilder
self.surrogate = surrogate
self.calc_scheme = calc_scheme
self.speedup = speedup
self.all_params: List[AbstractParams] = []
self.all_fitness: List[float] = []
if calc_scheme not in self.SUPPORTED_CALC_SCHEMES:
raise ValueError(f"Unexpected surrogate scheme! {self.calc_scheme}")
super().__init__(ibuilder, num_fitness_evaluations, statistics_collector)
class DistributedSurrogateEnabledComputableFitnessEstimator(
    SurrogateEnabledFitnessEstimatorMixin,
    DistributedComputableFitnessEstimator
):
def __init__(self,
ibuilder: IndividualBuilder,
surrogate: Surrogate,
calc_scheme: str,
speedup: bool = True,
num_fitness_evaluations: Optional[int] = None,
statistics_collector: Optional[StatisticsCollector] = None):
self.ibuilder = ibuilder
self.surrogate = surrogate
self.calc_scheme = calc_scheme
self.speedup = speedup
self.all_params: List[AbstractParams] = []
self.all_fitness: List[float] = []
if calc_scheme not in self.SUPPORTED_CALC_SCHEMES:
raise ValueError(f"Unexpected surrogate scheme! {self.calc_scheme}")
super().__init__(ibuilder, num_fitness_evaluations, statistics_collector)
import logging
from typing import List, Optional
from autotm.algorithms_for_tuning.individuals import make_individual, Individual
from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder
from autotm.fitness.tm import fit_tm_of_individual
from autotm.params_logging_utils import log_params_and_artifacts, log_stats, model_files
from autotm.schemas import IndividualDTO
@@ -56,7 +56,7 @@ def calculate_fitness(
)
def estimate_fitness(population: List[Individual]) -> List[Individual]:
def estimate_fitness(ibuilder: IndividualBuilder, population: List[Individual]) -> List[Individual]:
logger.info("Calculating fitness...")
population_with_fitness = []
for individual in population:
@@ -65,19 +65,20 @@ def estimate_fitness(population: List[Individual]) -> List[Individual]:
population_with_fitness.append(individual)
continue
individ_with_fitness = calculate_fitness(individual.dto)
population_with_fitness.append(make_individual(individ_with_fitness))
population_with_fitness.append(ibuilder.make_individual(individ_with_fitness))
logger.info("The fitness results have been obtained")
return population_with_fitness
def log_best_solution(
individual: IndividualDTO,
ibuilder: IndividualBuilder,
individual: Individual,
wait_for_result_timeout: Optional[float] = None,
alg_args: Optional[str] = None,
is_tmp: bool = False,
):
logger.info("Sending a best individual to be logged")
res = make_individual(calculate_fitness(individual.dto,
res = ibuilder.make_individual(calculate_fitness(individual.dto,
log_artifact_and_parameters=True,
is_tmp=is_tmp))
...
import functools
from . import AUTOTM_EXEC_MODE, SUPPORTED_EXEC_MODES
if AUTOTM_EXEC_MODE == 'local':
from .local_tasks import estimate_fitness, log_best_solution
elif AUTOTM_EXEC_MODE == 'cluster':
from .cluster_tasks import make_celery_app
from .cluster_tasks import parallel_fitness, log_best_solution
app = make_celery_app()
estimate_fitness = functools.partial(parallel_fitness, app=app)
log_best_solution = functools.partial(log_best_solution, app=app)
else:
raise ValueError(f"Unknown exec mode: {AUTOTM_EXEC_MODE}. Only the following are supported: {SUPPORTED_EXEC_MODES}")
import logging
import os
import uuid
from typing import Dict, Any
import pandas as pd
from sklearn.model_selection import train_test_split
@@ -11,10 +12,35 @@ from autotm.base import AutoTM
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
CONFIGURATIONS = {
"base": {
"alg_name": "ga",
"num_iterations": 2,
"num_individuals": 2,
"use_pipeline": True
},
"static_chromosome": {
"alg_name": "ga",
"num_iterations": 2,
"num_individuals": 2,
"use_pipeline": False
},
"surrogate": {
"alg_name": "ga",
"num_iterations": 2,
"num_individuals": 2,
"use_pipeline": True,
"surrogate_name": "random-forest-regressor"
},
"bayes": {
"alg_name": "bayes",
"num_evaluations": 5,
}
}
def main():
def run(alg_name: str, alg_params: Dict[str, Any]):
path_to_dataset = "data/sample_corpora/sample_dataset_lenta.csv"
alg_name = "ga"
df = pd.read_csv(path_to_dataset)
train_df, test_df = train_test_split(df, test_size=0.1)
@@ -29,14 +55,7 @@ def main():
"min_tokens_count": 3
},
alg_name=alg_name,
alg_params={
"num_iterations": 2,
"num_individuals": 2,
"use_nelder_mead_in_mutation": False,
"use_nelder_mead_in_crossover": False,
"use_nelder_mead_in_selector": False,
"train_option": "offline"
},
alg_params=alg_params,
working_dir_path=working_dir_path,
exp_dataset_name="lenta_ru"
)
@@ -54,5 +73,18 @@ def main():
logger.info(f"Calculated train mixtures: {mixtures.shape}\n\n{mixtures.head(10).to_string()}")
def main(conf_name: str = "base"):
if conf_name not in CONFIGURATIONS:
raise ValueError(
f"Unknown configuration {conf_name}. Available configurations: {sorted(CONFIGURATIONS.keys())}"
)
    conf = dict(CONFIGURATIONS[conf_name])  # copy so that repeated calls do not mutate CONFIGURATIONS
    alg_name = conf.pop("alg_name")
    run(alg_name=alg_name, alg_params=conf)
if __name__ == "__main__":
main()
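To try one of the other configurations from this example script, pass its name to main, e.g.:

main("surrogate")  # or "static_chromosome", "bayes"; unknown names raise ValueError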
@@ -17,18 +17,24 @@ from autotm.algorithms_for_tuning.individuals import Individual
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
SAVE_PATH = "/Users/Maksim.Zuev/PycharmProjects/AutoTMResources/datasets"
# SAVE_PATH = "/Users/Maksim.Zuev/PycharmProjects/AutoTMResources/datasets"
SAVE_PATH = "/home/nikolay/wspace/AutoTMDatasets"
datasets = [
"hotel-reviews_sample",
"lenta_ru_sample"
"amazon_food_sample",
"20newsgroups_sample",
"banners_sample",
# "lenta_ru_sample"
# "amazon_food_sample",
# "20newsgroups_sample",
# "banners_sample",
]
num_iterations = 500
num_fitness_evaluations = 150
num_individuals = 11
# num_iterations = 500
# num_fitness_evaluations = 150
# num_individuals = 11
num_iterations = 2
num_fitness_evaluations = 10
num_individuals = 4
topic_count = 10
use_nelder_mead_in_mutation = False
use_nelder_mead_in_crossover = False
@@ -157,7 +163,7 @@ def main():
surrogate = None # "random-forest-regressor"
for dataset_name in datasets:
for use_pipeline in [False, True]:
for _ in range(10):
for _ in range(1):
start_time = time.time()
collector = suppress_stdout(lambda: run_single_experiment(os.path.curdir, dataset_name, use_pipeline,
surrogate))
...
@@ -42,6 +42,7 @@ def test_fit_predict(pytestconfig):
alg_params={
"num_iterations": 2,
"num_individuals": 4,
"use_pipeline": False,
"use_nelder_mead_in_mutation": False,
"use_nelder_mead_in_crossover": False,
"use_nelder_mead_in_selector": False,
...