Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Executed From Within A Pipelinecontroller Task, What Possible Reason Does

Executed from within a PipelineController task, what possible reason does current_task() have to return None ?

2021-11-07T11:32:43.7366887Z clearml.Task.current_task().upload_artifact('Median Predictions', median_predictions) 2021-11-07T11:32:43.7369591Z AttributeError: 'NoneType' object has no attribute 'upload_artifact'

  
  
Posted 3 years ago
Votes Newest

Answers 30


Maybe the case is that after start / start_locally the reference to the pipeline task disappears somehow? O_O

  
  
Posted 2 years ago

Then what happens is that 

Task.current_task()

 returns 

None

 for the pipeline's task...

Hmm that sounds like the pipeline Task was closed?! could that be? where (in the code) is the call to Task.current_task ?

  
  
Posted 2 years ago

EagerOctopus10

  
  
Posted 2 years ago

Okay so at the first part of the code, we define some kind of callback that we add to our steps, so later we can collect them and attach the results to the pipeline task. It looks something like this

` class MedianPredictionCollector:
_tasks_to_collect = list()

@classmethod
def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):

    # Collect tasks
    cls._tasks_to_collect.append(node.executed)

@classmethod
def process_results(cls):


    # Collect median predictions
    for task_id in cls._tasks_to_collect:
         .......

    # Upload to pipeline
    clearml.Task.current_task().upload_artifact(
        'Prediction results', median_predictions) `

Then the pipeline goes like this
` pipe = clearml.PipelineController(
name=TASK_NAME,
project=PROJECT_NAME,
version='0.0.1',
add_pipeline_tags=False,
)
median_predictions_collector = MedianPredictionCollector()
pipe.set_default_execution_queue('default')

# Add steps
for application in conf.ALL_APPS:
     ....
     pipe.add_step(name=...,
                    parents=...,
                    base_task_project=...,
                    base_task_name=...,
                    post_execute_callback=median_predictions_collector.collect_description_tables)

if not arguments.enqueue:
pipe.start_locally(run_pipeline_steps_locally=True)
else:
pipe.start(queue=arguments.enqueue)

########### THIS IS WHERE IT FAILS ##########
median_predictions_collector.process_results() `

We tried many things, like getting the reference to the current_task() earlier, so it was a class variable or somehting similar, but that didn't help

  
  
Posted 2 years ago

WackyRabbit7 just making sure I understand:
MedianPredictionCollector.process_results Is called after the pipeline is completed.
Then inside the function, Task.current_task() returns None.
Is this correct?

  
  
Posted 2 years ago

AgitatedDove14 I really don't know how is this possible... I tried upgrading the server, tried whatever I could

About small toy code to reproduce I just don't have the time for that, but I will paste the callback I am using to this explanation. This is the overall logic so you can replicate and use my callback

From the pipeline task, launch some sub tasks, and put in their post_execute_callback the .collect_description_tables method from my callback class (attached below) Run the pipeline locally, e.g. pipe.start_locally(run_pipeline_steps_locally=True) After (2) is done, call .process_results()
my callback:

` class MedianPredictionCollector:

_tasks_to_collect = list()
_apps = list()
_medians = list()
_pipeline_task = clearml.Task.current_task()

@classmethod
def collect_description_tables(cls, pipeline: clearml.PipelineController, node: clearml.PipelineController.Node):
    # Collect tasks
    cls._tasks_to_collect.append(node.executed)

@classmethod
def process_results(cls):
    """
    Summarize all median predictions into one table and attach as artifact to the pipeline task

    :return: None
    """

    # Collect median predictions
    for task_id in cls._tasks_to_collect:
        current_task = clearml.Task.get_task(task_id)
        median_prediction = current_task.artifacts['inference_description_table'].get().loc[5]
        app = clearml.Task.get_task(task_id=current_task.get_parameter('Args/task_id')).get_parameter(
            'Args/application')

        cls._apps.append(app)
        cls._medians.append(median_prediction)

    # Summary table
    median_predictions = pd.DataFrame(index=cls._apps, data=cls._medians)

    # Upload to pipeline
    cls._pipeline_task.upload_artifact('Median Predictions', median_predictions)
    # I also tried not swapping this line with clearml.Task.current_task().upload_artifact ... didn't work `
  
  
Posted 2 years ago

