` from clearml import PipelineController
We will use the following function an independent pipeline component step
notice all package imports inside the function will be automatically logged as
required packages for the pipeline execution step
def step_one(pickle_data_url):
# make sure we have scikit-learn for this step, we need it to use to unpickle the object
import sklearn # noqa
import pickle
import pandas as pd
from clearml import StorageManager
pickle_data_url =
pickle_data_url or
' '
local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
with open(local_iris_pkl, 'rb') as f:
iris = pickle.load(f)
data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
data_frame.columns += ['target']
data_frame['target'] = iris['target']
return data_frame
We will use the following function an independent pipeline component step
notice all package imports inside the function will be automatically logged as
required packages for the pipeline execution step
def step_two(data_frame, test_size=0.2, random_state=42):
# make sure we have pandas for this step, we need it to use the data_frame
import pandas as pd # noqa
from sklearn.model_selection import train_test_split
y = data_frame['target']
X = data_frame[(c for c in data_frame.columns if c != 'target')]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state)
return X_train, X_test, y_train, y_test
We will use the following function an independent pipeline component step
notice all package imports inside the function will be automatically logged as
required packages for the pipeline execution step
def step_three(data):
# make sure we have pandas for this step, we need it to use the data_frame
import pandas as pd # noqa
from sklearn.linear_model import LogisticRegression
X_train, X_test, y_train, y_test = data
model = LogisticRegression(solver='liblinear', multi_class='auto')
model.fit(X_train, y_train)
return model
if name == 'main':
# create the pipeline controller
pipe = PipelineController(
project='rudolf',
name='Pipeline demo',
version='1.1',
add_pipeline_tags=False,
)
# set the default execution queue to be used (per step we can override the execution)
pipe.set_default_execution_queue('default')
# add pipeline components
pipe.add_parameter(
name='url',
description='url to pickle file',
default=' ` ` '
)
pipe.add_function_step(
name='step_one',
function=step_one,
function_kwargs=dict(pickle_data_url='${pipeline.url}'),
function_return=['data_frame'],
cache_executed_step=True,
)
pipe.add_function_step(
name='step_two',
# parents=['step_one'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_two,
function_kwargs=dict(data_frame='${step_one.data_frame}'),
function_return=['processed_data'],
cache_executed_step=True,
)
pipe.add_function_step(
name='step_three',
# parents=['step_two'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_three,
function_kwargs=dict(data='${step_two.processed_data}'),
function_return=['model'],
cache_executed_step=True,
)
# For debugging purposes run on the pipeline on current machine
# Use run_pipeline_steps_locally=True to further execute the pipeline component Tasks as subprocesses.
# pipe.start_locally(run_pipeline_steps_locally=False)
# Start the pipeline on the services queue (remote machine, default on the clearml-server)
pipe.start()
print('pipeline completed') `