import json
import os
import sys
from argparse import ArgumentParser
from logging import getLogger
from pathlib import Path
from typing import Callable
from clearml import PipelineDecorator, Task
from clearml_pipelines_examples.base.pipeline_settings import ExecutionMode
from clearml_pipelines_examples.pipelines.examples.train_model_on_random_data import (
TrainModelPipelineKwargs,
TrainModelPipelineSettings,
)
from clearml_pipelines_examples.tasks.examples import generate_dataset, train_model
logger = getLogger()
def build_pipeline(pipeline_config: TrainModelPipelineSettings) -> Callable:
    """Assemble the "train model on random data" ClearML pipeline.

    Wraps ``generate_dataset`` and ``train_model`` as pipeline components
    using the per-component options found in ``pipeline_config.components``,
    then defines the pipeline function that chains them together.

    Args:
        pipeline_config: Settings for the pipeline and its components. Every
            field except ``execution_mode`` and ``components`` is forwarded
            to ``PipelineDecorator.pipeline`` as a keyword argument.

    Returns:
        The decorated pipeline function; call it with the pipeline arguments
        to start the pipeline.
    """
    component_settings = pipeline_config.components

    # Step 1: dataset generation. Its single return value is exposed under
    # the name "n_data_features".
    dataset_step_options = component_settings.generate_dataset.dict()
    generate_dataset_step = PipelineDecorator.component(
        generate_dataset,
        task_type="data_processing",
        return_values=["n_data_features"],
        **dataset_step_options,
    )

    # Step 2: model training, declared with the dataset step as its parent.
    training_step_options = component_settings.train_model.dict()
    train_model_step = PipelineDecorator.component(
        train_model,
        task_type="training",
        parents=["generate_dataset"],
        **training_step_options,
    )

    # The controller starts locally for every execution mode except
    # ``all_remote``.
    controller_starts_locally = pipeline_config.execution_mode != ExecutionMode.all_remote
    pipeline_options = pipeline_config.dict(exclude={"execution_mode", "components"})

    @PipelineDecorator.pipeline(
        start_controller_locally=controller_starts_locally,
        **pipeline_options,
    )
    def pipeline(
        clearml_dataset_name: str,
        clearml_dataset_project_name: str,
        clearml_model_name: str,
        generate_n_samples: int = 100,
        generate_n_features: int = 1,
        distribution_type: str = "uniform",
        epochs: int = 1,
    ):
        # Both steps address the same ClearML dataset.
        dataset_location = {
            "clearml_dataset_name": clearml_dataset_name,
            "clearml_dataset_project_name": clearml_dataset_project_name,
        }

        feature_count = generate_dataset_step(
            **dataset_location,
            n_samples=generate_n_samples,
            n_features=generate_n_features,
            dist_type=distribution_type,
        )

        # Feeding the dataset step's output into training links the two steps.
        train_model_step(
            **dataset_location,
            clearml_model_name=clearml_model_name,
            epochs=epochs,
            n_dataset_features=feature_count,
        )

    return pipeline
