Unverified commit 65cd5c97, authored by Pakulin Sergei and committed by GitHub

+ main API input preprocessing activation param (#1025)

+ pipeline's new parameter and preprocessing class (see the usage sketch below)
+ pipeline's integration tests for the new parameter
+ split the preprocessing status param in supplementary_data into obligatory and optional
+ main API's integration test for the new parameter
+ type/style/logic fixes
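A minimal usage sketch of the new flag through the public API, on synthetic data; it assumes the Fedot constructor is unchanged apart from the new parameter, which it forwards like the other composer params:

import numpy as np
from fedot.api.main import Fedot

x_train = np.random.rand(100, 5)
y_train = np.random.randint(0, 2, size=100)

# The data is assumed to be already cleaned and encoded externally,
# so FEDOT's built-in input preprocessing is switched off.
model = Fedot(problem='classification', timeout=0.5, use_input_preprocessing=False)
model.fit(features=x_train, target=y_train)
predictions = model.predict(features=x_train)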
Showing with 202 additions and 144 deletions
......@@ -80,13 +80,14 @@ class ApiComposer:
# Start composing - pipeline structure search
return self.compose_fedot_model(api_params_dict, composer_params_dict, tuner_params_dict)
def init_cache(self, use_pipelines_cache: bool, use_preprocessing_cache: bool,
def init_cache(self, use_pipelines_cache: bool = True,
use_input_preprocessing: bool = True, use_preprocessing_cache: bool = True,
cache_folder: Optional[Union[str, os.PathLike]] = None):
if use_pipelines_cache:
self.pipelines_cache = OperationsCache(cache_folder)
# in case of previously generated singleton cache
self.pipelines_cache.reset()
if use_preprocessing_cache:
if use_input_preprocessing and use_preprocessing_cache:
self.preprocessing_cache = PreprocessingCache(cache_folder)
# in case of previously generated singleton cache
self.preprocessing_cache.reset()
......@@ -125,6 +126,10 @@ class ApiComposer:
max_pipeline_fit_time=max_pipeline_fit_time,
n_jobs=api_params['n_jobs'],
parallelization_mode=api_params['parallelization_mode'],
static_individual_metadata={
k: v for k, v in composer_params.items()
if k in ['use_input_preprocessing']
},
show_progress=api_params['show_progress'],
collect_intermediate_metric=composer_params['collect_intermediate_metric'],
keep_n_best=composer_params['keep_n_best'],
......@@ -190,6 +195,10 @@ class ApiComposer:
self.tuner_requirements = PipelineComposerRequirements(
n_jobs=api_params['n_jobs'],
static_individual_metadata={
k: v for k, v in composer_params.items()
if k in ['use_input_preprocessing']
},
cv_folds=composer_params['cv_folds'],
validation_blocks=composer_params['validation_blocks'],
)
......@@ -210,7 +219,9 @@ class ApiComposer:
assumption_handler = AssumptionsHandler(train_data)
initial_assumption = assumption_handler.propose_assumptions(composer_params['initial_assumption'],
available_operations)
available_operations,
use_input_preprocessing=composer_params[
'use_input_preprocessing'])
n_jobs = determine_n_jobs(api_params['n_jobs'])
......@@ -365,7 +376,7 @@ def _divide_parameters(common_dict: dict) -> List[dict]:
optimizer_external_params=None, collect_intermediate_metric=False,
max_pipeline_fit_time=None, initial_assumption=None, preset='auto',
use_pipelines_cache=True, use_preprocessing_cache=True, cache_folder=None,
keep_history=True, history_dir=None)
keep_history=True, history_dir=None, use_input_preprocessing=True)
tuner_params_dict = dict(with_tuning=False)
......
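For reference, the static_individual_metadata entries above are simply filtered out of the composer params with a dict comprehension; a trivial standalone illustration (the extra keys here are made up):

composer_params = {'use_input_preprocessing': False, 'cv_folds': 5, 'preset': 'auto'}

# Keep only the keys that should accompany every individual as static metadata.
static_individual_metadata = {k: v for k, v in composer_params.items()
                              if k in ['use_input_preprocessing']}
assert static_individual_metadata == {'use_input_preprocessing': False}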
......@@ -10,6 +10,7 @@ from fedot.core.data.multi_modal import MultiModalData
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast, convert_forecast_to_output
from fedot.core.repository.tasks import Task, TaskTypesEnum
from fedot.preprocessing.dummy_preprocessing import DummyPreprocessor
from fedot.preprocessing.preprocessing import DataPreprocessor
......@@ -25,14 +26,18 @@ class ApiDataProcessor:
Such a class also performs data preprocessing.
"""
def __init__(self, task: Task):
def __init__(self, task: Task, use_input_preprocessing: bool = True):
self.task = task
self.preprocessor = DataPreprocessor()
# Dictionary with recommendations (e.g. 'cut' for cutting dataset, 'label_encode'
# to encode features using label encoder). Parameters for transformation provided also
self.recommendations = {'cut': self.preprocessor.cut_dataset,
'label_encoded': self.preprocessor.label_encoding_for_fit}
self._recommendations = {}
self.preprocessor = DummyPreprocessor()
if use_input_preprocessing:
self.preprocessor = DataPreprocessor()
# Dictionary with recommendations (e.g. 'cut' for cutting dataset, 'label_encoded'
# to encode features using label encoder). Parameters for transformation provided also
self._recommendations = {'cut': self.preprocessor.cut_dataset,
'label_encoded': self.preprocessor.label_encoding_for_fit}
def define_data(self,
features: FeaturesType,
......@@ -115,7 +120,6 @@ class ApiDataProcessor:
for data_source_name, values in input_data.items():
self.accept_and_apply_recommendations(input_data[data_source_name], recommendations[data_source_name])
else:
for name in recommendations:
rec = recommendations[name]
for name, rec in recommendations.items():
# Apply desired preprocessing function
self.recommendations[name](input_data, *rec.values())
self._recommendations[name](input_data, *rec.values())
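The toggle is effectively a null-object pattern: either preprocessor exposes the same interface, so call sites never branch. A simplified standalone sketch of the idea (these stand-in classes are illustrative, not FEDOT's actual DummyPreprocessor/DataPreprocessor):

class NoOpPreprocessor:
    """Stand-in that leaves the data untouched."""
    def obligatory_prepare_for_fit(self, data):
        return data

class RealPreprocessor:
    """Stand-in for the full preprocessing (imputation, encoding, ...)."""
    def obligatory_prepare_for_fit(self, data):
        return [x if x is not None else 0 for x in data]  # toy imputation

def make_preprocessor(use_input_preprocessing: bool = True):
    # Both objects expose the same methods, so the caller needs no further branching.
    return RealPreprocessor() if use_input_preprocessing else NoOpPreprocessor()

print(make_preprocessor(False).obligatory_prepare_for_fit([1, None, 3]))  # [1, None, 3]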
from typing import List, Union, Optional, Set
from typing import List, Union, Optional, Set, Tuple
from fedot.api.api_utils.assumptions.operations_filter import OperationsFilter, WhitelistOperationsFilter
from fedot.api.api_utils.assumptions.preprocessing_builder import PreprocessingBuilder
from fedot.api.api_utils.assumptions.task_assumptions import TaskAssumptions
from fedot.core.log import default_log
from fedot.core.data.data import InputData
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.log import default_log
from fedot.core.pipelines.node import Node
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.operation_types_repository import OperationTypesRepository
class AssumptionsBuilder:
......@@ -41,11 +41,15 @@ class AssumptionsBuilder:
def from_operations(self, available_operations: List[str]):
raise NotImplementedError('abstract')
def to_builders(self, initial_node: Optional[Node] = None) -> List[PipelineBuilder]:
def to_builders(self, initial_node: Optional[Node] = None,
use_input_preprocessing: bool = True) -> List[PipelineBuilder]:
raise NotImplementedError('abstract')
def build(self, initial_node: Optional[Node] = None) -> List[Pipeline]:
return [builder.build() for builder in self.to_builders(initial_node)]
def build(self, initial_node: Optional[Node] = None,
use_input_preprocessing: bool = True) -> List[Pipeline]:
return [
builder.build()
for builder in self.to_builders(initial_node, use_input_preprocessing=use_input_preprocessing)]
class UniModalAssumptionsBuilder(AssumptionsBuilder):
......@@ -76,11 +80,13 @@ class UniModalAssumptionsBuilder(AssumptionsBuilder):
self.logger.info(self.UNSUITABLE_AVAILABLE_OPERATIONS_MSG)
return self
def to_builders(self, initial_node: Optional[Node] = None) -> List[PipelineBuilder]:
def to_builders(self, initial_node: Optional[Node] = None,
use_input_preprocessing: bool = True) -> List[PipelineBuilder]:
""" Return a list of valid builders satisfying internal
OperationsFilter or a single fallback builder. """
preprocessing = \
PreprocessingBuilder.builder_for_data(self.data.task.task_type, self.data, initial_node)
PreprocessingBuilder.builder_for_data(self.data.task.task_type, self.data, initial_node,
use_input_preprocessing=use_input_preprocessing)
valid_builders = []
for processing in self.assumptions_generator.processing_builders():
candidate_builder = preprocessing.merge_with(processing)
......@@ -93,24 +99,28 @@ class MultiModalAssumptionsBuilder(AssumptionsBuilder):
def __init__(self, data: MultiModalData, repository_name: str = "model"):
super().__init__(data, repository_name)
_subbuilders = []
for data_type, (data_source_name, values) in zip(self.data.data_type, self.data.items()):
for data_type, (data_source_name, _) in zip(self.data.data_type, self.data.items()):
_subbuilders.append((data_source_name, UniModalAssumptionsBuilder(self.data, data_type)))
self._subbuilders = tuple(_subbuilders)
self._subbuilders: Tuple[Tuple[str, UniModalAssumptionsBuilder]] = tuple(_subbuilders)
def from_operations(self, available_operations: Optional[List[str]] = None):
for data_source, subbuilder in self._subbuilders:
for _, subbuilder in self._subbuilders:
# Performs specific filter on image data operations
if subbuilder.data_type is DataTypesEnum.image:
available_img_operations = ['data_source_img', 'cnn']
subbuilder.from_operations(available_img_operations)
return self
def to_builders(self, initial_node: Optional[Node] = None) -> List[PipelineBuilder]:
def to_builders(self, initial_node: Optional[Node] = None,
use_input_preprocessing: bool = True) -> List[PipelineBuilder]:
# For each data source build its own list of alternatives of initial pipelines.
subpipelines: List[List[Pipeline]] = []
initial_node_operation = initial_node.operation.operation_type if initial_node is not None else None
for data_source_name, subbuilder in self._subbuilders:
first_node = PipelineBuilder().add_node(data_source_name).add_node(initial_node).to_nodes()[0]
data_pipeline_alternatives = subbuilder.build(first_node)
first_node = \
PipelineBuilder(use_input_preprocessing=use_input_preprocessing) \
.add_node(data_source_name).add_node(initial_node_operation).to_nodes()[0]
data_pipeline_alternatives = subbuilder.build(first_node, use_input_preprocessing=use_input_preprocessing)
subpipelines.append(data_pipeline_alternatives)
# Then zip these alternatives together and add final node to get ensembles.
......@@ -118,7 +128,8 @@ class MultiModalAssumptionsBuilder(AssumptionsBuilder):
for pre_ensemble in zip(*subpipelines):
ensemble_operation = self.assumptions_generator.ensemble_operation()
ensemble_nodes = map(lambda pipeline: pipeline.root_node, pre_ensemble)
ensemble_builder = PipelineBuilder(*ensemble_nodes).join_branches(ensemble_operation)
ensemble_builder = PipelineBuilder(*ensemble_nodes, use_input_preprocessing=use_input_preprocessing) \
.join_branches(ensemble_operation)
ensemble_builders.append(ensemble_builder)
return ensemble_builders
......
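With this change, PipelineBuilder (and Pipeline) accept the flag directly in the constructor; a brief sketch under the assumption that the rest of the builder API stays as shown above:

from fedot.core.pipelines.pipeline_builder import PipelineBuilder

# Build a small pipeline whose resulting Pipeline object will skip
# FEDOT's own input preprocessing when fitted.
pipeline = PipelineBuilder(use_input_preprocessing=False) \
    .add_sequence('scaling', 'rf') \
    .build()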
......@@ -25,19 +25,25 @@ class AssumptionsHandler:
def propose_assumptions(self,
initial_assumption: Union[List[Pipeline], Pipeline, None],
available_operations: List) -> List[Pipeline]:
available_operations: List,
use_input_preprocessing: bool = True) -> List[Pipeline]:
"""
Method to propose initial assumptions if needed
:param initial_assumption: initial assumption given by user
:param available_operations: list of available operations defined by user
Args:
initial_assumption: initial assumption given by user
available_operations: list of available operations defined by user
use_input_preprocessing: whether to do preprocessing of initial data
Returns:
list of initial assumption pipelines
"""
if initial_assumption is None:
assumptions_builder = AssumptionsBuilder \
.get(self.data) \
.from_operations(available_operations)
initial_assumption = assumptions_builder.build()
initial_assumption = assumptions_builder.build(use_input_preprocessing=use_input_preprocessing)
elif isinstance(initial_assumption, Pipeline):
initial_assumption = [initial_assumption]
return initial_assumption
......
from typing import Optional, Union
from fedot.core.data.data import InputData
from fedot.core.data.data_preprocessing import data_has_missing_values, data_has_categorical_features, \
data_has_text_features
from fedot.core.data.data_preprocessing import data_has_text_features
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.pipelines.node import Node
from fedot.core.pipelines.pipeline import Pipeline
......@@ -16,22 +15,26 @@ class PreprocessingBuilder:
Builder for constructing preprocessing part of pipeline during the preparation of an initial assumption.
If data is multimodal, builder makes preprocessing pipeline for each data source iteratively.
"""
def __init__(self, task_type: TaskTypesEnum, data_type: DataTypesEnum, *initial_nodes: Node):
def __init__(self, task_type: TaskTypesEnum, data_type: DataTypesEnum, *initial_nodes: Node,
use_input_preprocessing: bool = True):
self.task_type = task_type
self.data_type = data_type
self._builder = PipelineBuilder(*initial_nodes)
self._builder = PipelineBuilder(*initial_nodes, use_input_preprocessing=use_input_preprocessing)
@classmethod
def builder_for_data(cls,
task_type: TaskTypesEnum,
data: Union[InputData, MultiModalData],
*initial_nodes: Optional[Node]) -> PipelineBuilder:
*initial_nodes: Optional[Node],
use_input_preprocessing: bool = True) -> PipelineBuilder:
if isinstance(data, MultiModalData):
# if the data is unimodal, initial_nodes = tuple of None
# if the data is multimodal, initial_nodes = tuple of 1 element (current data_source node)
# so the whole data is reduced to the current data_source for an easier preprocessing
data = data[str(initial_nodes[0])]
preprocessing_builder = cls(task_type, data.data_type, *initial_nodes)
preprocessing_builder = cls(task_type, data.data_type, *initial_nodes,
use_input_preprocessing=use_input_preprocessing)
if data_has_text_features(data):
preprocessing_builder = preprocessing_builder.with_text_vectorizer()
return preprocessing_builder.to_builder()
......@@ -50,5 +53,10 @@ class PreprocessingBuilder:
return self.with_scaling()._builder
def to_pipeline(self) -> Optional[Pipeline]:
""" Return result as Pipeline. Scaling is applied final by default. """
"""
Returns the result as a Pipeline. Scaling is applied last by default.
Returns:
the built preprocessing pipeline
"""
return self.to_builder().build()
from typing import List
from fedot.api.api_utils.assumptions.operations_filter import OperationsFilter
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.repository.tasks import Task, TaskTypesEnum
......@@ -14,13 +13,13 @@ class TaskAssumptions:
self.repo = repository
@staticmethod
def for_task(task: Task, repository: OperationTypesRepository):
def for_task(task: Task, repository: OperationTypesRepository) -> 'TaskAssumptions':
assumptions_by_task = {
TaskTypesEnum.classification: ClassificationAssumptions,
TaskTypesEnum.regression: RegressionAssumptions,
TaskTypesEnum.ts_forecasting: TSForecastingAssumptions,
}
assumptions_cls = assumptions_by_task.get(task.task_type)
assumptions_cls: TaskAssumptions = assumptions_by_task.get(task.task_type)
if not assumptions_cls:
raise NotImplementedError(f"Don't have assumptions for task type: {task.task_type}")
return assumptions_cls(repository)
......@@ -49,26 +48,26 @@ class TSForecastingAssumptions(TaskAssumptions):
return {
'glm_ridge':
PipelineBuilder()
.add_branch('glm', 'lagged')
.add_node('ridge', branch_idx=1)
.join_branches('ridge'),
.add_branch('glm', 'lagged')
.add_node('ridge', branch_idx=1)
.join_branches('ridge'),
'lagged_ridge':
PipelineBuilder()
.add_sequence('lagged', 'ridge'),
.add_sequence('lagged', 'ridge'),
'polyfit_ridge':
PipelineBuilder()
.add_branch('polyfit', 'lagged')
.grow_branches(None, 'ridge')
.join_branches('ridge'),
.add_branch('polyfit', 'lagged')
.grow_branches(None, 'ridge')
.join_branches('ridge'),
'smoothing_ar':
PipelineBuilder()
.add_sequence('smoothing', 'ar'),
.add_sequence('smoothing', 'ar'),
}
def ensemble_operation(self) -> str:
return 'ridge'
def processing_builders(self) -> List[Pipeline]:
def processing_builders(self) -> List[PipelineBuilder]:
return list(self.builders.values())
def fallback_builder(self, operations_filter: OperationsFilter) -> PipelineBuilder:
......@@ -93,7 +92,7 @@ class RegressionAssumptions(TaskAssumptions):
def ensemble_operation(self) -> str:
return 'rfr'
def processing_builders(self) -> List[Pipeline]:
def processing_builders(self) -> List[PipelineBuilder]:
return list(self.builders.values())
def fallback_builder(self, operations_filter: OperationsFilter) -> PipelineBuilder:
......@@ -115,7 +114,7 @@ class ClassificationAssumptions(TaskAssumptions):
def ensemble_operation(self) -> str:
return 'rf'
def processing_builders(self) -> List[Pipeline]:
def processing_builders(self) -> List[PipelineBuilder]:
return list(self.builders.values())
def fallback_builder(self, operations_filter: OperationsFilter) -> PipelineBuilder:
......
......@@ -11,13 +11,13 @@ from fedot.core.data.multi_modal import MultiModalData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum
FeaturesType = Union[str, PathLike, np.ndarray, pd.DataFrame, InputData, dict, tuple]
FeaturesType = Union[str, PathLike, np.ndarray, pd.DataFrame, InputData, MultiModalData, dict, tuple]
TargetType = Union[str, PathLike, np.ndarray, pd.Series, dict]
class DataDefiner:
def __init__(self, strategy) -> None:
def __init__(self, strategy: 'StrategyDefineData') -> None:
self._strategy = strategy
@property
......@@ -31,7 +31,7 @@ class DataDefiner:
def define_data(self, features: FeaturesType,
task: Task,
target: Optional[str] = None,
is_predict: bool = False) -> None:
is_predict: bool = False) -> Union[InputData, MultiModalData]:
return self._strategy.define_data(features,
task,
target,
......@@ -43,7 +43,7 @@ class StrategyDefineData(ABC):
def define_data(self, features: Union[tuple, str, np.ndarray, pd.DataFrame, InputData],
task: Task,
target: str = None,
is_predict: bool = False):
is_predict: bool = False) -> Union[InputData, MultiModalData]:
pass
......@@ -167,8 +167,8 @@ class MultimodalStrategy(StrategyDefineData):
return data
def data_strategy_selector(features, target, task: Task = None, is_predict: bool = None):
def data_strategy_selector(features: FeaturesType, target: Optional[str] = None, task: Task = None,
is_predict: bool = None) -> Union[InputData, MultiModalData]:
strategy = [strategy for cls, strategy in _strategy_dispatch.items() if isinstance(features, cls)][0]
data = DataDefiner(strategy())
......
......@@ -8,6 +8,7 @@ from fedot.core.constants import AUTO_PRESET_NAME, DEFAULT_FORECAST_LENGTH
from fedot.core.data.data import InputData
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.log import Log, default_log
from fedot.core.log import LoggerAdapter
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskParams, TaskTypesEnum, TsForecastingParams
from fedot.core.utilities.random import RandomStateHandler
......@@ -16,11 +17,11 @@ from fedot.core.utilities.random import RandomStateHandler
class ApiParams:
def __init__(self):
self.api_params = None
self.log = None
self.task = None
self.task_params = None
self.metrics_name = None
self.api_params: dict = None
self.log: LoggerAdapter = None
self.task: Task = None
self.task_params: TaskParams = None
self.metric_name: Union[str, List[str]] = None
def initialize_params(self, input_params: Dict[str, Any]):
""" Merge input_params dictionary with several parameters for AutoML algorithm """
......@@ -101,7 +102,7 @@ class ApiParams:
# The minimal number of generations is 5.
if 'early_stopping_iterations' not in input_params['composer_tuner_params']:
if input_params['timeout']:
depending_on_timeout = int(input_params['timeout']/3)
depending_on_timeout = int(input_params['timeout'] / 3)
self.api_params['early_stopping_iterations'] = \
depending_on_timeout if depending_on_timeout > 5 else 5
......@@ -111,14 +112,15 @@ class ApiParams:
random.seed(specified_seed)
RandomStateHandler.MODEL_FITTING_SEED = specified_seed
if self.api_params['problem'] == 'ts_forecasting' and input_params['task_params'] is None:
self.task_params = input_params['task_params']
if self.api_params['problem'] == 'ts_forecasting' and self.task_params is None:
self.log.warning(f'The value of the forecast depth was set to {DEFAULT_FORECAST_LENGTH}.')
input_params['task_params'] = TsForecastingParams(forecast_length=DEFAULT_FORECAST_LENGTH)
self.task_params = TsForecastingParams(forecast_length=DEFAULT_FORECAST_LENGTH)
if self.api_params['problem'] == 'clustering':
raise ValueError('This type of task is not supported in API now')
self.task = self.get_task_params(self.api_params['problem'], input_params['task_params'])
self.task = ApiParams.get_task(self.api_params['problem'], self.task_params)
self.metric_name = self.get_default_metric(self.api_params['problem'])
self.api_params.pop('problem')
......@@ -135,6 +137,7 @@ class ApiParams:
'genetic_scheme': None,
'early_stopping_iterations': 30,
'early_stopping_timeout': 10,
'use_input_preprocessing': True,
'use_pipelines_cache': True,
'use_preprocessing_cache': True,
'cache_folder': None}
......@@ -158,14 +161,16 @@ class ApiParams:
return default_test_metric_dict[problem]
@staticmethod
def get_task_params(problem: str, task_params: Optional[TaskParams] = None):
""" Return task parameters by machine learning problem name (string) """
def get_task(problem: str, task_params: Optional[TaskParams] = None):
""" Return task by the given ML problem name and the parameters """
task_dict = {'regression': Task(TaskTypesEnum.regression, task_params=task_params),
'classification': Task(TaskTypesEnum.classification, task_params=task_params),
'clustering': Task(TaskTypesEnum.clustering, task_params=task_params),
'ts_forecasting': Task(TaskTypesEnum.ts_forecasting, task_params=task_params)
}
return task_dict[problem]
'ts_forecasting': Task(TaskTypesEnum.ts_forecasting, task_params=task_params)}
try:
return task_dict[problem]
except KeyError as exc:
raise ValueError('Wrong type name of the given task') from exc
def check_timeout_vs_generations(api_params):
......
......@@ -6,25 +6,27 @@ from fedot.core.data.data import InputData
from fedot.core.log import Log
from fedot.core.pipelines.node import PrimaryNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.verification import verify_pipeline, verifier_for_task
from fedot.core.pipelines.verification import verify_pipeline
class PredefinedModel:
def __init__(self, predefined_model: Union[str, Pipeline], data: InputData, log: Log):
def __init__(self, predefined_model: Union[str, Pipeline], data: InputData, log: Log,
use_input_preprocessing: bool = True):
self.predefined_model = predefined_model
self.data = data
self.log = log
self.pipeline = self._get_pipeline()
self.pipeline = self._get_pipeline(use_input_preprocessing)
def _get_pipeline(self) -> Pipeline:
def _get_pipeline(self, use_input_preprocessing: bool = True) -> Pipeline:
if isinstance(self.predefined_model, Pipeline):
pipelines = self.predefined_model
elif self.predefined_model == 'auto':
# Generate initial assumption automatically
pipelines = AssumptionsBuilder.get(self.data).from_operations().build()[0]
pipelines = AssumptionsBuilder.get(self.data).from_operations().build(
use_input_preprocessing=use_input_preprocessing)[0]
elif isinstance(self.predefined_model, str):
model = PrimaryNode(self.predefined_model)
pipelines = Pipeline(model)
pipelines = Pipeline(model, use_input_preprocessing=use_input_preprocessing)
else:
raise ValueError(f'{type(self.predefined_model)} is not supported as Fedot model')
......
import logging
from copy import deepcopy
from typing import Any, List, Optional, Sequence, Tuple, Union, Callable
from typing import Any, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
......@@ -27,7 +27,7 @@ from fedot.core.utilities.data_structures import ensure_wrapped_in_sequence
from fedot.core.visualisation.opt_viz_extra import visualise_pareto
from fedot.explainability.explainer_template import Explainer
from fedot.explainability.explainers import explain_pipeline
from fedot.preprocessing.preprocessing import merge_preprocessors
from fedot.preprocessing.base_preprocessing import BasePreprocessor
from fedot.remote.remote_evaluator import RemoteEvaluator
from fedot.utilities.memory import MemoryAnalytics
from fedot.utilities.project_import_export import export_project_to_zip, import_project_from_zip
......@@ -98,6 +98,7 @@ class Fedot:
- ``ts`` -> A special preset with models for time series forecasting task.
- ``automl`` -> A special preset with only AutoML libraries such as TPOT and H2O as operations.
use_input_preprocessing: bool indicating whether to preprocess the data passed to fit/predict, enabled by default.
use_pipelines_cache: bool indicating whether to use pipeline structures caching, enabled by default.
use_preprocessing_cache: bool indicating whether to use optional preprocessors caching, enabled by default.
cache_folder: path to the place where cache files should be stored (if any cache is enabled).
......@@ -128,13 +129,16 @@ class Fedot:
self.params.initialize_params(input_params)
# Initialize ApiComposer's cache parameters via ApiParams
self.api_composer.init_cache(self.params.api_params['use_pipelines_cache'],
self.params.api_params['use_preprocessing_cache'],
self.params.api_params['cache_folder'])
self.api_composer.init_cache(use_pipelines_cache=self.params.api_params['use_pipelines_cache'],
use_input_preprocessing=self.params.api_params['use_input_preprocessing'],
use_preprocessing_cache=self.params.api_params['use_preprocessing_cache'],
cache_folder=self.params.api_params['cache_folder'])
self.tuner_requirements = None
# Initialize data processors for data preprocessing and preliminary data analysis
self.data_processor = ApiDataProcessor(task=self.params.api_params['task'])
self.data_processor = ApiDataProcessor(task=self.params.api_params['task'],
use_input_preprocessing=self.params.api_params[
'use_input_preprocessing'])
self.data_analyser = DataAnalyser(safe_mode=safe_mode)
self.target: Optional[TargetType] = None
......@@ -172,12 +176,13 @@ class Fedot:
self.target = target
self.train_data = self.data_processor.define_data(features=features, target=target, is_predict=False)
# Launch data analyser - it gives recommendations for data preprocessing
full_train_not_preprocessed = deepcopy(self.train_data)
recommendations = self.data_analyser.give_recommendation(self.train_data)
self.data_processor.accept_and_apply_recommendations(self.train_data, recommendations)
self.params.accept_and_apply_recommendations(self.train_data, recommendations)
if self.params.api_params['use_input_preprocessing']:
# Launch data analyser - it gives recommendations for data preprocessing
recommendations = self.data_analyser.give_recommendation(self.train_data)
self.data_processor.accept_and_apply_recommendations(self.train_data, recommendations)
self.params.accept_and_apply_recommendations(self.train_data, recommendations)
else:
recommendations = None
self._init_remote_if_necessary()
......@@ -192,7 +197,9 @@ class Fedot:
# Fit predefined model and return it without composing
self.current_pipeline = PredefinedModel(predefined_model,
self.train_data,
self.params.api_params['logger']).fit()
self.params.api_params['logger'],
use_input_preprocessing=self.params.api_params[
'use_input_preprocessing']).fit()
else:
self.current_pipeline, self.best_models, self.history = \
self.api_composer.obtain_model(**self.params.api_params)
......@@ -200,6 +207,7 @@ class Fedot:
if self.current_pipeline is None:
raise ValueError('No models were found')
full_train_not_preprocessed = deepcopy(self.train_data)
# Final fit for obtained pipeline on full dataset
if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
self._train_pipeline_on_full_dataset(recommendations, full_train_not_preprocessed)
......@@ -208,8 +216,8 @@ class Fedot:
self.params.api_params['logger'].message('Already fitted initial pipeline is used')
# Store data encoder in the pipeline if it is required
self.current_pipeline.preprocessor = merge_preprocessors(self.data_processor.preprocessor,
self.current_pipeline.preprocessor)
self.current_pipeline.preprocessor = BasePreprocessor.merge_preprocessors(
self.data_processor.preprocessor, self.current_pipeline.preprocessor)
self.params.api_params['logger'].message(f'Final pipeline: {self.current_pipeline.structure}')
......@@ -360,7 +368,7 @@ class Fedot:
self._check_forecast_applicable()
forecast_length = self.train_data.task.task_params.forecast_length
horizon = horizon if horizon is not None else forecast_length
horizon = horizon or forecast_length
if pre_history is None:
pre_history = self.train_data
pre_history.target = None
......@@ -387,7 +395,7 @@ class Fedot:
Args:
path: path to ``json`` file with model
"""
self.current_pipeline = Pipeline()
self.current_pipeline = Pipeline(use_input_preprocessing=self.params.api_params['use_input_preprocessing'])
self.current_pipeline.load(path)
self.data_processor.preprocessor = self.current_pipeline.preprocessor
......@@ -552,16 +560,17 @@ class Fedot:
if isinstance(self.target, str) and remote.remote_task_params.target is None:
remote.remote_task_params.target = self.target
def _train_pipeline_on_full_dataset(self, recommendations: dict, full_train_not_preprocessed):
def _train_pipeline_on_full_dataset(self, recommendations: Optional[dict],
full_train_not_preprocessed: Union[InputData, MultiModalData]):
"""Applies training procedure for obtained pipeline if dataset was clipped
"""
if recommendations:
if recommendations is not None:
# if data was cut we need to refit pipeline on full data
self.data_processor.accept_and_apply_recommendations(full_train_not_preprocessed,
{k: v for k, v in recommendations.items()
if k != 'cut'})
self.current_pipeline.fit(
full_train_not_preprocessed,
n_jobs=self.params.api_params['n_jobs'],
n_jobs=self.params.api_params['n_jobs']
)
......@@ -4,10 +4,10 @@ from abc import abstractmethod
from copy import deepcopy
from typing import TYPE_CHECKING, TypeVar, Generic, Type, Optional, Dict, Any, Callable, Tuple, Sequence, Union
from fedot.core.adapter.adapt_registry import AdaptRegistry
from fedot.core.dag.graph import Graph
from fedot.core.log import default_log
from fedot.core.optimisers.graph import OptGraph, OptNode
from fedot.core.adapter.adapt_registry import AdaptRegistry
from fedot.core.optimisers.opt_history_objects.individual import Individual
if TYPE_CHECKING:
......
......@@ -85,7 +85,7 @@ def boosting_mutation(pipeline: Pipeline, requirements, params, **kwargs) -> Pip
node_final = SecondaryNode(choice(requirements.secondary),
nodes_from=[existing_pipeline.root_node, node_boost])
pipeline = Pipeline(node_final)
pipeline = Pipeline(node_final, use_input_preprocessing=pipeline.use_input_preprocessing)
return pipeline
......
......@@ -3,10 +3,10 @@ from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Sequence
from networkx import graph_edit_distance, set_node_attributes
from fedot.core.dag.convert import graph_structure_as_nx_graph
from fedot.core.dag.graph import Graph
from fedot.core.dag.graph_node import GraphNode
from fedot.core.dag.graph_utils import ordered_subnodes_hierarchy, node_depth
from fedot.core.dag.convert import graph_structure_as_nx_graph
from fedot.core.utilities.data_structures import ensure_wrapped_in_sequence, Copyable, remove_items
from fedot.core.utils import copy_doc
......@@ -33,7 +33,7 @@ class LinkedGraph(Graph, Copyable):
def _empty_postprocess(*args):
pass
@copy_doc(Graph)
@copy_doc(Graph.delete_node)
def delete_node(self, node: GraphNode):
node_children_cached = self.node_children(node)
self_root_node_cached = self.root_node
......@@ -48,7 +48,7 @@ class LinkedGraph(Graph, Copyable):
self.add_node(self_root_node_cached)
self._postprocess_nodes(self, self._nodes)
@copy_doc(Graph)
@copy_doc(Graph.delete_subtree)
def delete_subtree(self, subtree: GraphNode):
subtree_nodes = ordered_subnodes_hierarchy(subtree)
self._nodes = remove_items(self._nodes, subtree_nodes)
......@@ -56,7 +56,7 @@ class LinkedGraph(Graph, Copyable):
for subtree in self._nodes:
subtree.nodes_from = remove_items(subtree.nodes_from, subtree_nodes)
@copy_doc(Graph)
@copy_doc(Graph.update_node)
def update_node(self, old_node: GraphNode, new_node: GraphNode):
self.actualise_old_node_children(old_node, new_node)
new_node.nodes_from.extend(old_node.nodes_from)
......@@ -65,7 +65,7 @@ class LinkedGraph(Graph, Copyable):
self.sort_nodes()
self._postprocess_nodes(self, self._nodes)
@copy_doc(Graph)
@copy_doc(Graph.update_subtree)
def update_subtree(self, old_subtree: GraphNode, new_subtree: GraphNode):
new_subtree = deepcopy(new_subtree)
self.actualise_old_node_children(old_subtree, new_subtree)
......@@ -73,7 +73,7 @@ class LinkedGraph(Graph, Copyable):
self.add_node(new_subtree)
self.sort_nodes()
@copy_doc(Graph)
@copy_doc(Graph.add_node)
def add_node(self, node: GraphNode):
if node not in self._nodes:
self._nodes.append(node)
......@@ -96,13 +96,13 @@ class LinkedGraph(Graph, Copyable):
if not isinstance(self.root_node, Sequence):
self._nodes = ordered_subnodes_hierarchy(self.root_node)
@copy_doc(Graph)
@copy_doc(Graph.node_children)
def node_children(self, node: GraphNode) -> List[Optional[GraphNode]]:
return [other_node for other_node in self._nodes
if other_node.nodes_from and
node in other_node.nodes_from]
@copy_doc(Graph)
@copy_doc(Graph.connect_nodes)
def connect_nodes(self, node_parent: GraphNode, node_child: GraphNode):
if node_child in self.node_children(node_parent):
return
......@@ -121,7 +121,7 @@ class LinkedGraph(Graph, Copyable):
for node in node.nodes_from:
self._clean_up_leftovers(node)
@copy_doc(Graph)
@copy_doc(Graph.disconnect_nodes)
def disconnect_nodes(self, node_parent: GraphNode, node_child: GraphNode,
clean_up_leftovers: bool = True):
if node_parent not in node_child.nodes_from:
......@@ -144,23 +144,23 @@ class LinkedGraph(Graph, Copyable):
def nodes(self, new_nodes: List[GraphNode]):
self._nodes = new_nodes
@copy_doc(Graph)
@copy_doc(Graph.__eq__)
def __eq__(self, other_graph: Graph) -> bool:
return \
set(rn.descriptive_id for rn in self.root_nodes()) == \
set(rn.descriptive_id for rn in other_graph.root_nodes())
@copy_doc(Graph)
@copy_doc(Graph.descriptive_id)
@property
def descriptive_id(self) -> str:
return ''.join([r.descriptive_id for r in self.root_nodes()])
@copy_doc(Graph)
@copy_doc(Graph.depth)
@property
def depth(self) -> int:
return 0 if not self._nodes else max(map(node_depth, self.root_nodes()))
@copy_doc(Graph)
@copy_doc(Graph.get_edges)
def get_edges(self) -> Sequence[Tuple[GraphNode, GraphNode]]:
edges = []
for node in self._nodes:
......
......@@ -19,7 +19,8 @@ class SupplementaryDataMerger:
data_flow_length=self.calculate_dataflow_len(),
features_mask=self.prepare_parent_mask(),
previous_operations=None, # is set by Node after merge
was_preprocessed=self.all_preprocessed(),
obligatorily_preprocessed=self.all_preprocessed(),
optionally_preprocessed=self.all_preprocessed(is_obligatory=False),
non_int_idx=None, # is set elsewhere (by preprocessor or during pipeline fit/predict)
column_types=self.merge_column_types()
)
......@@ -28,8 +29,11 @@ class SupplementaryDataMerger:
""" Number of visited nodes is the max number among outputs plus 1 (the next operation). """
return 1 + max(output.supplementary_data.data_flow_length for output in self.outputs)
def all_preprocessed(self) -> bool:
return all(output.supplementary_data.was_preprocessed for output in self.outputs)
def all_preprocessed(self, *, is_obligatory: bool = True) -> bool:
return all(
output.supplementary_data.obligatorily_preprocessed if is_obligatory
else output.supplementary_data.optionally_preprocessed
for output in self.outputs)
def prepare_parent_mask(self) -> Dict:
""" The method for OutputData from multiple parent nodes prepares a field
......
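The merged status is simply a conjunction over the parent outputs, now computed separately for the obligatory and the optional stage; a small standalone sketch (the stub dataclass mirrors the two flags added to SupplementaryData below):

from dataclasses import dataclass
from typing import List

@dataclass
class SupplementaryStub:  # illustrative stand-in for SupplementaryData
    obligatorily_preprocessed: bool = False
    optionally_preprocessed: bool = False

def all_preprocessed(outputs: List[SupplementaryStub], *, is_obligatory: bool = True) -> bool:
    # Merged data counts as preprocessed only if every parent output is.
    return all(o.obligatorily_preprocessed if is_obligatory else o.optionally_preprocessed
               for o in outputs)

outs = [SupplementaryStub(True, False), SupplementaryStub(True, True)]
print(all_preprocessed(outs))                        # True
print(all_preprocessed(outs, is_obligatory=False))   # False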
from __future__ import annotations
from functools import partial
from typing import List, Optional, Union
from typing import List, Optional, Union, Dict
import numpy as np
from fedot.core.data.data import InputData
from fedot.core.data.data import (process_target_and_features, array_to_input_data,
get_df_from_csv, PathType, POSSIBLE_TABULAR_IDX_KEYWORDS, POSSIBLE_TS_IDX_KEYWORDS)
from fedot.core.data.data_detection import TextDataDetector, TimeSeriesDataDetector
......@@ -12,11 +13,11 @@ from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum
class MultiModalData(dict):
class MultiModalData(Dict[str, InputData]):
""" Dictionary with InputData as values and primary node names as keys """
def __init__(self, *arg, **kw):
super(MultiModalData, self).__init__(*arg, **kw)
super().__init__(*arg, **kw)
# Check if input data contains different targets
self.contain_side_inputs = not all(value.supplementary_data.is_main_target for value in self.values())
......@@ -152,8 +153,8 @@ class MultiModalData(dict):
# add table features if they exist
if table_features.size != 0:
sources.update({'data_source_table': data_part_transformation_func
(features_array=table_features, data_type=DataTypesEnum.table)})
sources.update({'data_source_table': data_part_transformation_func(features_array=table_features,
data_type=DataTypesEnum.table)})
multi_modal_data = MultiModalData(sources)
......@@ -197,7 +198,7 @@ class MultiModalData(dict):
df = get_df_from_csv(file_path, delimiter, index_col, possible_idx_keywords, columns_to_use=columns_to_use)
idx = df.index.to_numpy()
if not columns_to_use:
columns_to_use = list(set(df.columns) - set(index_col))
columns_to_use = list(set(df.columns) - {index_col})
if is_predict:
raise NotImplementedError(
......
......@@ -21,8 +21,10 @@ class SupplementaryData:
features_mask: Optional[dict] = None
# Last visited nodes
previous_operations: Optional[list] = None
# Is there a data was preprocessed or not
was_preprocessed: bool = False
# Was the data obligatorily preprocessed before or not
obligatorily_preprocessed: bool = False
# Was the data optionally preprocessed before or not
optionally_preprocessed: bool = False
# Collection with non-int indexes
non_int_idx: Optional[list] = None
# Dictionary with features and target column types
......
......@@ -11,7 +11,6 @@ from fedot.core.pipelines.tuning.unified import PipelineTuner
from fedot.core.repository.operation_types_repository import OperationMetaInfo, \
atomized_model_type
from fedot.core.utils import make_pipeline_generator
from fedot.preprocessing.preprocessing import DataPreprocessor
class AtomizedModel(Operation):
......@@ -24,14 +23,10 @@ class AtomizedModel(Operation):
super().__init__(operation_type=atomized_model_type())
self.pipeline = pipeline
self.unique_id = self.pipeline.root_node.descriptive_id
self.atomized_preprocessor = DataPreprocessor()
def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData,
use_cache: bool = True):
def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):
copied_input_data = deepcopy(data)
copied_input_data = self.atomized_preprocessor.obligatory_prepare_for_fit(copied_input_data)
predicted_train = self.pipeline.fit(input_data=copied_input_data)
fitted_atomized_operation = self.pipeline
......@@ -42,8 +37,6 @@ class AtomizedModel(Operation):
# Preprocessing applied
copied_input_data = deepcopy(data)
copied_input_data = self.atomized_preprocessor.obligatory_prepare_for_predict(copied_input_data)
prediction = fitted_operation.predict(input_data=copied_input_data, output_mode=output_mode)
prediction = self.assign_tabular_column_types(prediction, output_mode)
return prediction
......@@ -56,11 +49,11 @@ class AtomizedModel(Operation):
input_data: InputData = None, iterations: int = 50,
timeout: int = 5):
""" Method for tuning hyperparameters """
tuner = TunerBuilder(input_data.task)\
.with_tuner(PipelineTuner)\
.with_metric(metric_function)\
.with_iterations(iterations)\
.with_timeout(timedelta(minutes=timeout))\
tuner = TunerBuilder(input_data.task) \
.with_tuner(PipelineTuner) \
.with_metric(metric_function) \
.with_iterations(iterations) \
.with_timeout(timedelta(minutes=timeout)) \
.build(input_data)
tuned_pipeline = tuner.tune(self.pipeline)
tuned_atomized_model = AtomizedModel(tuned_pipeline)
......
......@@ -35,7 +35,7 @@ class Operation:
if isinstance(params, dict):
params = OperationParameters.from_operation_type(self.operation_type, **params)
params_for_fit = HyperparametersPreprocessor(operation_type=self.operation_type,
n_samples_data=kwargs.get('n_samples_data'))\
n_samples_data=kwargs.get('n_samples_data')) \
.correct(params.to_dict())
params_for_fit = OperationParameters.from_operation_type(self.operation_type, **params_for_fit)
try:
......@@ -136,7 +136,7 @@ class Operation:
prediction.supplementary_data.is_main_target = is_main_target
prediction.supplementary_data.data_flow_length = data_flow_length
prediction.supplementary_data.was_preprocessed = True
prediction.supplementary_data.obligatorily_preprocessed = True
return prediction
@staticmethod
......
......@@ -44,6 +44,9 @@ class ComposerRequirements:
max_pipeline_fit_time: Optional[datetime.timedelta] = None
n_jobs: int = -1
parallelization_mode: str = 'populational'
static_individual_metadata: dict = field(default_factory=lambda: {
'use_input_preprocessing': True
})
show_progress: bool = True
collect_intermediate_metric: bool = False
......
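Since the new default is a mutable dict, it has to go through dataclasses.field(default_factory=...); a minimal standalone illustration of that pattern (the class name here is made up):

from dataclasses import dataclass, field

@dataclass
class RequirementsStub:
    n_jobs: int = -1
    # A bare dict default would be shared between all instances,
    # so each instance gets a fresh dict from the factory instead.
    static_individual_metadata: dict = field(
        default_factory=lambda: {'use_input_preprocessing': True})

a, b = RequirementsStub(), RequirementsStub()
a.static_individual_metadata['use_input_preprocessing'] = False
print(b.static_individual_metadata)  # {'use_input_preprocessing': True}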
from copy import copy, deepcopy
from copy import deepcopy
from random import choice
from typing import Sequence, Callable
from fedot.core.constants import MAXIMAL_ATTEMPTS_NUMBER, EVALUATION_ATTEMPTS_NUMBER
from fedot.core.dag.graph import Graph
from fedot.core.optimisers.gp_comp.gp_params import GPGraphOptimizerParameters
from fedot.core.optimisers.gp_comp.operators.crossover import Crossover
from fedot.core.optimisers.gp_comp.operators.elitism import Elitism
......@@ -57,7 +56,8 @@ class EvoGraphOptimizer(PopulationalOptimizer):
# Define initial parameters
self.requirements.max_depth = self._graph_depth.initial
self.graph_optimizer_params.pop_size = self._pop_size.initial
self.initial_individuals = [Individual(graph) for graph in initial_graphs]
self.initial_individuals = [Individual(graph, metadata=requirements.static_individual_metadata)
for graph in initial_graphs]
def _initial_population(self, evaluator: EvaluationOperator):
""" Initializes the initial population """
......