Unverified commit 3deb3e36, authored by Rosneft rosneft, committed by GitHub

Various fixes to work with large datasets in a better way (#1019)

* Stopping if even one fold failed
* Presets bug fixed
* More stable work for short timeouts
* Various minor changes
* Sequential mode without Joblib parallelization
* Tuner timeouts processing
parent 03ae7326
Showing with 227 additions and 134 deletions
Basics
======
This section contains the description of the main use cases for FEDOT:
classification and regression for tabular data,
time series forecasting, multi-modal tasks,
and reproducibility of experiments.
.. toctree::
:maxdepth: 1
......
......@@ -24,8 +24,8 @@ Content
introduction/index
basics/index
examples/index
advanced/index
examples/index
benchmarks/index
api/index
contribution
......
......@@ -5,7 +5,7 @@ from typing import Any, Optional, Sequence, Union
from fedot.api.main import Fedot
from fedot.core.composer.gp_composer.specific_operators import boosting_mutation, parameter_change_mutation
from fedot.core.dag.graph import Graph
from fedot.core.optimisers.gp_comp.evaluation import SimpleDispatcher
from fedot.core.optimisers.gp_comp.evaluation import SequentialDispatcher
from fedot.core.optimisers.gp_comp.operators.mutation import MutationTypesEnum, Mutation
from fedot.core.optimisers.objective import Objective, ObjectiveFunction
from fedot.core.optimisers.opt_history_objects.individual import Individual
......@@ -35,7 +35,7 @@ class RandomMutationSearchOptimizer(GraphOptimizer):
def optimise(self, objective: ObjectiveFunction):
timer = OptimisationTimer(timeout=self.requirements.timeout)
dispatcher = SimpleDispatcher(self.graph_generation_params.adapter)
dispatcher = SequentialDispatcher(self.graph_generation_params.adapter)
evaluator = dispatcher.dispatch(objective, timer)
num_iter = 0
......
import datetime
import gc
import os
from itertools import chain
from typing import Callable, List, Optional, Sequence, Tuple, Union
from fedot.api.api_utils.assumptions.assumptions_handler import AssumptionsHandler
......@@ -9,14 +10,12 @@ from fedot.api.api_utils.presets import OperationsPreset
from fedot.api.time import ApiTime
from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.pipelines.pipeline_advisor import PipelineChangeAdvisor
from fedot.core.composer.composer_builder import ComposerBuilder
from fedot.core.composer.gp_composer.gp_composer import GPComposer
from fedot.core.composer.gp_composer.specific_operators import boosting_mutation, parameter_change_mutation
from fedot.core.constants import DEFAULT_TUNING_ITERATIONS_NUMBER
from fedot.core.data.data import InputData
from fedot.core.log import default_log
from fedot.core.pipelines.adapters import PipelineAdapter
from fedot.core.optimisers.gp_comp.evaluation import determine_n_jobs
from fedot.core.optimisers.gp_comp.gp_params import GPGraphOptimizerParameters
from fedot.core.optimisers.gp_comp.operators.inheritance import GeneticSchemeTypesEnum
......@@ -24,13 +23,15 @@ from fedot.core.optimisers.gp_comp.operators.mutation import MutationTypesEnum
from fedot.core.optimisers.gp_comp.pipeline_composer_requirements import PipelineComposerRequirements
from fedot.core.optimisers.opt_history_objects.opt_history import OptHistory
from fedot.core.optimisers.optimizer import GraphGenerationParams
from fedot.core.pipelines.adapters import PipelineAdapter
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_advisor import PipelineChangeAdvisor
from fedot.core.pipelines.pipeline_node_factory import PipelineOptNodeFactory
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.pipelines.tuning.unified import PipelineTuner
from fedot.core.pipelines.verification import rules_by_task
from fedot.core.repository.pipeline_operation_repository import PipelineOperationRepository
from fedot.core.repository.quality_metrics_repository import MetricsRepository, MetricType, MetricsEnum
from fedot.core.repository.quality_metrics_repository import MetricType, MetricsEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum
from fedot.core.utilities.data_structures import ensure_wrapped_in_sequence
from fedot.utilities.define_metric_by_task import MetricByTask
......@@ -97,8 +98,11 @@ class ApiComposer:
task = api_params['task']
# define available operations
available_operations = composer_params.get('available_operations',
OperationsPreset(task, preset).filter_operations_by_preset())
if 'available_operations' not in composer_params or composer_params['available_operations'] is None:
available_operations = OperationsPreset(task, preset).filter_operations_by_preset()
else:
available_operations = composer_params['available_operations']
primary_operations, secondary_operations = \
PipelineOperationRepository.divide_operations(available_operations, task)
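The replaced ``dict.get`` call is the presets bug mentioned in the commit message: its default argument was evaluated eagerly on every call, and it was bypassed whenever the key was present with a ``None`` value. A minimal illustration of the pitfall; the operation names are placeholders, not taken from this diff:

params = {'available_operations': None}

# dict.get returns the stored None because the key exists,
# so the computed default is silently discarded:
ops = params.get('available_operations', ['rf', 'xgboost'])
assert ops is None

# The fixed logic falls back only when the value is truly missing or None:
if params.get('available_operations') is None:
    ops = ['rf', 'xgboost']  # stands in for OperationsPreset(task, preset).filter_operations_by_preset()
else:
    ops = params['available_operations']
assert ops == ['rf', 'xgboost']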
......@@ -118,6 +122,7 @@ class ApiComposer:
early_stopping_timeout=composer_params.get('early_stopping_timeout', None),
max_pipeline_fit_time=max_pipeline_fit_time,
n_jobs=api_params['n_jobs'],
parallelization_mode=api_params['parallelization_mode'],
show_progress=api_params['show_progress'],
collect_intermediate_metric=composer_params['collect_intermediate_metric'],
keep_n_best=composer_params['keep_n_best'],
......@@ -166,7 +171,7 @@ class ApiComposer:
def _init_graph_generation_params(task: Task, preset: str, available_operations: List[str],
requirements: PipelineComposerRequirements):
advisor = PipelineChangeAdvisor(task)
graph_model_repo = PipelineOperationRepository()\
graph_model_repo = PipelineOperationRepository() \
.from_available_operations(task=task, preset=preset,
available_operations=available_operations)
node_factory = PipelineOptNodeFactory(requirements=requirements, advisor=advisor,
......@@ -195,27 +200,33 @@ class ApiComposer:
initial_assumption = assumption_handler.propose_assumptions(composer_params['initial_assumption'],
available_operations)
n_jobs = determine_n_jobs(api_params['n_jobs'])
with self.timer.launch_assumption_fit():
fitted_assumption = \
assumption_handler.fit_assumption_and_check_correctness(initial_assumption[0],
pipelines_cache=self.pipelines_cache,
preprocessing_cache=self.preprocessing_cache)
preprocessing_cache=self.preprocessing_cache,
eval_n_jobs=n_jobs)
self.log.message(
f'Initial pipeline was fitted in {round(self.timer.assumption_fit_spend_time.total_seconds(), 1)} sec.')
n_jobs = determine_n_jobs(api_params['n_jobs'])
self.preset_name = assumption_handler.propose_preset(preset, self.timer, n_jobs=n_jobs)
composer_requirements = ApiComposer._init_composer_requirements(api_params, composer_params,
self.timer.timedelta_composing,
self.preset_name)
available_operations = list(chain(composer_requirements.primary,
composer_requirements.secondary))
metric_functions = self.obtain_metric(task, composer_params['metric'])
graph_generation_params = \
self._init_graph_generation_params(task=task,
preset=preset,
available_operations=composer_params.get('available_operations'),
available_operations=available_operations,
requirements=composer_requirements)
self.log.message(f"AutoML configured."
f" Parameters tuning: {with_tuning}."
......@@ -268,6 +279,7 @@ class ApiComposer:
.build()
n_jobs = determine_n_jobs(composer_requirements.n_jobs)
if self.timer.have_time_for_composing(composer_params['pop_size'], n_jobs):
# Launch pipeline structure composition
with self.timer.launch_composing():
......@@ -311,7 +323,6 @@ class ApiComposer:
self.was_tuned = False
self.log.message(f'Hyperparameters tuning started with {round(timeout_for_tuning)} min. timeout')
tuned_pipeline = tuner.tune(pipeline_gp_composed)
self.was_tuned = True
self.log.message('Hyperparameters tuning finished')
else:
self.log.message(f'Time for pipeline composing was {str(self.timer.composing_spend_time)}.\n'
......@@ -328,7 +339,9 @@ def _divide_parameters(common_dict: dict) -> List[dict]:
:param common_dict: dictionary with parameters for all AutoML modules
"""
api_params_dict = dict(train_data=None, task=Task, timeout=5, n_jobs=1, show_progress=True, logger=None)
api_params_dict = dict(train_data=None, task=Task, timeout=5,
n_jobs=1, parallelization_mode='populational',
show_progress=True, logger=None)
composer_params_dict = dict(max_depth=None, max_arity=None, pop_size=None, num_of_generations=None,
keep_n_best=None, available_operations=None, metric=None,
......@@ -337,7 +350,7 @@ def _divide_parameters(common_dict: dict) -> List[dict]:
optimizer_external_params=None, collect_intermediate_metric=False,
max_pipeline_fit_time=None, initial_assumption=None, preset='auto',
use_pipelines_cache=True, use_preprocessing_cache=True, cache_folder=None,
keep_history=True, history_dir=None,)
keep_history=True, history_dir=None)
tuner_params_dict = dict(with_tuning=False)
......
......@@ -10,6 +10,7 @@ from fedot.core.data.data import InputData
from fedot.core.data.data_split import train_test_data_setup
from fedot.core.log import default_log
from fedot.core.pipelines.pipeline import Pipeline
from fedot.utilities.memory import MemoryAnalytics
class AssumptionsHandler:
......@@ -44,20 +45,22 @@ class AssumptionsHandler:
def fit_assumption_and_check_correctness(self,
pipeline: Pipeline,
pipelines_cache: Optional[OperationsCache] = None,
preprocessing_cache: Optional[PreprocessingCache] = None) -> Pipeline:
preprocessing_cache: Optional[PreprocessingCache] = None,
eval_n_jobs: int = -1) -> Pipeline:
"""
Check if the initial pipeline can be fitted on the presented data
:param pipeline: pipeline for checking
:param pipelines_cache: Cache manager for fitted models, optional.
:param preprocessing_cache: Cache manager for optional preprocessing encoders and imputers, optional.
:param eval_n_jobs: number of jobs to fit the initial pipeline
"""
try:
data_train, data_test = train_test_data_setup(self.data)
self.log.info('Initial pipeline fitting started')
# load preprocessing
pipeline.try_load_from_cache(pipelines_cache, preprocessing_cache)
pipeline.fit(data_train)
pipeline.fit(data_train, n_jobs=eval_n_jobs)
if pipelines_cache is not None:
pipelines_cache.save_pipeline(pipeline)
......@@ -67,6 +70,10 @@ class AssumptionsHandler:
pipeline.predict(data_test)
self.log.info('Initial pipeline was fitted successfully')
MemoryAnalytics.log(self.log,
additional_info='fitting of the initial pipeline',
logging_level=45) # message logging level
except Exception as ex:
self._raise_evaluating_exception(ex)
return pipeline
......@@ -89,5 +96,5 @@ class AssumptionsHandler:
"""
if not preset or preset == 'auto':
preset = change_preset_based_on_initial_fit(timer, n_jobs)
self.log.info(f"Preset was changed to {preset}")
self.log.message(f"Preset was changed to {preset} due to fit time estimation for initial model.")
return preset
......@@ -86,7 +86,7 @@ class ApiParams:
Log().reset_logging_level(input_params['logging_level'])
self.log = default_log(prefix='FEDOT logger')
simple_keys = ['problem', 'n_jobs', 'timeout']
simple_keys = ['problem', 'n_jobs', 'parallelization_mode', 'timeout']
self.api_params = {k: input_params[k] for k in simple_keys}
default_evo_params = self.get_default_evo_params(self.api_params['problem'])
......
......@@ -64,6 +64,7 @@ class Fedot:
safe_mode: if set ``True`` it will cut large datasets to prevent memory overflow and use a label encoder
instead of a one-hot encoder if the total cardinality of categorical features is high.
n_jobs: number of jobs for parallelization (``-1`` to use all CPUs)
parallelization_mode: type of evaluation for candidate solution groups (populational or sequential)
max_depth: max depth of the pipeline
max_arity: max arity of the pipeline nodes
pop_size: population size for composer
......@@ -107,7 +108,8 @@ class Fedot:
task_params: TaskParams = None,
seed=None, logging_level: int = logging.ERROR,
safe_mode=False,
n_jobs: int = 1,
n_jobs: int = -1,
parallelization_mode: str = 'populational',
**composer_tuner_params
):
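A hedged usage sketch of the new ``parallelization_mode`` argument; the dataset path and target column name are assumptions for illustration, not taken from this diff:

from fedot.api.main import Fedot

# 'train.csv' and 'target' are placeholder names.
model = Fedot(problem='classification', timeout=5,
              n_jobs=-1, parallelization_mode='sequential')
model.fit(features='train.csv', target='target')

In 'sequential' mode candidate pipelines are evaluated one at a time, each fit using all available workers, which tends to be gentler on memory for large datasets than fitting a whole population of pipelines in parallel.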
......@@ -119,7 +121,8 @@ class Fedot:
# Define parameters that were set via __init__
input_params = {'problem': self.metrics.main_problem, 'timeout': timeout,
'composer_tuner_params': composer_tuner_params, 'task_params': task_params,
'seed': seed, 'logging_level': logging_level, 'n_jobs': n_jobs}
'seed': seed, 'logging_level': logging_level,
'n_jobs': n_jobs, 'parallelization_mode': parallelization_mode}
self.params.initialize_params(input_params)
# Initialize ApiComposer's cache parameters via ApiParams
......@@ -170,8 +173,12 @@ class Fedot:
recommendations = self.data_analyser.give_recommendation(self.train_data)
self.data_processor.accept_and_apply_recommendations(self.train_data, recommendations)
self.params.accept_and_apply_recommendations(self.train_data, recommendations)
self._init_remote_if_necessary()
self.params.update_available_operations_by_preset(self.train_data)
if self.params.api_params['preset'] != 'auto':
self.params.update_available_operations_by_preset(self.train_data)
self.params.api_params['train_data'] = self.train_data
if predefined_model is not None:
......@@ -183,6 +190,9 @@ class Fedot:
self.current_pipeline, self.best_models, self.history = \
self.api_composer.obtain_model(**self.params.api_params)
if self.current_pipeline is None:
raise ValueError('No models were found')
# Final fit for obtained pipeline on full dataset
if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
self._train_pipeline_on_full_dataset(recommendations, full_train_not_preprocessed)
......
......@@ -3,7 +3,7 @@ from contextlib import contextmanager
from typing import Optional
from fedot.core.constants import COMPOSING_TUNING_PROPORTION, MINIMAL_PIPELINE_NUMBER_FOR_EVALUATION, \
MINIMAL_SECONDS_FOR_TUNING
MINIMAL_SECONDS_FOR_TUNING, MIN_NUMBER_OF_GENERATIONS
class ApiTime:
......@@ -38,7 +38,8 @@ class ApiTime:
def have_time_for_composing(self, pop_size: int, n_jobs: int) -> bool:
timeout_not_set = self.timedelta_composing is None
return timeout_not_set or self.assumption_fit_spend_time < self.timedelta_composing * n_jobs / pop_size
return timeout_not_set or self.assumption_fit_spend_time < \
self.timedelta_composing * n_jobs / (pop_size * MIN_NUMBER_OF_GENERATIONS)
def have_time_for_the_best_quality(self, n_jobs: int):
timeout_not_set = self.timedelta_automl is None
......
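The tightened ``have_time_for_composing`` check reserves budget for at least ``MIN_NUMBER_OF_GENERATIONS`` generations rather than a single population. A worked example with assumed numbers, purely for illustration:

# Assumed: timedelta_composing = 12 min, n_jobs = 4, pop_size = 20, MIN_NUMBER_OF_GENERATIONS = 3.
# Allowed time for the initial fit ~ 12 * 4 / (20 * 3) = 0.8 min,
# so composing starts only if the initial assumption was fitted in under ~48 seconds;
# the previous check (12 * 4 / 20 = 2.4 min) was three times more permissive.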
......@@ -4,7 +4,6 @@ from fedot.core.caching.base_cache import BaseCache
from fedot.core.caching.pipelines_cache_db import OperationsCacheDB
from fedot.core.pipelines.node import Node
from fedot.core.utilities.data_structures import ensure_wrapped_in_sequence
from fedot.utilities.debug import is_test_session
if TYPE_CHECKING:
from fedot.core.pipelines.pipeline import Pipeline
......@@ -34,9 +33,7 @@ class OperationsCache(BaseCache):
]
self._db.add_operations(mapped)
except Exception as ex:
self.log.warning(f'Nodes can not be saved: {ex}. Continue')
if is_test_session():
raise ex
self.log.warning(f'Nodes can not be saved: {ex}. Continue', raise_if_test=True)
def save_pipeline(self, pipeline: 'Pipeline', fold_id: Optional[int] = None):
"""
......@@ -62,9 +59,7 @@ class OperationsCache(BaseCache):
else:
nodes_lst[idx].fitted_operation = None
except Exception as ex:
self.log.warning(f'Cache can not be loaded: {ex}. Continue.')
if is_test_session():
raise ex
self.log.warning(f'Cache can not be loaded: {ex}. Continue.', raise_if_test=True)
def try_load_into_pipeline(self, pipeline: 'Pipeline', fold_id: Optional[int] = None):
"""
......
......@@ -2,7 +2,6 @@ from typing import TYPE_CHECKING, Optional, Union
from fedot.core.caching.base_cache import BaseCache
from fedot.core.caching.preprocessing_cache_db import PreprocessingCacheDB
from fedot.utilities.debug import is_test_session
if TYPE_CHECKING:
from fedot.core.pipelines.pipeline import Pipeline
......@@ -32,9 +31,7 @@ class PreprocessingCache(BaseCache):
if processors:
pipeline.encoder, pipeline.imputer = processors
except Exception as ex:
self.log.warning(f'Preprocessor search error: {ex}')
if is_test_session():
raise ex
self.log.warning(f'Preprocessor search error: {ex}', raise_if_test=True)
def add_preprocessor(self, pipeline: 'Pipeline', fold_id: Optional[Union[int, None]] = None):
"""
......
......@@ -71,11 +71,14 @@ class ComposerBuilder:
return self
def with_optimizer_params(self, parameters: Optional[GraphOptimizerParameters] = None,
external_parameters: Optional[Dict] = None):
external_parameters: Optional[Dict] = None,
dispatcher=None):
if parameters is not None:
self.optimizer_parameters = parameters
if external_parameters is not None:
self.optimizer_external_parameters = external_parameters
if dispatcher is not None:
self.dispatcher = dispatcher
return self
def with_requirements(self, requirements: PipelineComposerRequirements):
......
......@@ -39,9 +39,16 @@ class GPComposer(Composer):
data_producer = DataSourceSplitter(self.composer_requirements.cv_folds,
self.composer_requirements.validation_blocks,
shuffle=True).build(data)
parallelization_mode = self.composer_requirements.parallelization_mode
if parallelization_mode == 'populational':
n_jobs_for_evaluation = 1
elif parallelization_mode == 'sequential':
n_jobs_for_evaluation = self.composer_requirements.n_jobs
else:
raise ValueError(f'Unknown parallelization_mode: {parallelization_mode}')
# Define objective function
n_jobs_for_evaluation = 1
# TODO implement dispatcher selection
objective_evaluator = PipelineObjectiveEvaluate(self.optimizer.objective, data_producer,
self.composer_requirements.max_pipeline_fit_time,
self.composer_requirements.validation_blocks,
......@@ -60,10 +67,13 @@ class GPComposer(Composer):
self.log.info('GP composition finished')
return best_model
def _convert_opt_results_to_pipeline(self, opt_result: Sequence[OptGraph]) -> Tuple[Pipeline, Sequence[Pipeline]]:
def _convert_opt_results_to_pipeline(self, opt_result: Sequence[OptGraph]) -> Tuple[
Optional[Pipeline], Sequence[Pipeline]]:
adapter = self.optimizer.graph_generation_params.adapter
multi_objective = self.optimizer.objective.is_multi_objective
best_pipelines = [adapter.restore(graph) for graph in opt_result]
if not best_pipelines:
return None, []
chosen_best_pipeline = best_pipelines if multi_objective else best_pipelines[0]
return chosen_best_pipeline, best_pipelines
......
......@@ -7,10 +7,10 @@ from sklearn.metrics import (accuracy_score, f1_score, log_loss, mean_absolute_e
silhouette_score, roc_curve, auc)
from fedot.core.data.data import InputData, OutputData
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import TaskTypesEnum
from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast
from fedot.core.pipelines.pipeline import Pipeline
def from_maximised_metric(metric_func):
......@@ -70,8 +70,7 @@ class QualityMetric:
reference_data, results = cls._in_sample_prediction(pipeline, reference_data, validation_blocks)
metric = cls.metric(reference_data, results)
except Exception as ex:
# TODO: use log instead of stdout
print(f'Metric evaluation error: {ex}')
pipeline.log.info(f'Metric can not be evaluated because of: {ex}')
return metric
......
......@@ -13,6 +13,7 @@ FAST_TRAIN_PRESET_NAME = 'fast_train'
AUTO_PRESET_NAME = 'auto'
MINIMAL_PIPELINE_NUMBER_FOR_EVALUATION = 100
MIN_NUMBER_OF_GENERATIONS = 3
FRACTION_OF_UNIQUE_VALUES = 0.95
......
......@@ -9,6 +9,7 @@ from typing import Optional, Tuple, Union
from fedot.core.utilities.singleton_meta import SingletonMeta
from fedot.core.utils import default_fedot_data_dir
from fedot.utilities.debug import is_test_session
DEFAULT_LOG_PATH = pathlib.Path(default_fedot_data_dir(), 'log.log')
......@@ -171,13 +172,46 @@ class LoggerAdapter(logging.LoggerAdapter):
self.logger.setLevel(self.logging_level)
return '%s - %s' % (self.extra['prefix'], msg), kwargs
def message(self, message: str):
def debug(self, msg, *args, **kwargs):
raise_if_test(msg, kwargs)
super().debug(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
raise_if_test(msg, kwargs)
super().info(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
raise_if_test(msg, kwargs)
super().warning(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
raise_if_test(msg, kwargs)
super().error(msg, *args, **kwargs)
def exception(self, msg, *args, exc_info=True, **kwargs):
raise_if_test(msg, kwargs)
super().exception(msg, *args, exc_info=exc_info, **kwargs)
def critical(self, msg, *args, **kwargs):
raise_if_test(msg, kwargs)
super().critical(msg, *args, **kwargs)
def log(self, level, msg, *args, **kwargs):
"""
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
"""
raise_if_test(msg, kwargs)
super().log(level, msg, *args, **kwargs)
def message(self, msg: str, **kwargs):
""" Record the message to user.
Message is an intermediate logging level between info and warning
to display main info about optimization process """
raise_if_test(msg, kwargs)
message_logging_level = 45
if message_logging_level >= self.logging_level:
self.critical(msg=message)
self.critical(msg=msg)
def __str__(self):
return f'LoggerAdapter object for {self.extra["prefix"]} module'
......@@ -186,6 +220,11 @@ class LoggerAdapter(logging.LoggerAdapter):
return self.__str__()
def raise_if_test(msg, kwargs):
if kwargs.pop('raise_if_test', False) is True and is_test_session():
raise Exception(msg)
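A small sketch of the intended behaviour of the ``raise_if_test`` flag; the prefix is a placeholder:

log = default_log(prefix='demo')

# Outside a test session this only emits the warning;
# inside a test session it raises, so previously swallowed
# failures surface in CI instead of being logged and ignored:
log.warning('Cache can not be loaded. Continue.', raise_if_test=True)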
def default_log(prefix: Optional[object] = 'default') -> 'LoggerAdapter':
""" Default logger
......
......@@ -46,6 +46,8 @@ class HallOfFame:
:param population: A list of individuals with a fitness attribute to
update the hall of fame with.
"""
if not population:
return
for ind in population:
if len(self) == 0 and self.maxsize != 0:
# Working on an empty hall of fame is problematic for the loop
......
......@@ -40,8 +40,11 @@ class ComposerRequirements:
early_stopping_iterations: Optional[int] = 10
early_stopping_timeout: Optional[float] = 5
keep_n_best: int = 1
max_pipeline_fit_time: Optional[datetime.timedelta] = None
n_jobs: int = 1
n_jobs: int = -1
parallelization_mode: str = 'populational'
show_progress: bool = True
collect_intermediate_metric: bool = False
......
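A minimal construction sketch for the updated requirements dataclass; the values are illustrative, not recommended defaults:

# Illustrative only: sequential candidate evaluation,
# each pipeline fitted with four workers.
requirements = ComposerRequirements(n_jobs=4, parallelization_mode='sequential')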
......@@ -29,6 +29,7 @@ G = TypeVar('G', bound=Serializable)
class DelegateEvaluator:
"""Interface for delegate evaluator of graphs."""
@property
@abstractmethod
def is_enabled(self) -> bool:
......@@ -96,18 +97,17 @@ class ObjectiveEvaluationDispatcher(ABC):
return individuals_evaluated
class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
"""Evaluates objective function on population using multiprocessing pool
and optionally model evaluation cache with RemoteEvaluator.
class BasePipelineEvaluationDispatcher(ObjectiveEvaluationDispatcher):
"""Base class for disptchers that evaluate objective function on population.
Usage: call `dispatch(objective_function)` to get evaluation function.
Usage: call `dispatch(objective_function)` to get evaluation function.
Args:
adapter: adapter for graphs
n_jobs: number of jobs for multiprocessing or 1 for no multiprocessing.
graph_cleanup_fn: function to call after graph evaluation, primarily for memory cleanup.
delegate_evaluator: delegate graph fitter (e.g. for remote graph fitting before evaluation)
"""
Args:
adapter: adapter for graphs
n_jobs: number of jobs for multiprocessing or 1 for no multiprocessing.
graph_cleanup_fn: function to call after graph evaluation, primarily for memory cleanup.
delegate_evaluator: delegate graph fitter (e.g. for remote graph fitting before evaluation)
"""
def __init__(self,
adapter: BaseOptimizationAdapter,
......@@ -123,46 +123,25 @@ class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
self.timer = None
self.logger = default_log(self)
self._n_jobs = n_jobs
self._reset_eval_cache()
self.evaluation_cache = None
def dispatch(self, objective: ObjectiveFunction, timer: Optional[Timer] = None) -> EvaluationOperator:
"""Return handler to this object that hides all details
and allows only to evaluate population with provided objective."""
self._objective_eval = objective
self.timer = timer or get_forever_timer()
return self.evaluate_with_cache
return self.evaluate_population
def set_evaluation_callback(self, callback: Optional[GraphFunction]):
self._post_eval_callback = callback
def evaluate_with_cache(self, population: PopulationT) -> Optional[PopulationT]:
reversed_population = list(reversed(population))
self._remote_compute_cache(reversed_population)
evaluated_population = self.evaluate_population(reversed_population)
self._reset_eval_cache()
return evaluated_population
@abstractmethod
def evaluate_population(self, individuals: PopulationT) -> Optional[PopulationT]:
individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals)
# Evaluate individuals without valid fitness in parallel.
n_jobs = determine_n_jobs(self._n_jobs, self.logger)
parallel = Parallel(n_jobs=n_jobs, verbose=0, pre_dispatch="2*n_jobs")
eval_func = partial(self.evaluate_single, logs_initializer=Log().get_parameters())
evaluation_results = parallel(delayed(eval_func)(ind.graph, ind.uid) for ind in individuals_to_evaluate)
individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, evaluation_results)
# If there were no successful evals then try once again getting at least one,
# even if time limit was reached
successful_evals = individuals_evaluated + individuals_to_skip
if not successful_evals:
single_ind = choice(individuals)
evaluation_result = eval_func(single_ind.graph, single_ind.uid, with_time_limit=False)
successful_evals = self.apply_evaluation_results([single_ind], [evaluation_result]) or None
MemoryAnalytics.log(self.logger,
additional_info='parallel evaluation of population',
logging_level=logging.INFO)
return successful_evals
raise NotImplementedError()
def evaluate_single(self, graph: OptGraph, uid_of_individual: str, with_time_limit: bool = True, cache_key: Optional[str] = None,
def evaluate_single(self, graph: OptGraph, uid_of_individual: str, with_time_limit: bool = True,
cache_key: Optional[str] = None,
logs_initializer: Optional[Tuple[int, pathlib.Path]] = None) -> OptionalEvalResult:
if with_time_limit and self.timer.is_time_limit_reached():
......@@ -171,8 +150,6 @@ class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
# in case of multiprocessing run
Log.setup_in_mp(*logs_initializer)
graph = self.evaluation_cache.get(cache_key, graph)
adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
start_time = timeit.default_timer()
fitness, graph = adapted_evaluate(graph)
......@@ -198,6 +175,72 @@ class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
return fitness, domain_graph
class MultiprocessingDispatcher(BasePipelineEvaluationDispatcher):
"""Evaluates objective function on population using multiprocessing pool
and optionally model evaluation cache with RemoteEvaluator.
Usage: call `dispatch(objective_function)` to get evaluation function.
Args:
adapter: adapter for graphs
n_jobs: number of jobs for multiprocessing or 1 for no multiprocessing.
graph_cleanup_fn: function to call after graph evaluation, primarily for memory cleanup.
delegate_evaluator: delegate graph fitter (e.g. for remote graph fitting before evaluation)
"""
def __init__(self,
adapter: BaseOptimizationAdapter,
n_jobs: int = 1,
graph_cleanup_fn: Optional[GraphFunction] = None,
delegate_evaluator: Optional[DelegateEvaluator] = None):
super().__init__(adapter, n_jobs, graph_cleanup_fn, delegate_evaluator)
self._reset_eval_cache()
def dispatch(self, objective: ObjectiveFunction, timer: Optional[Timer] = None) -> EvaluationOperator:
"""Return handler to this object that hides all details
and allows only to evaluate population with provided objective."""
super().dispatch(objective, timer)
return self.evaluate_with_cache
def evaluate_with_cache(self, population: PopulationT) -> Optional[PopulationT]:
reversed_population = list(reversed(population))
self._remote_compute_cache(reversed_population)
evaluated_population = self.evaluate_population(reversed_population)
self._reset_eval_cache()
return evaluated_population
def evaluate_population(self, individuals: PopulationT) -> Optional[PopulationT]:
individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals)
# Evaluate individuals without valid fitness in parallel.
n_jobs = determine_n_jobs(self._n_jobs, self.logger)
parallel = Parallel(n_jobs=n_jobs, verbose=0, pre_dispatch="2*n_jobs")
eval_func = partial(self.evaluate_single, logs_initializer=Log().get_parameters())
evaluation_results = parallel(delayed(eval_func)(ind.graph, ind.uid) for ind in individuals_to_evaluate)
individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, evaluation_results)
# If there were no successful evals then try once again getting at least one,
# even if time limit was reached
successful_evals = individuals_evaluated + individuals_to_skip
if not successful_evals:
single_ind = choice(individuals)
evaluation_result = eval_func(single_ind.graph, single_ind.uid, with_time_limit=False)
successful_evals = self.apply_evaluation_results([single_ind], [evaluation_result]) or None
MemoryAnalytics.log(self.logger,
additional_info='parallel evaluation of population',
logging_level=logging.INFO)
return successful_evals
def evaluate_single(self, graph: OptGraph, uid_of_individual: str, with_time_limit: bool = True,
cache_key: Optional[str] = None,
logs_initializer: Optional[Tuple[int, pathlib.Path]] = None) -> OptionalEvalResult:
graph = self.evaluation_cache.get(cache_key, graph)
eval_res = super().evaluate_single(graph, uid_of_individual, with_time_limit, cache_key, logs_initializer)
return eval_res
def _reset_eval_cache(self):
self.evaluation_cache: Dict[str, Graph] = {}
......@@ -210,53 +253,22 @@ class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
self.evaluation_cache = {ind.uid: graph for ind, graph in zip(population, computed_graphs)}
class SimpleDispatcher(ObjectiveEvaluationDispatcher):
"""Evaluates objective function on population.
class SequentialDispatcher(MultiprocessingDispatcher):
"""Evaluates objective function on population in sequential way.
Usage: call `dispatch(objective_function)` to get evaluation function.
Usage: call `dispatch(objective_function)` to get evaluation function.
"""
def __init__(self, adapter: BaseOptimizationAdapter):
self._adapter = adapter
self._objective_eval = None
self.timer = None
def dispatch(self, objective: ObjectiveFunction, timer: Optional[Timer] = None) -> EvaluationOperator:
"""Return handler to this object that hides all details
and allows only to evaluate population with provided objective."""
self._objective_eval = objective
self.timer = timer or get_forever_timer()
return self.evaluate_population
def evaluate_population(self, individuals: PopulationT) -> Optional[PopulationT]:
individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals)
evaluation_results = [self.evaluate_single(ind.graph, ind.uid) for ind in individuals_to_evaluate]
individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, evaluation_results)
evaluated_population = individuals_evaluated + individuals_to_skip or None
return evaluated_population
def evaluate_single(self, graph: OptGraph, uid_of_individual: str, with_time_limit=True) -> OptionalEvalResult:
if with_time_limit and self.timer.is_time_limit_reached():
return None
adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
start_time = timeit.default_timer()
fitness, graph = adapted_evaluate(graph)
end_time = timeit.default_timer()
eval_time_iso = datetime.now().isoformat()
eval_res = GraphEvalResult(
uid_of_individual=uid_of_individual, fitness=fitness, graph=graph, metadata={
'computation_time_in_seconds': end_time - start_time,
'evaluation_time_iso': eval_time_iso
}
)
return eval_res
def _evaluate_graph(self, graph: Graph) -> Tuple[Fitness, Graph]:
fitness = self._objective_eval(graph)
return fitness, graph
MemoryAnalytics.log(self.logger,
additional_info='sequential evaluation of population',
logging_level=logging.INFO)
return evaluated_population
def determine_n_jobs(n_jobs=-1, logger=None):
......
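After the refactoring both dispatchers share ``BasePipelineEvaluationDispatcher``, so choosing one reduces to the parallelization mode. A selection sketch; the helper name ``choose_dispatcher`` is an assumption, not part of this diff:

def choose_dispatcher(mode, adapter, n_jobs):
    # 'populational': the population is evaluated in parallel,
    # with each pipeline fitted by a single worker;
    # 'sequential': pipelines are evaluated one by one,
    # and each fit may use all n_jobs workers internally.
    if mode == 'populational':
        return MultiprocessingDispatcher(adapter, n_jobs=n_jobs)
    if mode == 'sequential':
        return SequentialDispatcher(adapter)
    raise ValueError(f'Unknown parallelization_mode: {mode}')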
......@@ -62,14 +62,14 @@ class PipelineObjectiveEvaluate(ObjectiveEvaluate[Pipeline]):
try:
prepared_pipeline = self.prepare_graph(graph, train_data, fold_id, self._eval_n_jobs)
except Exception as ex:
self._log.warning(f'Error on Pipeline fit during fitness evaluation. '
f'Skipping the Pipeline. Error <{ex}> on graph: {graph_id}')
self._log.warning(f'Unsuccessful pipeline fit during fitness evaluation. '
f'Skipping the pipeline. Exception <{ex}> on {graph_id}')
if is_test_session() and not isinstance(ex, TimeoutError):
stack_trace = traceback.format_exc()
save_debug_info_for_pipeline(graph, train_data, test_data, ex, stack_trace)
if not is_recording_mode():
raise ex
continue
break # if even one fold fails, the evaluation stops
evaluated_fitness = self._objective(prepared_pipeline,
reference_data=test_data,
......@@ -78,11 +78,7 @@ class PipelineObjectiveEvaluate(ObjectiveEvaluate[Pipeline]):
folds_metrics.append(evaluated_fitness.values)
else:
self._log.warning(f'Invalid fitness after objective evaluation. '
f'Skipping the graph: {graph_id}')
if is_test_session():
raise ValueError(f'Fitness {evaluated_fitness} is not valid')
else:
continue
f'Skipping the graph: {graph_id}', raise_if_test=True)
graph.unfit()
if folds_metrics:
folds_metrics = tuple(np.mean(folds_metrics, axis=0)) # averages for each metric over folds
......
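The fold loop above now fails fast: the first broken fold aborts the evaluation instead of being skipped, so a fitness averaged over an incomplete set of folds is never reported. A condensed sketch of the resulting control flow, with names abbreviated from the diff (np is numpy):

folds_metrics = []
for fold_id, (train_data, test_data) in enumerate(data_producer()):
    try:
        prepared_pipeline = self.prepare_graph(graph, train_data, fold_id, self._eval_n_jobs)
    except Exception:
        break  # if even one fold fails, the evaluation stops
    evaluated_fitness = self._objective(prepared_pipeline, reference_data=test_data)
    if evaluated_fitness.valid:
        folds_metrics.append(evaluated_fitness.values)
if folds_metrics:
    folds_metrics = tuple(np.mean(folds_metrics, axis=0))  # average each metric over folds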
......@@ -115,14 +115,14 @@ class Individual:
return self.uid == other.uid
def __copy__(self):
default_log(self).warning(INDIVIDUAL_COPY_RESTRICTION_MESSAGE)
default_log(self).warning(INDIVIDUAL_COPY_RESTRICTION_MESSAGE, raise_if_test=True)
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo):
default_log(self).warning(INDIVIDUAL_COPY_RESTRICTION_MESSAGE)
default_log(self).warning(INDIVIDUAL_COPY_RESTRICTION_MESSAGE, raise_if_test=True)
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
......