[Assuming the above is what you are seeing]
What I *think* is happening is that the Pipeline creates its own Task. When the pipeline completes, it closes its own Task, so any later call to `Task.current_task()` returns `None`, because there is no active Task anymore. I think this is why, when you call `process_results(...)`, you end up with `None`.
For a quick fix, you can do
`
pipeline = Pipeline(...)
MedianPredictionCollector.process_results(pipeline._task)
`
Maybe we should add an argument to the Pipeline, telling it not to close the Task when it is done?
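The workaround above can be sketched without ClearML at all (everything here is a stand-in, not the real API): instead of looking the task up globally inside the collector, pass the task handle in explicitly, so the call still works after the pipeline has closed its own task and the global lookup has started returning `None`.

```python
class FakeTask:
    """Stand-in for a ClearML task: just records uploaded artifacts."""
    def __init__(self):
        self.artifacts = {}

    def upload_artifact(self, name, obj):
        self.artifacts[name] = obj


# Module-level "current task"; in this sketch it is simply None,
# mimicking the state after the pipeline has closed its Task.
_current = None

def current_task():
    return _current


class Collector:
    @classmethod
    def process_results(cls, task=None):
        # Prefer an explicitly passed handle; fall back to the
        # global lookup only when no handle was given.
        task = task if task is not None else current_task()
        task.upload_artifact("Prediction results", [1, 2, 3])


# The handle was captured while the pipeline task was still alive,
# so passing it explicitly sidesteps the dead global lookup.
pipeline_task = FakeTask()
Collector.process_results(pipeline_task)
```

The design point is dependency injection: the collector no longer cares whether a "current task" exists at call time.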
This is part of a bigger process that takes quite some time and resources. I hope I can try this soon, if it will help get to the bottom of this.
No worries. If you have another idea on how/why/when we lose the current Task, please share 🙂
After you create the pipeline object itself, can you get `Task.current_task()`?
AgitatedDove14 no I can't... Just checked this. This is a huge problem for us; it used to work before, then it just stopped working, and I can't figure out why.
It's a problem for us because we made it a methodology to run some tasks under a pipeline task and save summary info to the pipeline task - but now, since `Task.current_task()` doesn't work on the pipeline object, we have a serious problem.
Okay so regarding the version - we are using 1.1.1
The thing with this error is that it happens sometimes, and once it happens it never goes away...
I don't know what causes it. We have one host where it works fine; then someone else checks out the repo, tries it, and it fails with this error, while another person can do the same and it works for them.
Okay, so in the first part of the code we define a callback that we add to our steps, so later we can collect them and attach the results to the pipeline task. It looks something like this:
`
class MedianPredictionCollector:
    _tasks_to_collect = list()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController,
                                   node: clearml.PipelineController.Node):
        # Collect tasks
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            .......

        # Upload to pipeline
        clearml.Task.current_task().upload_artifact(
            'Prediction results', median_predictions)
`
Then the pipeline goes like this
`
pipe = clearml.PipelineController(...)

median_predictions_collector = MedianPredictionCollector()

# Add steps
for application in conf.ALL_APPS:
    ....
    pipe.add_step(name=..., parents=..., base_task_project=..., base_task_name=...,
                  post_execute_callback=median_predictions_collector.collect_description_tables)

if not arguments.enqueue:
    ########### THIS IS WHERE IT FAILS ##########
    median_predictions_collector.process_results()
`
We tried many things, like getting the reference to `current_task()` earlier so it was a class variable or something similar, but that didn't help.
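One possible reason the class-variable attempt didn't help (a guess on my part, illustrated here without any ClearML code; `current_task` below is a stand-in, not the real API): a class attribute is evaluated once, when the class body runs. So something like `_pipeline_task = clearml.Task.current_task()` captures whatever the current task was at import time, which may already be `None`, and never sees the task created later.

```python
# No task exists yet at "import time" in this sketch.
_current = None

def current_task():
    return _current


class Collector:
    # This line runs immediately when the class body is executed,
    # i.e. before any task has been created: it captures None for good.
    _pipeline_task = current_task()


# A task comes into existence only later...
_current = "pipeline-task"

# ...but the class attribute still holds the value captured earlier:
# Collector._pipeline_task is None, while current_task() now succeeds.
```

If that is what's happening, capturing the handle lazily (inside a method, at call time, while the pipeline task is still alive) rather than in the class body would behave differently.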
AgitatedDove14 just so you'd know, this is a severe problem that occurs from time to time and we can't explain why it happens... Just to remind: we are using a pipeline controller task which, at the end of the last execution, gathers artifacts from all the children tasks and uploads a new artifact to the pipeline's task object. Then what happens is that `Task.current_task()` returns `None` for the pipeline's task...
AgitatedDove14 I really don't know how this is possible... I tried upgrading the server, tried whatever I could.
About a small toy example to reproduce: I just don't have the time for that, but I will paste the callback I am using below. This is the overall logic, so you can replicate it and use my callback:
1. From the pipeline task, launch some sub tasks, and set their `post_execute_callback` to the `.collect_description_tables` method from my callback class (attached below).
2. Run the pipeline locally, e.g. `pipe.start_locally(run_pipeline_steps_locally=True)`.
3. After (2) is done, call `process_results()`.
`
class MedianPredictionCollector:
    _tasks_to_collect = list()
    _apps = list()
    _medians = list()
    _pipeline_task = clearml.Task.current_task()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController,
                                   node: clearml.PipelineController.Node):
        # Collect tasks
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        """
        Summarize all median predictions into one table and attach as
        artifact to the pipeline task
        :return: None
        """
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            current_task = clearml.Task.get_task(task_id)
            median_prediction = current_task.artifacts['inference_description_table'].get().loc
            app = clearml.Task.get_task(task_id=current_task.get_parameter('Args/task_id')).get_parameter(
                'Args/application')
            cls._apps.append(app)
            cls._medians.append(median_prediction)

        # Summary table
        median_predictions = pd.DataFrame(index=cls._apps, data=cls._medians)

        # Upload to pipeline
        cls._pipeline_task.upload_artifact('Median Predictions', median_predictions)
        # I also tried swapping this line with clearml.Task.current_task().upload_artifact ... didn't work
`
AgitatedDove14 sorry for the late reply,
It's right after executing all the steps. So we have the following block, which determines whether we run locally or remotely:
`
if not arguments.enqueue:
    pipe.start_locally(run_pipeline_steps_locally=True)
else:
    pipe.start(queue=arguments.enqueue)
`
And right after, we have a method that calls `Task.current_task()`, which returns `None`.