In the first part of the code, we define a callback that we attach to our pipeline steps, so that later we can collect the executed tasks and attach the results to the pipeline task. It looks something like this:
```python
import clearml


class MedianPredictionCollector:
    _tasks_to_collect = list()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController,
                                   node: clearml.PipelineController.Node):
        # Collect the executed task IDs
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            .......
        # Upload to the pipeline task
        clearml.Task.current_task().upload_artifact(
            'Prediction results', median_predictions)
```
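Stripped of the ClearML specifics, the collector relies on a class-level list that all `@classmethod` calls share, so results accumulate across steps without ever instantiating the class. A minimal stdlib-only sketch of that accumulator pattern (names hypothetical, not from our actual code):

```python
import statistics


class Collector:
    _items = list()  # class attribute: shared by every call, no instance needed

    @classmethod
    def collect(cls, item):
        # analogous to collect_description_tables appending node.executed
        cls._items.append(item)

    @classmethod
    def process(cls):
        # analogous to process_results aggregating over the collected tasks
        return statistics.median(cls._items)


for value in (3, 1, 2):
    Collector.collect(value)

print(Collector.process())  # → 2
```

Because `collect` mutates `cls._items`, every callback invocation lands in the same list, which is exactly why the pattern works without passing state between steps.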
The pipeline itself looks like this:
```python
pipe = clearml.PipelineController(
    name=TASK_NAME,
    project=PROJECT_NAME,
    version='0.0.1',
    add_pipeline_tags=False,
)
median_predictions_collector = MedianPredictionCollector()

pipe.set_default_execution_queue('default')

# Add steps
for application in conf.ALL_APPS:
    ....
    pipe.add_step(name=...,
                  parents=...,
                  base_task_project=...,
                  base_task_name=...,
                  post_execute_callback=median_predictions_collector.collect_description_tables)

if not arguments.enqueue:
    pipe.start_locally(run_pipeline_steps_locally=True)
else:
    pipe.start(queue=arguments.enqueue)

########### THIS IS WHERE IT FAILS ##########
median_predictions_collector.process_results()
```
We tried many things, like getting the reference from `current_task()` earlier so it was stored as a class variable, or something similar, but that didn't help.
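For completeness, the "capture the reference earlier" attempt was roughly this shape (a hypothetical sketch reconstructed from memory; the ClearML task is replaced by a stand-in object so the pattern itself is clear):

```python
class MedianPredictionCollector:
    _pipeline_task = None  # reference captured before the pipeline starts

    @classmethod
    def capture_current_task(cls, task):
        # called at setup time, while the controller task is still "current"
        cls._pipeline_task = task

    @classmethod
    def process_results(cls):
        # use the stored reference instead of resolving it after the run
        if cls._pipeline_task is None:
            raise RuntimeError('no pipeline task captured')
        return cls._pipeline_task


# stand-in for what clearml.Task.current_task() would return at setup time
sentinel_task = object()
MedianPredictionCollector.capture_current_task(sentinel_task)
assert MedianPredictionCollector.process_results() is sentinel_task
```

The pattern itself is sound in plain Python, which is why we were surprised it didn't help in the pipeline context.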