Unanswered
Executed From Within A Pipelinecontroller Task, What Possible Reason Does
AgitatedDove14 I really don't know how this is possible... I tried upgrading the server, tried whatever I could.
As for a small toy example to reproduce it, I just don't have the time for that, but I will paste the callback I am using below. This is the overall logic, so you can replicate it and use my callback:
1. From the pipeline task, launch some sub tasks, and put in their post_execute_callback the .collect_description_tables method from my callback class (attached below).
2. Run the pipeline locally, e.g. pipe.start_locally(run_pipeline_steps_locally=True).
3. After (2) is done, call .process_results().
My callback:
`
class MedianPredictionCollector:
    _tasks_to_collect = list()
    _apps = list()
    _medians = list()
    _pipeline_task = clearml.Task.current_task()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
        # Collect tasks
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        """
        Summarize all median predictions into one table and attach as artifact to the pipeline task
        :return: None
        """
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            current_task = clearml.Task.get_task(task_id)
            median_prediction = current_task.artifacts['inference_description_table'].get().loc[5]
            app = clearml.Task.get_task(task_id=current_task.get_parameter('Args/task_id')).get_parameter(
                'Args/application')
            cls._apps.append(app)
            cls._medians.append(median_prediction)
        # Summary table
        median_predictions = pd.DataFrame(index=cls._apps, data=cls._medians)
        # Upload to pipeline
        cls._pipeline_task.upload_artifact('Median Predictions', median_predictions)
        # I also tried not swapping this line with clearml.Task.current_task().upload_artifact ... didn't work
`
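For anyone trying to replicate this, the three steps could be wired together roughly as in the sketch below. It is only an outline under assumptions, not my actual pipeline code: the function name run_repro, the project name "examples", the pipeline name "median-repro", the step names and the base task names are all placeholders, and the controller_cls argument exists only so the wiring can be exercised without a ClearML server. It also assumes that start_locally returns only after the pipeline has finished.

```python
def run_repro(collector, base_task_names, project="examples", controller_cls=None):
    """Wire a collector class into a locally executed pipeline (steps 1-3).

    All names here are hypothetical placeholders; `collector` is expected to
    expose `collect_description_tables` and `process_results`.
    """
    if controller_cls is None:
        # Imported lazily so the sketch can be loaded without ClearML installed.
        from clearml import PipelineController as controller_cls

    pipe = controller_cls(name="median-repro", project=project, version="0.0.1")

    # Step 1: launch the sub tasks, each with the collector as post_execute_callback.
    for i, base_name in enumerate(base_task_names):
        pipe.add_step(
            name=f"step_{i}",
            base_task_project=project,
            base_task_name=base_name,
            post_execute_callback=collector.collect_description_tables,
        )

    # Step 2: run the controller and its steps on the local machine.
    # Assumption: this call returns only once the pipeline is done.
    pipe.start_locally(run_pipeline_steps_locally=True)

    # Step 3: summarize the collected results and upload them.
    collector.process_results()
    return pipe
```

With the class above, the call would look something like run_repro(MedianPredictionCollector, ["some base task", "another base task"]), where the base task names must match tasks that already exist in the project.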