I'm trying to understand the execution flow of pipelines when translating from local to remote execution

I’m trying to understand the execution flow of pipelines when translating from local to remote execution. I’ve defined a pipeline using the PipelineDecorator with 2 components. I’m using pydantic to define config settings for both the decorators and the pipeline function arguments, which I then load from a json config file. When running the controller locally with the task remote, everything works as expected, but when I try to run the controller remotely, I can’t figure out how to retrieve the UI parameters and then start the pipeline. When I’m doing this for a single task, the following works to retrieve the UI config:

if task_id := os.getenv("CLEARML_TASK_ID"):
    logger.info(f"ClearML remote task: {task_id}")
    task = Task.get_task(task_id)
    task_ui_config = task.get_parameters_as_dict()

But the equivalent for a pipeline doesn’t seem to work. Putting my full pipeline code into the thread:

  
  
Posted 7 months ago

Answers 10


Hi @<1533620191232004096:profile|NuttyLobster9>! PipelineDecorator.get_current_pipeline will return a PipelineDecorator instance (which inherits from PipelineController) once the pipeline function has been called. So

pipeline = PipelineDecorator.get_current_pipeline()
pipeline(*args)

doesn't really make sense. You should likely call pipeline = build_pipeline(*args) instead.
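
A rough sketch of what that could look like in the remote branch of the pipeline.py module posted later in this thread (it assumes build_pipeline and TrainModelPipelineSettings are in scope in that module; the "Args" section key is an assumption about how get_parameters_as_dict() groups the UI values):

import os

from clearml import Task

if task_id := os.getenv("CLEARML_TASK_ID"):
    task = Task.get_task(task_id)
    ui_params = task.get_parameters_as_dict()
    # Rebuild the decorated pipeline instead of calling
    # PipelineDecorator.get_current_pipeline(), which is None at this point.
    pipeline = build_pipeline(TrainModelPipelineSettings())
    # Assumption: the pipeline function kwargs live under the "Args" section;
    # values may come back as strings and need casting before use.
    pipeline(**ui_params.get("Args", {}))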

  
  
Posted 7 months ago

Yes, you need to call the function every time. The remote run might have some parameters populated which you can use, but the pipeline function needs to be called if you actually want to run the pipeline.

  
  
Posted 7 months ago

Gotcha. Are the parameters in @PipelineDecorator.pipeline() ignored in the remote context? Settings like the docker image and gitlab repo would already be used before the pipeline is kicked off on the agent.

  
  
Posted 7 months ago

The result I get in the agent is:


Traceback (most recent call last):
  File "src/clearml_pipelines_examples/pipelines/examples/train_model_on_random_data/pipeline.py", line 89, in <module>
    pipeline(**pipeline_ui_config)
TypeError: 'NoneType' object is not callable

Seems like the call to pipeline = PipelineDecorator.get_current_pipeline() returns None. Also, in the UI, I should be seeing all of the pipeline function parameters, but I only see the config_file_path.

  
  
Posted 7 months ago

Okay, well, I have to supply them again for the function to work, but the values are ignored, so I can just have a hard-coded version for remote.

I am still struggling to figure out how to update the parameter defaults, though. I would like to be able to do the equivalent of PipelineController.add_parameter() so that I can supply a local config with new defaults that are used in the remote execution. Otherwise, I'm stuck with whatever defaults are in the function signature.
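
For reference, this is roughly the controller-based pattern I have in mind (the project, pipeline, and parameter names below are placeholders, not from my actual pipeline):

from clearml import PipelineController

pipe = PipelineController(
    name="train_model_pipeline",  # placeholder name
    project="examples",  # placeholder project
    version="1.0.0",
)
# add_parameter() registers a default that shows up in the UI and can be
# overridden there before the remote run starts.
pipe.add_parameter(name="epochs", default=1, description="number of training epochs")
pipe.add_parameter(name="distribution_type", default="uniform")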

  
  
Posted 7 months ago

Sorry, I meant the arguments that are supplied to the decorator methods themselves, @PipelineDecorator.pipeline() and @PipelineDecorator.component(): things like name, project, docker_args, etc.

  
  
Posted 7 months ago

Hi @<1523701435869433856:profile|SmugDolphin23>, so I need to call the pipeline function again in the remote context? I guess I thought that when I start it up, my local session parses the pipeline and then transmits it to the server to run, but it sounds like it just copies the code and then I need to effectively call it again in the agent?

  
  
Posted 7 months ago

Are those fixed from the local environment, or do I need to supply those again in the remote context?

  
  
Posted 7 months ago

If the task is running remotely and the parameters are populated, then the local run parameters will not be used; instead, the parameters that are already on the task will be used. This is because we want to allow users to change these parameters in the UI if they want to, so the parameters in the code are ignored in favor of the ones in the UI.
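
As a minimal standalone-task sketch of that behavior (separate from the pipeline code in this thread; project and parameter names are placeholders):

from clearml import Task

task = Task.init(project_name="examples", task_name="param-override-demo")

params = {"epochs": 1, "distribution_type": "uniform"}
# Locally, connect() registers these defaults on the task. On a remote (agent)
# run, the returned dict is populated from the values stored on the task /
# edited in the UI, so the hard-coded values above are ignored.
params = task.connect(params)
print(params)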

  
  
Posted 7 months ago

import json
import os
import sys
from argparse import ArgumentParser
from logging import getLogger
from pathlib import Path
from typing import Callable

from clearml import PipelineDecorator, Task

from clearml_pipelines_examples.base.pipeline_settings import ExecutionMode
from clearml_pipelines_examples.pipelines.examples.train_model_on_random_data import (
    TrainModelPipelineKwargs,
    TrainModelPipelineSettings,
)
from clearml_pipelines_examples.tasks.examples import generate_dataset, train_model

logger = getLogger()


def build_pipeline(pipeline_config: TrainModelPipelineSettings) -> Callable:

    generate_dataset_config = pipeline_config.components.generate_dataset
    generate_dataset_step = PipelineDecorator.component(
        generate_dataset,
        task_type="data_processing",
        return_values=["n_data_features"],
        **generate_dataset_config.dict(),
    )

    train_model_config = pipeline_config.components.train_model
    train_model_step = PipelineDecorator.component(
        train_model,
        task_type="training",
        parents=["generate_dataset"],
        **train_model_config.dict(),
    )

    @PipelineDecorator.pipeline(
        start_controller_locally=pipeline_config.execution_mode != ExecutionMode.all_remote,
        **pipeline_config.dict(exclude={"execution_mode", "components"}),
    )
    def pipeline(
        clearml_dataset_name: str,
        clearml_dataset_project_name: str,
        clearml_model_name: str,
        generate_n_samples: int = 100,
        generate_n_features: int = 1,
        distribution_type: str = "uniform",
        epochs: int = 1,
    ):

        n_data_features = generate_dataset_step(
            clearml_dataset_name=clearml_dataset_name,
            clearml_dataset_project_name=clearml_dataset_project_name,
            n_samples=generate_n_samples,
            n_features=generate_n_features,
            dist_type=distribution_type,
        )
        train_model_step(
            clearml_dataset_name=clearml_dataset_name,
            clearml_dataset_project_name=clearml_dataset_project_name,
            clearml_model_name=clearml_model_name,
            epochs=epochs,
            n_dataset_features=n_data_features,
        )

    return pipeline


if __name__ == "__main__":

    # mbarna/TODO: move this parsing to a separate function
    parser = ArgumentParser()
    # mbarna/TODO: would be good to have this not end up in the pipeline UI config
    parser.add_argument("--config-file-path", "-f", type=Path)
    parsed_args = parser.parse_args()
    pipeline_settings = None
    pipeline_kwargs = None

    # use this to detect running in a remote context
    if task_id := os.getenv("CLEARML_TASK_ID"):
        logger.info(f"ClearML remote task: {task_id}")
        task = Task.get_task(task_id)
        pipeline_ui_config = task.get_parameters_as_dict()
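        # NOTE: per the traceback earlier in the thread, this call returns None
        # when the script runs remotely on the agent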
        pipeline = PipelineDecorator.get_current_pipeline()
        pipeline(**pipeline_ui_config)
        sys.exit(0)

    # when running locally, load config file and get pipeline ClearML Settings
    # and pipeline function kwarg values
    elif config_file_path := parsed_args.config_file_path:
        raw_config = json.loads(config_file_path.read_text())
        pipeline_settings = TrainModelPipelineSettings.parse_obj(raw_config["settings"])
        pipeline_kwargs = TrainModelPipelineKwargs.parse_obj(raw_config["pipeline_kwargs"])
    else:
        pipeline_settings = TrainModelPipelineSettings()
        pipeline_kwargs = TrainModelPipelineKwargs()

    logger.info(
        f"ClearML Configuration: {pipeline_settings if pipeline_settings else '<remote execution>'}"
    )
    logger.info(f"Pipeline Configuration: {pipeline_kwargs}.")

    if pipeline_settings.execution_mode == ExecutionMode.all_local_single_process:
        PipelineDecorator.debug_pipeline()
    elif pipeline_settings.execution_mode == ExecutionMode.local_controller_subprocess_tasks:
        PipelineDecorator.run_locally()

    pipeline = build_pipeline(pipeline_settings)
    # start pipeline with values from config
    pipeline(**pipeline_kwargs.dict())
  
  
Posted 7 months ago