Answered
Hey guys, is there an e2e working example of writing a pipeline with 2-3 tasks?

Hey guys,

Is there an e2e working example of writing a pipeline with 2-3 tasks? Just a hello world. I am the first one trying to get a ClearML pipeline to work.

I wasn't able to get it working:

```python
from clearml import Task
from clearml.automation import PipelineController


def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # type (PipelineController, PipelineController.Node, dict) -> bool
    print(
        "Cloning Task id={} with parameters: {}".format(
            a_node.base_task_id, current_param_override
        )
    )
    # if we want to skip this node (and subtree of this node) we return False
    # return True to continue DAG execution
    return True


def post_execute_callback_example(a_pipeline, a_node):
    # type (PipelineController, PipelineController.Node) -> None
    print("Completed Task id={}".format(a_node.executed))
    # if we need the actual executed Task: Task.get_task(task_id=a_node.executed)
    return


# Connecting ClearML with the current pipeline,
# from here on everything is logged automatically
pipe = PipelineController(
    name="Pipeline Demo Rudolf", project="rudolf", version="0.0.1", add_pipeline_tags=False
)

pipe.set_default_execution_queue("rudolf")

pipe.add_step(
    name="stage_process",
    base_task_project="rudolf",
    base_task_name="Pipeline step 1 process dataset",
)
pipe.add_step(
    name="stage_train",
    parents=["stage_process"],
    base_task_project="rudolf",
    base_task_name="Pipeline step 2 train model",
)

# for debugging purposes use local jobs
# pipe.start_locally()

# Starting the pipeline (in the background)
pipe.start()

print("done")
```

Traceback (most recent call last):
  File "clearml_ex.py", line 36, in <module>
    base_task_name="Pipeline step 2 process dataset",
  File ".conda/envs/devops/lib/python3.7/site-packages/clearml/automation/controller.py", line 410, in add_step
    base_task_project, base_task_name))
ValueError: Could not find base_task_project=rudolf base_task_name=Pipeline step 2 process dataset

  
  
Posted 2 years ago

Answers 17


```python
from clearml import PipelineController


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    pickle_data_url = pickle_data_url or ' '
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


if __name__ == '__main__':

    # create the pipeline controller
    pipe = PipelineController(
        project='rudolf',
        name='Pipeline demo',
        version='1.1',
        add_pipeline_tags=False,
    )

    # set the default execution queue to be used (per step we can override the execution)
    pipe.set_default_execution_queue('default')

    # add pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default=' '
    )
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )

    # For debugging purposes run the pipeline on the current machine
    # Use run_pipeline_steps_locally=True to further execute the pipeline component Tasks as subprocesses.
    # pipe.start_locally(run_pipeline_steps_locally=False)

    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
    pipe.start()

    print('pipeline completed')
```
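The commented-out parents arguments above rely on the pipeline detecting dependencies from the '${step_name.artifact}' references in function_kwargs. As a rough illustration of that idea only, here is a minimal standard-library sketch. The helper infer_parents and its regular expression are assumptions made for this example and are not ClearML's actual implementation.

```python
import re

# Matches references such as "${step_one.data_frame}" and captures the step name.
# The exact reference grammar used by ClearML is not reproduced here; this is an assumption.
_REF = re.compile(r"\$\{([^.}]+)\.[^}]+\}")


def infer_parents(function_kwargs, known_steps):
    """Return the set of known step names referenced in the kwarg values."""
    parents = set()
    for value in function_kwargs.values():
        if isinstance(value, str):
            for step_name in _REF.findall(value):
                # "${pipeline.url}" refers to a pipeline parameter, not a step,
                # so only names of registered steps count as parents.
                if step_name in known_steps:
                    parents.add(step_name)
    return parents


# Same kwargs as in the example pipeline above
steps = {
    "step_one": {"pickle_data_url": "${pipeline.url}"},
    "step_two": {"data_frame": "${step_one.data_frame}"},
    "step_three": {"data": "${step_two.processed_data}"},
}
dependencies = {name: infer_parents(kwargs, steps) for name, kwargs in steps.items()}
print(dependencies)
```

In this sketch step_one has no parents, step_two depends on step_one, and step_three depends on step_two, which is the same chain the example builds without explicit parents.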
  
  
Posted 2 years ago

This is extremely helpful! I decided to go with pipeline from functions.

Everything looks great, but the tasks are pending. Am I missing some executor or something like that?

  
  
Posted 2 years ago

Missing the last piece of the puzzle I believe.

  
  
Posted 2 years ago

An agent is a process that pulls tasks from a queue and assigns resources (a worker) to them. In a pipeline that is not run locally, the steps are enqueued tasks.
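To make the queue/agent relationship concrete, here is a toy model in plain Python. It is only a sketch of the concept: ToyServer and ToyAgent are hypothetical names invented for this illustration, and nothing here reflects how clearml-agent is actually implemented. It shows why a task stays pending when no agent listens to the queue it was enqueued in.

```python
from collections import defaultdict, deque


class ToyServer:
    """Hypothetical stand-in for the server: named queues plus task statuses."""

    def __init__(self):
        self.queues = defaultdict(deque)
        self.status = {}

    def enqueue(self, task, queue):
        self.queues[queue].append(task)
        self.status[task] = "pending"


class ToyAgent:
    """Hypothetical agent that listens to exactly one queue."""

    def __init__(self, server, queue):
        self.server = server
        self.queue = queue

    def run_once(self):
        # Pull the next task from the queue this agent listens to, if any
        pending = self.server.queues[self.queue]
        if not pending:
            return None
        task = pending.popleft()
        self.server.status[task] = "completed"
        return task


server = ToyServer()
server.enqueue("step_one", queue="rudolf")
server.enqueue("step_two", queue="default")

# The only agent listens to "rudolf", so nothing ever pulls from "default"
agent = ToyAgent(server, queue="rudolf")
agent.run_once()
print(server.status)
```

Here step_one is executed while step_two remains pending, because it sits in a queue that no agent is serving.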

  
  
Posted 2 years ago

Still missing some info about how to make the worker actually pull the task from the queue.

  
  
Posted 2 years ago

Check that your tasks are enqueued in the queue the agent is listening to.
From the web UI, in your step's task, check the default_queue in the configuration section.
When you start the agent, its log should also specify which queue the agent is assigned to.
Finally, in the web app, you can check the Workers & Queues section. There you can see the agent(s), the queues they are listening to, and which tasks are enqueued in which queue.

  
  
Posted 2 years ago

Here you go AverageRabbit65 : https://clear.ml/docs/latest/docs/clearml_agent

  
  
Posted 2 years ago

ClearML Task: created new task id=bda6736172df497a83290b2927df28a2
ClearML results page:
2022-06-30 12:35:16,119 - clearml.Task - INFO - No repository found, storing script code instead
ClearML pipeline page:
2022-06-30 12:35:25,434 - clearml.automation.controller - INFO - Node "step_two" missing parent reference, adding: {'step_one'}
2022-06-30 12:35:25,436 - clearml.automation.controller - INFO - Node "step_three" missing parent reference, adding: {'step_two'}
2022-06-30 12:35:29,448 - clearml.Task - INFO - Waiting to finish uploads
2022-06-30 12:35:37,631 - clearml.Task - INFO - Finished uploading
2022-06-30 12:35:40,416 - clearml - WARNING - Switching to remote execution, output log page
2022-06-30 12:35:40,416 - clearml - WARNING - Terminating local execution process

  
  
Posted 2 years ago

thank you, guys! I think now it works! Amazing step-by-step support! This is sublime!

for future ref., this is a summary of what I have done:

create a project on the ClearML webUI
create a queue on the ClearML webUI
run an agent: /homes/yosefhaie/.conda/envs/devops/bin/clearml-agent daemon --create-queue --queue <queue-name>
use this test script:

```python
from clearml import PipelineController


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    pickle_data_url = pickle_data_url or ' '
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


if __name__ == '__main__':

    # create the pipeline controller
    pipe = PipelineController(
        project='rudolf',
        name='Pipeline functions as pipelines',
        version='1.1',
        add_pipeline_tags=False,
    )

    # set the default execution queue to be used (per step we can override the execution)
    pipe.set_default_execution_queue('rudolf')

    # add pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default=' '
    )
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )

    # For debugging purposes run the pipeline on the current machine
    # Use run_pipeline_steps_locally=True to further execute the pipeline component Tasks as subprocesses.

    pipe.start_locally(run_pipeline_steps_locally=False)

    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
    pipe.start(queue="rudolf")

    print('pipeline completed')
```
  
  
Posted 2 years ago

Take a look at https://clear.ml/docs/latest/docs/pipelines/pipelines_sdk_tasks#running-the-pipeline
By default, pipelines are enqueued for execution by a ClearML Agent. You can explicitly change this behaviour in your code.

  
  
Posted 2 years ago

I created a worker that listens to the queue, but the worker doesn't pull the tasks (they are pending)

  
  
Posted 2 years ago

Indeed locally it does work if I run pipe.start_locally

  
  
Posted 2 years ago

SweetBadger76 thanks,

The only thing I am not certain about is what "agent" means in the ClearML world. Is it the queue manager or the pipeline?

  
  
Posted 2 years ago

Hi AverageRabbit65
You are using Pipeline from Tasks.
The steps in this case are existing ClearML tasks, so the tasks you specify when you add each step (parameters base_task_project and base_task_name) must refer to pre-existing tasks.
To make this example work, you first have to create them:
project_name = 'rudolf'
Task.init(project_name=project_name, task_name="Pipeline step 1 process dataset")
Task.init(project_name=project_name, task_name="Pipeline step 2 train model")
You could also use Pipeline from Functions or from Decorators, to base your steps on functions that are in your code.
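The failure in the question can be pictured with a small toy model. This is a sketch under stated assumptions: ToyPipeline and its existing_tasks set are hypothetical and only imitate the behaviour described above (a step refers to a base task by project and name, and adding the step fails if no such task exists). It does not use or reproduce ClearML's real lookup.

```python
class ToyPipeline:
    """Hypothetical pipeline whose steps must point at already existing tasks."""

    def __init__(self, existing_tasks):
        # existing_tasks: set of (project, task_name) pairs assumed to exist on the server
        self.existing_tasks = set(existing_tasks)
        self.steps = []

    def add_step(self, name, base_task_project, base_task_name):
        if (base_task_project, base_task_name) not in self.existing_tasks:
            raise ValueError(
                "Could not find base_task_project={} base_task_name={}".format(
                    base_task_project, base_task_name))
        self.steps.append(name)


# No base task has been created yet, so adding the step fails
pipe = ToyPipeline(existing_tasks=set())
try:
    pipe.add_step("stage_process", "rudolf", "Pipeline step 1 process dataset")
    error = None
except ValueError as exc:
    error = str(exc)
print(error)

# Once the base task exists, the same call succeeds
pipe = ToyPipeline(existing_tasks={("rudolf", "Pipeline step 1 process dataset")})
pipe.add_step("stage_process", "rudolf", "Pipeline step 1 process dataset")
print(pipe.steps)
```

The point of the sketch is the ordering: the base tasks have to exist before the pipeline script that references them is run.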

  
  
Posted 2 years ago

ran clearml-agent daemon --queue rudolf --detached

  
  
Posted 2 years ago

You are in a regular execution, I mean not a local one, so the different pipeline tasks have been enqueued. You simply need to start an agent to pull the enqueued tasks. I would advise you to specify the queue in the steps (parameter execution_queue).
You then start your agent:
clearml-agent daemon --queue my_queue

  
  
Posted 2 years ago