AverageRabbit65 Adding to SweetBadger76's reference, end-to-end examples are available for the different pipeline implementation methods:
https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_controller
https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_decorator
https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_functions
` from clearml import PipelineController


# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager

    # fall back to the default dataset URL if none was provided
    pickle_data_url = pickle_data_url or ' '
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    # build a DataFrame from the iris features and add the target column
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame['target'] = iris['target']
    return data_frame
# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split

    # split features and target, then create the train/test sets
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test
# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression

    # train a simple logistic-regression model on the prepared split
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model
if __name__ == '__main__':
    # create the pipeline controller
    pipe = PipelineController(
        project='rudolf',
        name='Pipeline demo',
        version='1.1',
        add_pipeline_tags=False,
    )
    # set the default execution queue to be used (per step we can override the execution queue)
    pipe.set_default_execution_queue('default')
    # add pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default=' '
    )
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )

    # For debugging purposes run the pipeline on the current machine.
    # Use run_pipeline_steps_locally=True to also execute the pipeline component Tasks as subprocesses.
    # pipe.start_locally(run_pipeline_steps_locally=False)

    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
    pipe.start()

    print('pipeline completed') `
ClearML Task: created new task id=bda6736172df497a83290b2927df28a2
ClearML results page:
2022-06-30 12:35:16,119 - clearml.Task - INFO - No repository found, storing script code instead
ClearML pipeline page:
2022-06-30 12:35:25,434 - clearml.automation.controller - INFO - Node "step_two" missing parent reference, adding: {'step_one'}
2022-06-30 12:35:25,436 - clearml.automation.controller - INFO - Node "step_three" missing parent reference, adding: {'step_two'}
2022-06-30 12:35:29,448 - clearml.Task - INFO - Waiting to finish uploads
2022-06-30 12:35:37,631 - clearml.Task - INFO - Finished uploading
2022-06-30 12:35:40,416 - clearml - WARNING - Switching to remote execution, output log page
2022-06-30 12:35:40,416 - clearml - WARNING - Terminating local execution process
Indeed, it does work locally if I run pipe.start_locally
Hi AverageRabbit65
You are using a Pipeline from Tasks.
The steps in this case are existing ClearML tasks, so the tasks you specify when you add each step (parameters base_task_project and base_task_name ) refer to pre-existing tasks.
To make this example work, you first have to create them:
project_name = 'rudolf'
Task.init(project_name=project_name, task_name="Pipeline step 1 process dataset")
Task.init(project_name=project_name, task_name="Pipeline step 2 train model")
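Once those tasks exist, the controller references them by project and name. A minimal sketch of what that Pipeline-from-Tasks wiring could look like (the step names and queue here are illustrative, not from the thread):
` from clearml import PipelineController

pipe = PipelineController(project='rudolf', name='Pipeline from tasks', version='1.0')
pipe.set_default_execution_queue('default')

# each step clones a pre-existing task and enqueues the clone for execution
pipe.add_step(
    name='stage_process',
    base_task_project='rudolf',
    base_task_name='Pipeline step 1 process dataset',
)
pipe.add_step(
    name='stage_train',
    parents=['stage_process'],
    base_task_project='rudolf',
    base_task_name='Pipeline step 2 train model',
)
pipe.start() `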
You could also use a Pipeline from Functions or from Decorators, to base your steps on functions that are in your code.
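For reference, a minimal sketch of the decorator flavour (the component names and logic are made up for illustration):
` from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data'], cache=True)
def make_data():
    # imports inside a component are logged as that step's requirements
    return list(range(10))

@PipelineDecorator.component(return_values=['total'])
def sum_data(data):
    return sum(data)

@PipelineDecorator.pipeline(name='decorator demo', project='rudolf', version='0.1')
def pipeline_logic():
    data = make_data()
    print(sum_data(data))

if __name__ == '__main__':
    # debug mode: run the whole pipeline in the local process, no agent needed
    PipelineDecorator.run_locally()
    pipeline_logic() `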
Missing the last piece of the puzzle I believe.
Still missing some info about how to make the worker actually pull the task from the queue.
SweetBadger76 thanks,
The only thing I am not certain about is: what does "agent" mean in the ClearML world? Is it the queue manager or the pipeline?
ran clearml-agent daemon --queue rudolf --detached
You are in a regular execution - I mean not a local one. So the different pipeline tasks have been enqueued. You simply need to fire up an agent to pull the enqueued tasks. I would advise you to specify the queue in the steps (parameter execution_queue ).
You then fire up your agent:
clearml-agent daemon --queue my_queue
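For example, a step could be pinned to a specific queue like this (the queue name is illustrative):
` pipe.add_function_step(
    name='step_one',
    function=step_one,
    function_kwargs=dict(pickle_data_url='${pipeline.url}'),
    function_return=['data_frame'],
    execution_queue='my_queue',  # this step's task will be enqueued here
) `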
Check that your tasks are enqueued in the queue the agent is listening to.
From the web UI, in your step's task, check the default_queue in the configuration section.
When you fire up the agent you should see a log line that also specifies which queue the agent is assigned to.
Finally, in the web app, you can check the Workers & Queues section. There you can see the agent(s), the queues they are listening to, and which tasks are enqueued in which queue.
thank you, guys! I think now it works! Amazing step-by-step support! This is sublime!
for future ref., this is a summary of what I have done:
- create a project in the ClearML web UI
- create a queue in the ClearML web UI
- run an agent: /homes/yosefhaie/.conda/envs/devops/bin/clearml-agent daemon --create-queue --queue <queue-name>
- use this test script:
` from clearml import PipelineController


# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager

    # fall back to the default dataset URL if none was provided
    pickle_data_url = pickle_data_url or ' '
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    # build a DataFrame from the iris features and add the target column
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame['target'] = iris['target']
    return data_frame
# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split

    # split features and target, then create the train/test sets
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test
# We will use the following function as an independent pipeline component step.
# Notice: all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression

    # train a simple logistic-regression model on the prepared split
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model
if __name__ == '__main__':
    # create the pipeline controller
    pipe = PipelineController(
        project='rudolf',
        name='Pipeline functions as pipelines',
        version='1.1',
        add_pipeline_tags=False,
    )
    # set the default execution queue to be used (per step we can override the execution queue)
    pipe.set_default_execution_queue('rudolf')
    # add pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default=' '
    )
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )

    # For debugging purposes run the pipeline on the current machine.
    # Use run_pipeline_steps_locally=True to also execute the pipeline component Tasks as subprocesses.
    pipe.start_locally(run_pipeline_steps_locally=False)

    # Alternatively, enqueue the controller itself instead of running it locally:
    # pipe.start(queue="rudolf")

    print('pipeline completed') `
I created a worker that listens to the queue, but the worker doesn't pull the tasks (they are pending)
Here you go AverageRabbit65 : https://clear.ml/docs/latest/docs/clearml_agent
An agent is a process that pulls tasks from a queue and assigns resources (a worker) to them. In a pipeline, when not run locally, the steps are enqueued tasks.
This is extremely helpful! I decided to go with pipeline from functions.
Everything looks great, but the tasks are pending. Am I missing some executor or something like that?
Take a look at https://clear.ml/docs/latest/docs/pipelines/pipelines_sdk_tasks#running-the-pipeline :
By default pipelines are enqueued for execution by a ClearML Agent. You can explicitly change this behaviour in your code.
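As a sketch, the three launch modes look like this (only one would be used in a given run; the queue name is illustrative):
` # 1. controller runs locally, steps are enqueued for agents to pull
pipe.start_locally()

# 2. controller AND steps run as local subprocesses - no agent needed (debugging)
pipe.start_locally(run_pipeline_steps_locally=True)

# 3. the controller itself is enqueued (default: the 'services' queue on clearml-server)
pipe.start(queue='services') `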