Unanswered
I’M Trying To Understand The Execution Flow Of Pipelines When Translating From Local To Remote Execution. I’Ve Defined A Pipeline Using The
import json
import os
import sys
from argparse import ArgumentParser
from logging import getLogger
from pathlib import Path
from typing import Callable
from clearml import PipelineDecorator, Task
from clearml_pipelines_examples.base.pipeline_settings import ExecutionMode
from clearml_pipelines_examples.pipelines.examples.train_model_on_random_data import (
TrainModelPipelineKwargs,
TrainModelPipelineSettings,
)
from clearml_pipelines_examples.tasks.examples import generate_dataset, train_model
logger = getLogger()
def build_pipeline(pipeline_config: TrainModelPipelineSettings) -> Callable:
generate_dataset_config = pipeline_config.components.generate_dataset
generate_dataset_step = PipelineDecorator.component(
generate_dataset,
task_type="data_processing",
return_values=["n_data_features"],
**generate_dataset_config.dict(),
)
train_model_config = pipeline_config.components.train_model
train_model_step = PipelineDecorator.component(
train_model,
task_type="training",
parents=["generate_dataset"],
**train_model_config.dict(),
)
@PipelineDecorator.pipeline(
start_controller_locally=pipeline_config.execution_mode != ExecutionMode.all_remote,
**pipeline_config.dict(exclude={"execution_mode", "components"}),
)
def pipeline(
clearml_dataset_name: str,
clearml_dataset_project_name: str,
clearml_model_name: str,
generate_n_samples: int = 100,
generate_n_features: int = 1,
distribution_type: str = "uniform",
epochs: int = 1,
):
n_data_features = generate_dataset_step(
clearml_dataset_name=clearml_dataset_name,
clearml_dataset_project_name=clearml_dataset_project_name,
n_samples=generate_n_samples,
n_features=generate_n_features,
dist_type=distribution_type,
)
train_model_step(
clearml_dataset_name=clearml_dataset_name,
clearml_dataset_project_name=clearml_dataset_project_name,
clearml_model_name=clearml_model_name,
epochs=epochs,
n_dataset_features=n_data_features,
)
return pipeline
if __name__ == "__main__":
# mbarna/TODO: move this parsing to a separate function
parser = ArgumentParser()
# mbarna/TODO: would be good to have this not end up in the pipeline UI config
parser.add_argument("--config-file-path", "-f", type=Path)
parsed_args = parser.parse_args()
pipeline_settings = None
pipeline_kwargs = None
# use this to detect running in a remote context
if task_id := os.getenv("CLEARML_TASK_ID"):
logger.info(f"ClearML remote task: {task_id}")
task = Task.get_task(task_id)
pipeline_ui_config = task.get_parameters_as_dict()
pipeline = PipelineDecorator.get_current_pipeline()
pipeline(**pipeline_ui_config)
sys.exit(0)
# when running locally, load config file and get pipeline ClearML Settings
# and pipeline function kwarg values
elif config_file_path := parsed_args.config_file_path:
raw_config = json.loads(config_file_path.read_text())
pipeline_settings = TrainModelPipelineSettings.parse_obj(raw_config["settings"])
pipeline_kwargs = TrainModelPipelineKwargs.parse_obj(raw_config["pipeline_kwargs"])
else:
pipeline_settings = TrainModelPipelineSettings()
pipeline_kwargs = TrainModelPipelineKwargs()
logger.info(
f"ClearML Configuration: {pipeline_settings if pipeline_settings else '<remote execution>'}"
)
logger.info(f"Pipeline Configuration: {pipeline_kwargs}.")
if pipeline_settings.execution_mode == ExecutionMode.all_local_single_process:
PipelineDecorator.debug_pipeline()
elif pipeline_settings.execution_mode == ExecutionMode.local_controller_subprocess_tasks:
PipelineDecorator.run_locally()
pipeline = build_pipeline(pipeline_settings)
# start pipeline with values from config
pipeline(**pipeline_kwargs.dict())
96 Views
0
Answers
8 months ago
8 months ago