` from clearml import PipelineController
pipe = PipelineController(name="clearmlsample_pipeline",
project="clearmlsample",
version="1.0.0")
pipe.add_parameter('seed', 2222, description='random seed to standardize randomness')
pipe.add_parameter('n_trials', 10, description='trials to run during optimization')
pipe.add_step(
name='get_data', # can be named anything
# connect pipeline to task (obtain data from Task.init in python files)
base_task_project='clearmlsample', # project name
base_task_name='get data & preprocess' # task name
)
pipe.add_step(
name='train_and_tune', # name
base_task_project='clearmlsample', # project name
base_task_name='training and tuning', # task name
# connect it to previous task using name not base_task_name
# step cannot run unless parents finish
parents=['get_data'],
# use 'General/parameter' to override parameter from py file
# give value or take value from pipe.add_parameter using '${pipeline.parameter}'
#
# don need any special command to get data from Dataset.create() from previous task
# just use the Dataset.get to get latest version
# maybe easier to store everything except (raw data) in artificats instead
parameter_override={'General/seed': '${pipeline.seed}',
'General/n_trials': '${pipeline.n_trials}'}
)
pipe.add_step(
name='evaluate', # name
base_task_project='clearmlsample', # project name
base_task_name='evaluating', # task name
# connect it to previous task use name not base_task_name
parents=['train_and_tune'],
parameter_override={'General/seed': '${pipeline.seed}',
# get task id from previous step to get models
'General/train_task_id': '${train_and_tune.id}'}
)
select default worker to run steps
pipe.set_default_execution_queue('MonashPC')
start the pipeline logic
run this to run EVERYTHING locally
pipe.start_locally(run_pipeline_steps_locally=True)
run this to run logic locally but steps remotely
pipe.start_locally()
run this to run logic remotely
do not start pipeline with same worker as set_default_exceution_queue
e.g. pipe.start(queue='MonashPC')
it will cause the steps to be queued forever because it is occuped by the pipeline logic
pipe.start(queue= 'queue')
print('done') `