Okay so regarding the version - we are using 1.1.1

The thing with this error it that it happens sometimes, and when it happens it never goes away...
I don't know what causes it, but we have one host where it works okay, then someone else checks out the repo and tried and it fails for this error, while another guy can do the same and it will work for him

  
  
Posted 2 years ago

Hi WackyRabbit7
So I'm assuming after the start_locally is called ?
Which clearml version are you using ?
(just making sure, calling Task.current_task() before starting the pipeline returns the correct Task?)

  
  
Posted 2 years ago

Just making sure, after the pipe object is created, you can call Task.current_task() , is that correct?

  
  
Posted 2 years ago

after you create the pipeline object itself , can you get Task.current_task() ?

AgitatedDove14 no I can't... Just checked this. This is a huge problem for us, it used to work before and it just stopped working and I can't figure out why.

It's a problem for us because we made it a methodology of running some tasks under a pipeline task and saving summary iunfo to the pipeline task - but now since Task.current_task() doesn't work on the pipeline object we have a serious problem

  
  
Posted 2 years ago

ShinyLobster84

  
  
Posted 2 years ago

This is a part of a bigger process which times quite some time and resources, I hope I can try this soon if this will help get to the bottom of this

  
  
Posted 3 years ago

I'll check if this works tomorrow

  
  
Posted 2 years ago

The weirdest thing, is that the execution is "completed" but it actually failed

  
  
Posted 2 years ago

but I don't really know

  
  
Posted 2 years ago

[Assuming the above is what you are seeing]
What I "think" is happening is that the Pipeline creates it's own Task. When the pipeline completes, it closes it's own Task, basically making any later calls to Tasl.current_task() return None, because there is no active Task. I think this is the reason that when you are calling process_results(...) you end up with None.
For a quick fix, you can do
pipeline = Pipeline(...) MedianPredictionCollector.process_results(pipeline._task)Maybe we should add an argument to the Pipeline, telling it Not to close the Task when it is done?

  
  
Posted 2 years ago

(I suspect you are correct, but I'm missing some information in order to understand where the problem is)
WackyRabbit7 can you send mock code that explains how you create the pipeline ?

  
  
Posted 2 years ago

I'll check the version tomorrow, about the current_task call, I tried before and after - same result

  
  
Posted 2 years ago

If you want we can do live zoom or something so you can see what happens

  
  
Posted 2 years ago

checking and will let you know

  
  
Posted 2 years ago

AgitatedDove14 sorry for the late reply,

It's right after executing all the steps. So we have the following block which determines whether we run locally or remotely

if not arguments.enqueue: pipe.start_locally(run_pipeline_steps_locally=True) else: pipe.start(queue=arguments.enqueue)
And right after we have a method that calls Task.current_task() which returns None

  
  
Posted 2 years ago

So I checked the code, and the Pipeline constructor internally calls Task.init, that means that after you constructs the pipeline object, Task.current_task() should return a valid object....
let me know what you find out

  
  
Posted 2 years ago

I suspect that it has something to do with remote execution / local execution of pipelines, because we play with this , so sometimes the pipeline task itself executes on the client, and sometimes on the host (where the agent is also)

  
  
Posted 2 years ago

but can't right now

  
  
Posted 3 years ago

This is a part of a bigger process which times quite some time and resources, I hope I can try this soon if this will help get to the bottom of this

No worries, if you have another handle on how/why/when we loose the current Task, please share 🙂

  
  
Posted 3 years ago

AgitatedDove14

  
  
Posted 2 years ago

It's kind of random, it works sometimes and sometimes it doesn't

  
  
Posted 2 years ago

Yes, I'll prepare something and send

  
  
Posted 2 years ago

AgitatedDove14 just so you'd know this is a severe problem that occurs from time to time and we can't explain why it happens... Just to remind, we are using a pipeline controller task, which at the end of the last execution gathers artifacts from all the children tasks and uploads a new artifact to the pipeline's task object. Then what happens is that Task.current_task() returns None for the pipeline's task...

  
  
Posted 2 years ago

but now since 

Task.current_task()

 doesn't work on the pipeline object we have a serious problem

How is that possible ?
Is there a small toy code that can reproduce it ?

  
  
Posted 2 years ago
981 Views
30 Answers
3 years ago
one year ago
Tags