Then what happens is that Task.current_task() returns None for the pipeline's task...
Hmm, that sounds like the pipeline Task was closed?! Could that be? Where (in the code) is the call to Task.current_task()?
AgitatedDove14 I really don't know how this is possible... I tried upgrading the server, tried whatever I could
About small toy code to reproduce: I just don't have the time for that, but I will paste the callback I am using below. This is the overall logic, so you can replicate it and use my callback:
1. From the pipeline task, launch some sub tasks, and set their post_execute_callback to the .collect_description_tables method from my callback class (attached below).
2. Run the pipeline locally, e.g. pipe.start_locally(run_pipeline_steps_locally=True).
3. After (2) is done, call .process_results().
my callback:
```python
import clearml
import pandas as pd


class MedianPredictionCollector:
    _tasks_to_collect = list()
    _apps = list()
    _medians = list()
    _pipeline_task = clearml.Task.current_task()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
        # Collect tasks
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        """
        Summarize all median predictions into one table and attach as artifact to the pipeline task
        :return: None
        """
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            current_task = clearml.Task.get_task(task_id)
            median_prediction = current_task.artifacts['inference_description_table'].get().loc[5]
            app = clearml.Task.get_task(task_id=current_task.get_parameter('Args/task_id')).get_parameter(
                'Args/application')
            cls._apps.append(app)
            cls._medians.append(median_prediction)
        # Summary table
        median_predictions = pd.DataFrame(index=cls._apps, data=cls._medians)
        # Upload to pipeline
        cls._pipeline_task.upload_artifact('Median Predictions', median_predictions)
        # I also tried swapping this line with clearml.Task.current_task().upload_artifact ... didn't work
```
Okay so regarding the version - we are using 1.1.1
The thing with this error is that it only happens sometimes, and once it happens it never goes away...
I don't know what causes it, but we have one host where it works okay, then someone else checks out the repo and tries, and it fails with this error, while another guy can do the same and it will work for him
Hi WackyRabbit7
So I'm assuming this is after start_locally is called?
Which clearml version are you using?
(just making sure, calling Task.current_task() before starting the pipeline returns the correct Task?)
Just making sure, after the pipe object is created, you can call Task.current_task(), is that correct?
After you create the pipeline object itself, can you get Task.current_task()?
AgitatedDove14 no I can't... Just checked this. This is a huge problem for us, it used to work before and it just stopped working and I can't figure out why.
It's a problem for us because we made it a methodology of running some tasks under a pipeline task and saving summary info to the pipeline task - but now since Task.current_task() doesn't work on the pipeline object we have a serious problem
The weirdest thing is that the execution is marked "completed" but it actually failed
[Assuming the above is what you are seeing]
What I "think" is happening is that the Pipeline creates its own Task. When the pipeline completes, it closes its own Task, basically making any later calls to Task.current_task() return None, because there is no active Task. I think this is the reason that when you are calling process_results(...) you end up with None.
For a quick fix, you can do: pipeline = Pipeline(...) and then MedianPredictionCollector.process_results(pipeline._task)
Maybe we should add an argument to the Pipeline, telling it not to close the Task when it is done?
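A minimal sketch of that quick fix, assuming process_results is changed to take the task as an argument instead of calling Task.current_task(). FakeTask is a hypothetical stand-in so the snippet runs without a ClearML server, and the summary dict is only a placeholder for the real table; in the real script you would pass the pipeline's task (e.g. pipeline._task, a private attribute, as suggested above).

```python
class MedianPredictionCollector:
    _tasks_to_collect = []

    @classmethod
    def collect_description_tables(cls, pipeline, node):
        # post_execute_callback: remember the ID of each executed step
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls, pipeline_task):
        # Fail loudly here instead of hitting an AttributeError on None later
        if pipeline_task is None:
            raise RuntimeError("No pipeline task to upload the summary to")
        # Placeholder summary; the real code builds a DataFrame of medians
        summary = {"collected_task_ids": list(cls._tasks_to_collect)}
        pipeline_task.upload_artifact("Median Predictions", summary)
        return summary


class FakeTask:
    """Hypothetical stand-in for clearml.Task, for illustration only."""

    def __init__(self):
        self.artifacts = {}

    def upload_artifact(self, name, artifact_object):
        self.artifacts[name] = artifact_object
        return True
```

Passing the task explicitly means the upload no longer depends on whether a "current" task still exists after the pipeline has finished.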
(I suspect you are correct, but I'm missing some information in order to understand where the problem is)
WackyRabbit7 can you send mock code that explains how you create the pipeline ?
I'll check the version tomorrow, about the current_task call, I tried before and after - same result
If you want we can do live zoom or something so you can see what happens
AgitatedDove14 sorry for the late reply,
It's right after executing all the steps. So we have the following block, which determines whether we run locally or remotely:
    if not arguments.enqueue:
        pipe.start_locally(run_pipeline_steps_locally=True)
    else:
        pipe.start(queue=arguments.enqueue)
And right after that we have a method that calls Task.current_task(), which returns None
So I checked the code, and the Pipeline constructor internally calls Task.init. That means that after you construct the pipeline object, Task.current_task() should return a valid object....
let me know what you find out
I suspect that it has something to do with remote execution / local execution of pipelines, because we play with this, so sometimes the pipeline task itself executes on the client, and sometimes on the host (where the agent also is)
This is part of a bigger process which takes quite some time and resources. I hope I can try this soon, if it will help get to the bottom of this
No worries, if you have another handle on how/why/when we lose the current Task, please share 🙂
It's kind of random, it works sometimes and sometimes it doesn't
AgitatedDove14 just so you know, this is a severe problem that occurs from time to time and we can't explain why it happens... As a reminder, we are using a pipeline controller task, which at the end of the last execution gathers artifacts from all the children tasks and uploads a new artifact to the pipeline's task object. Then what happens is that Task.current_task() returns None for the pipeline's task...
but now since Task.current_task() doesn't work on the pipeline object we have a serious problem
How is that possible ?
Is there a small toy code that can reproduce it ?
Maybe the case is that after start / start_locally the reference to the pipeline task disappears somehow? O_O
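One way to narrow down where the reference disappears is to log the current task at a few checkpoints. This is only a sketch: log_current_task is a hypothetical helper, and the getter is passed in as an argument so the snippet runs without ClearML; in the real script the getter would presumably be clearml.Task.current_task.

```python
import logging

logger = logging.getLogger("pipeline_debug")


def log_current_task(label, get_current_task):
    """Log whether a current task exists at this checkpoint and return it."""
    task = get_current_task()
    if task is None:
        logger.warning("%s: current task is None", label)
    else:
        # Fall back to a placeholder if the object has no 'id' attribute
        logger.info("%s: current task id=%s", label, getattr(task, "id", "<unknown>"))
    return task
```

Calling it right after the pipeline object is created, right before start / start_locally, and right after it returns would show at which point the task becomes None.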
Okay so at the first part of the code, we define some kind of callback that we add to our steps, so later we can collect them and attach the results to the pipeline task. It looks something like this
```python
class MedianPredictionCollector:
    _tasks_to_collect = list()

    @classmethod
    def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
        # Collect tasks
        cls._tasks_to_collect.append(node.executed)

    @classmethod
    def process_results(cls):
        # Collect median predictions
        for task_id in cls._tasks_to_collect:
            .......
        # Upload to pipeline
        clearml.Task.current_task().upload_artifact(
            'Prediction results', median_predictions)
```
Then the pipeline goes like this
```python
pipe = clearml.PipelineController(
    name=TASK_NAME,
    project=PROJECT_NAME,
    version='0.0.1',
    add_pipeline_tags=False,
)
median_predictions_collector = MedianPredictionCollector()
pipe.set_default_execution_queue('default')

# Add steps
for application in conf.ALL_APPS:
    ....
    pipe.add_step(name=...,
                  parents=...,
                  base_task_project=...,
                  base_task_name=...,
                  post_execute_callback=median_predictions_collector.collect_description_tables)

if not arguments.enqueue:
    pipe.start_locally(run_pipeline_steps_locally=True)
else:
    pipe.start(queue=arguments.enqueue)

########### THIS IS WHERE IT FAILS ##########
median_predictions_collector.process_results()
```
We tried many things, like getting the reference to current_task() earlier, so it was a class variable or something similar, but that didn't help
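One possible reason the class-variable attempt did not help (an assumption, not something confirmed in this thread): a class-level assignment is evaluated only once, when the class body runs, which may be before the pipeline and its task exist. The sketch below shows that timing in plain Python; current_task and _state are hypothetical stand-ins, not ClearML calls.

```python
_state = {"task": None}  # hypothetical stand-in for a global "current task"


def current_task():
    return _state["task"]


class EagerCollector:
    # Evaluated once, when the class body runs (no task exists yet)
    _pipeline_task = current_task()


class LazyCollector:
    @classmethod
    def pipeline_task(cls):
        # Evaluated on every call, so it sees a task created later
        return current_task()


# Simulate the pipeline creating its task after the classes were defined
_state["task"] = "pipeline-task"
```

Here EagerCollector._pipeline_task stays None while LazyCollector.pipeline_task() sees the later task. A lazy lookup would still return None once the task has been closed, so keeping a reference captured after the pipeline object is created is likely the more robust option.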
WackyRabbit7 just making sure I understand: MedianPredictionCollector.process_results is called after the pipeline is completed. Then, inside the function, Task.current_task() returns None. Is this correct?