if __name__ == "__main__":
    # Script entry point: resolve the pipeline settings and arguments, then
    # build and start the pipeline. The same path runs on the launching
    # machine and on the ClearML agent, because the pipeline function must be
    # called in both places for the pipeline to actually run.

    # mbarna/TODO: move this parsing to a separate function
    parser = ArgumentParser()
    # mbarna/TODO: would be good to have this not end up in the pipeline UI config
    parser.add_argument("--config-file-path", "-f", type=Path)
    parsed_args = parser.parse_args()

    # use this to detect running in a remote context
    remote_task_id = os.getenv("CLEARML_TASK_ID")

    if remote_task_id:
        logger.info(f"ClearML remote task: {remote_task_id}")
        # PipelineDecorator.get_current_pipeline() returns None until a
        # pipeline function has been called, so it cannot hand back something
        # callable here (it raised "'NoneType' object is not callable").
        # Instead the pipeline is rebuilt and called below, like a local run.
        # The local config file is not read on the agent; the defaults are
        # placeholders. NOTE(review): per ClearML maintainer guidance, the
        # parameter values already stored on the task take precedence over
        # the values passed in code when running remotely - confirm.
        pipeline_settings = TrainModelPipelineSettings()
        pipeline_kwargs = TrainModelPipelineKwargs()
    # when running locally, load config file and get pipeline ClearML Settings
    # and pipeline function kwarg values
    elif config_file_path := parsed_args.config_file_path:
        raw_config = json.loads(config_file_path.read_text())
        pipeline_settings = TrainModelPipelineSettings.parse_obj(raw_config["settings"])
        pipeline_kwargs = TrainModelPipelineKwargs.parse_obj(raw_config["pipeline_kwargs"])
    else:
        pipeline_settings = TrainModelPipelineSettings()
        pipeline_kwargs = TrainModelPipelineKwargs()

    logger.info(
        f"ClearML Configuration: {'<remote execution>' if remote_task_id else pipeline_settings}"
    )
    logger.info(f"Pipeline Configuration: {pipeline_kwargs}.")

    # The local execution toggles only make sense on the launching machine;
    # on the agent the controller must run as a regular remote task.
    if not remote_task_id:
        if pipeline_settings.execution_mode == ExecutionMode.all_local_single_process:
            PipelineDecorator.debug_pipeline()
        elif pipeline_settings.execution_mode == ExecutionMode.local_controller_subprocess_tasks:
            PipelineDecorator.run_locally()

    pipeline = build_pipeline(pipeline_settings)
    # start pipeline with values from config
    pipeline(**pipeline_kwargs.dict())
Hi @<1533620191232004096:profile|NuttyLobster9>! `PipelineDecorator.get_current_pipeline` will return a `PipelineDecorator` instance (which inherits from `PipelineController`) once the pipeline function has been called. So calling `pipeline = PipelineDecorator.get_current_pipeline()` followed by `pipeline(*args)` doesn't really make sense. You should likely call `pipeline = build_pipeline(*args)` instead.
Hi @<1523701435869433856:profile|SmugDolphin23>, so I need to call the pipeline function again in the remote context? I thought that when I start it up, my local session parses the pipeline and then transmits it to the server to run. But it sounds like it just copies the code, and then I need to effectively call it again in the agent?
If the task is running remotely and the parameters are populated, then the local run parameters will not be used; instead, the parameters that are already on the task will be used. This is because we want to allow users to change these parameters in the UI if they want to, so the parameters that are in the code are ignored in favor of the ones in the UI.
Gotcha. Are the parameters in `@PipelineDecorator.pipeline()` ignored in the remote context? Settings like the Docker image and GitLab repo would already have been used before the pipeline is kicked off on the agent.
Sorry, I meant the arguments that are supplied to the decorator methods themselves, `@PipelineDecorator.pipeline()` and `@PipelineDecorator.component()`: things like `name`, `project`, `docker_args`, etc.
Yes, you need to call the function every time. The remote run might have some parameters populated which you can use, but the pipeline function needs to be called if you actually want to run the pipeline.
The result i get in the agent is:
Traceback (most recent call last):
File "src/clearml_pipelines_examples/pipelines/examples/train_model_on_random_data/pipeline.py", line 89, in <module>
pipeline(**pipeline_ui_config)
TypeError: 'NoneType' object is not callable
It seems like the call to `pipeline = PipelineDecorator.get_current_pipeline()` returns `None`. Also, in the UI, I should be seeing all of the pipeline function parameters, but I only see `config_file_path`.
Okay, so I have to supply them again for the function to work, but the values are ignored, so I can just have a hard-coded version for the remote context.
I am still struggling to figure out how to update the parameter defaults, though. I would like to be able to do the equivalent of `PipelineController.add_parameter()` so that I can supply a local config with new defaults that are used in the remote execution. Otherwise, I'm stuck with whatever defaults are in the function signature.
Are those fixed from the local environment, or do I also need to supply them again in the remote context?