Just making sure, after the  pipe  object is created, you can call Task.current_task() , is that correct?
Maybe the case is that after  start  /  start_locally  the reference to the pipeline task disappears somehow? O_O
Okay so at the first part of the code, we define some kind of callback that we add to our steps, so later we can collect them and attach the results to the pipeline task. It looks something like this
` class MedianPredictionCollector:
_tasks_to_collect = list()
@classmethod
def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
    # Collect tasks
    cls._tasks_to_collect.append(node.executed)
@classmethod
def process_results(cls):
    # Collect median predictions
    for task_id in cls._tasks_to_collect:
         .......
    # Upload to pipeline
    clearml.Task.current_task().upload_artifact(
        'Prediction results', median_predictions) `
Then the pipeline goes like this
`     pipe = clearml.PipelineController(
name=TASK_NAME,
project=PROJECT_NAME,
version='0.0.1',
add_pipeline_tags=False,
)
median_predictions_collector = MedianPredictionCollector()
pipe.set_default_execution_queue('default')
# Add steps
for application in conf.ALL_APPS:
     ....
     pipe.add_step(name=...,
                    parents=...,
                    base_task_project=...,
                    base_task_name=...,
                    post_execute_callback=median_predictions_collector.collect_description_tables)
if not arguments.enqueue:
pipe.start_locally(run_pipeline_steps_locally=True)
else:
pipe.start(queue=arguments.enqueue)
########### THIS IS WHERE IT FAILS ##########
median_predictions_collector.process_results() `
We tried many things, like getting the reference to the  current_task()  earlier, so it was a class variable or somehting similar, but that didn't help
(I suspect you are correct, but I'm missing some information in order to understand where the problem is)
WackyRabbit7  can you send mock code that explains how you create the pipeline ?
I suspect that it has something to do with remote execution / local execution of pipelines, because we play with this , so sometimes the pipeline task itself executes on the client, and sometimes on the host (where the agent is also)
Okay so regarding the version - we are using 1.1.1
The thing with this error it that it happens sometimes, and when it happens it never goes away...
I don't know what causes it, but we have one host where it works okay, then someone else checks out the repo and tried and it fails for this error, while another guy can do the same and it will work for him
So I checked the code, and the Pipeline constructor internally calls Task.init, that means that after you constructs the pipeline object, Task.current_task() should return a valid  object....
let me know what you find out
I'll check the version tomorrow, about the current_task call, I tried before and after - same result
Hi  WackyRabbit7
So I'm assuming after the start_locally is called ?
Which clearml version are you using ?
(just making sure, calling  Task.current_task()   before starting the pipeline returns the correct Task?)
AgitatedDove14 sorry for the late reply,
It's right after executing all the steps. So we have the following block which determines whether we run locally or remotely
if not arguments.enqueue: pipe.start_locally(run_pipeline_steps_locally=True) else: pipe.start(queue=arguments.enqueue)
And right after we have a method that calls  Task.current_task()  which returns  None
Then what happens is that
Task.current_task()
returns
None
for the pipeline's task...
Hmm that sounds like the pipeline Task was closed?! could that be? where (in the code) is the call to Task.current_task ?
AgitatedDove14  just so you'd know this is a severe problem that occurs from time to time and we can't explain why it happens... Just to remind, we are using a pipeline controller task, which at the end of the last execution gathers artifacts from all the children tasks and uploads a new artifact to the pipeline's task object. Then what happens is that  Task.current_task()  returns  None  for the pipeline's task...
[Assuming the above is what you are seeing]
What I "think" is happening is that the Pipeline creates it's own Task. When the pipeline completes, it closes it's own Task, basically making any later calls to Tasl.current_task() return None, because there is no active Task. I think this is the reason that when you are calling process_results(...) you end up with None.
For a quick fix, you can dopipeline = Pipeline(...) MedianPredictionCollector.process_results(pipeline._task)Maybe we should add an argument to the Pipeline, telling it Not to close the Task when it is done?
WackyRabbit7  just making sure I understand:MedianPredictionCollector.process_results  Is called after the pipeline is completed.
Then inside the function, Task.current_task() returns None.
Is this correct?
AgitatedDove14 I really don't know how is this possible... I tried upgrading the server, tried whatever I could
About small toy code to reproduce I just don't have the time for that, but I will paste the callback I am using to this explanation. This is the overall logic so you can replicate and use my callback
From the pipeline task, launch some sub tasks, and put in their  post_execute_callback  the  .collect_description_tables  method from my callback class (attached below) Run the pipeline locally, e.g.  pipe.start_locally(run_pipeline_steps_locally=True) After (2) is done, call  .process_results()
my callback:
` class MedianPredictionCollector:
_tasks_to_collect = list()
_apps = list()
_medians = list()
_pipeline_task = clearml.Task.current_task()
@classmethod
def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
    # Collect tasks
    cls._tasks_to_collect.append(node.executed)
@classmethod
def process_results(cls):
    """
    Summarize all median predictions into one table and attach as artifact to the pipeline task
    :return: None
    """
    # Collect median predictions
    for task_id in cls._tasks_to_collect:
        current_task = clearml.Task.get_task(task_id)
        median_prediction = current_task.artifacts['inference_description_table'].get().loc[5]
        app = clearml.Task.get_task(task_id=current_task.get_parameter('Args/task_id')).get_parameter(
            'Args/application')
        cls._apps.append(app)
        cls._medians.append(median_prediction)
    # Summary table
    median_predictions = pd.DataFrame(index=cls._apps, data=cls._medians)
    # Upload to pipeline
    cls._pipeline_task.upload_artifact('Median Predictions', median_predictions)
    # I also tried not swapping this line with clearml.Task.current_task().upload_artifact ... didn't work `
		but now since
Task.current_task()
doesn't work on the pipeline object we have a serious problem
How is that possible ?
Is there a small toy code that can reproduce it ?
If you want we can do live zoom or something so you can see what happens
The weirdest thing, is that the execution is "completed" but it actually failed
It's kind of random, it works sometimes and sometimes it doesn't
after you create the pipeline object itself , can you get Task.current_task() ?
AgitatedDove14 no I can't... Just checked this. This is a huge problem for us, it used to work before and it just stopped working and I can't figure out why.
It's a problem for us because we made it a methodology of running some tasks under a pipeline task and saving summary iunfo to the pipeline task - but now since  Task.current_task()  doesn't work on the pipeline object we have a serious problem
This is a part of a bigger process which times quite some time and resources, I hope I can try this soon if this will help get to the bottom of this
No worries, if you have another handle on how/why/when we loose the current Task, please share 🙂
This is a part of a bigger process which times quite some time and resources, I hope I can try this soon if this will help get to the bottom of this