Answered

Hi everyone! Is there a way I can get Task.get_task() to work without using task_id when running tasks as a pipeline? I'm trying to access old pipeline runs/artifacts in my current pipeline, but get_task returns nothing.

  
  
Posted 5 months ago

Answers 7


Hi @GracefulHamster67
If you want your current task:

task = Task.current_task()

If you need the pipeline Task from inside a pipeline component:

pipeline = Task.get_task(Task.current_task().parent)

Where are you trying to get the pipelines from? I'm not sure I understand the use case.
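A minimal sketch combining the two (assuming this code runs inside a pipeline component, so that parent points at the controller):

current = Task.current_task()
if current.parent:
    # running as a pipeline step: climb up to the controller task
    pipeline = Task.get_task(task_id=current.parent)
    print(pipeline.name)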

  
  
Posted 5 months ago

From what I understand looking at the ClearML UI, pipelines don't run directly under projects but under a hidden .pipelines folder, so it would look like MyProject/.pipelines/Pipeline Demo.
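If that's the case, I'm guessing a lookup would need search_hidden to see them, something like this (using the example path from above):

pipelines = Task.query_tasks(
    project_name="MyProject/.pipelines/Pipeline Demo",  # example path from the UI
    task_filter=dict(search_hidden=True))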

  
  
Posted 5 months ago

Hey there @AgitatedDove14. Essentially I have a task called "data_processing" that I run in my pipeline. I just want to access the old artifacts (a dataframe) of my "data_processing" task inside my current "data_processing" task, append new rows to it on the current run, and save the updated dataframe. This was not an issue when I ran the task alone, but when I run it as a pipeline it seems it's not finding old runs of the task.
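Roughly the flow I'm after, with "dataframe" as a placeholder artifact name:

import pandas as pd

# fetch the last completed run of this task and load its dataframe artifact
previous_task = Task.get_task(
        project_name=project_name,
        task_name="data_processing",
        task_filter={'status': ['completed']})
df = previous_task.artifacts['dataframe'].get()

# append this run's rows and save the updated dataframe back
new_rows = pd.DataFrame({'value': [1, 2]})  # stand-in for this run's data
df = pd.concat([df, new_rows], ignore_index=True)
Task.current_task().upload_artifact('dataframe', df)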

  
  
Posted 5 months ago

Oh I see, try the following to get a list of all pipelines; then with the selected pipeline you can locate the component:

pipeline_project = Task.current_task().project

pipelines_ids = Task.query_tasks(task_filter=dict(
    project=[pipeline_project],
    type=["controller"],
    system_tags=["pipeline"],
    order_by=["-last_change"],
    search_hidden=True,
))
# take the second entry: the most recently changed pipeline is the current run
pipeline_id = pipelines_ids[1]

# get all components, or select the one that is you based on name?
components_ids = Task.query_tasks(task_filter=dict(
    project=[pipeline_project],
    parent=pipeline_id,
))

I might have some typos here, but it should generally work.
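Then picking a component and pulling its artifact would be along these lines ('dataframe' is a placeholder for whatever artifact name you used):

component_task = Task.get_task(task_id=components_ids[0])  # in practice, pick by name
df = component_task.artifacts['dataframe'].get()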

  
  
Posted 5 months ago

Is this code running inside the Task that is your data processing step? Assuming it is, check this code; it will fetch the pipeline and then the task you need:

previous_task = Task.get_task(
        project_name=Task.current_task().get_project_name(),
        task_name="process_dataset",  # use the "process_dataset" step name from the pipeline
        task_filter={'status': ['completed']})

Notice we use the current Task's project to make sure we are looking for a component running under the same pipeline.
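From there, pulling the artifact itself is the usual lookup ('dataframe' again being a placeholder name):

df = previous_task.artifacts['dataframe'].get()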

  
  
Posted 5 months ago

@AgitatedDove14 Thank you, I'll try to do that!

  
  
Posted 5 months ago

Hi @AgitatedDove14. I got Task.get_task to work by using the name passed in pipe.add_step, but not with the task_name set in Task.init of the data_processing.py file. I want to understand if there's a better way than just passing task_name through parameter_override? If not, can you explain why the pipeline has to override task_name with the add_step name?

main.py

prefix='Args/'
pipe.add_step(
    name="process_dataset",
    base_task_project=project_name,
    base_task_name="data_processing",
    parameter_override={}  # parameters removed for code clarity
)

src/data_processing/data_processing.py

task_name = 'data_processing'
task = Task.init(project_name=project_name,
                 task_name=task_name,
                 task_type='data_processing')

# access the previous successful run's artifact
# this doesn't work when running as a pipeline, but works when run independently
previous_task = Task.get_task(
        project_name=project_name,
        task_name=task_name,
        task_filter={'status': ['completed']})
# this works when running in the pipeline
previous_task = Task.get_task(
        project_name=project_name,
        task_name="process_dataset",  # use the "process_dataset" name from pipe
        task_filter={'status': ['completed']})
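For reference, the parameter_override workaround I mean looks something like this (the previous_task_name key is hypothetical, just an argument the script would read and pass to get_task):

# main.py: pass the step name into the task explicitly
pipe.add_step(
    name="process_dataset",
    base_task_project=project_name,
    base_task_name="data_processing",
    parameter_override={prefix + 'previous_task_name': 'process_dataset'},  # hypothetical argument
)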
  
  
Posted 5 months ago