Hi @<1631826770770530304:profile|GracefulHamster67>
if you want your current task:
task = Task.current_task()
if you need the pipeline Task from the pipeline component
pipeline = Task.get_task(Task.current_task().parent)
Where are you trying to get the pipelines from? I'm not sure I understand the use case.
@<1523701205467926528:profile|AgitatedDove14> Thank you, I'll try that!
Hi @<1523701205467926528:profile|AgitatedDove14>. I got Task.get_task to work by using the name passed to pipe.add_step, but not with the task_name set in Task.init in data_processing.py. Is there a better way than passing task_name through parameter_override? If not, why does the pipeline have to override task_name with the add_step name?
main.py
prefix='Args/'
pipe.add_step(
name="process_dataset",
base_task_project=project_name,
base_task_name="data_processing",
parameter_override={} #removed parameters for code clarity
)
src/data_processing/data_processing.py
task_name='data_processing'
task = Task.init(project_name=project_name,
task_name=task_name, task_type='data_processing')
# access the previous successful run's artifact
# this doesn't work when running as a pipeline, but works when run independently
previous_task = Task.get_task(
project_name=project_name,
task_name=task_name,
task_filter={'status': ['completed']})
# this works when running in the pipeline
previous_task = Task.get_task(
project_name=project_name,
task_name="process_dataset", #use "process_dataset" name from pipe
task_filter={'status': ['completed']})
From what I can see in the ClearML UI, pipelines don't run directly under the project but under .pipelines, so it would look like MyProject/.pipelines/Pipeline Demo.
Oh I see. Try the following to get a list of all pipelines; then, with the selected pipeline, you can locate the component:
pipeline_project = Task.current_task().project
pipelines_ids = Task.query_tasks(task_filter=dict(
project=[pipeline_project],
type=["controller"],
system_tags=["pipeline"],
order_by=["-last_change"],
search_hidden=True,)
)
# take the second most recently updated one (because the most recent one is us)
pipeline_id = pipelines_ids[1]
# get all components, or select the one that is you based on name?
components_ids = Task.query_tasks(task_filter=dict(
project=[pipeline_project], parent=pipeline_id)
)
I might have some typos here, but it should generally work.
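Picking index 1 assumes the running pipeline is always first in the list. A minimal sketch of a more explicit alternative, assuming `pipelines_ids` is the newest-first list returned by `Task.query_tasks` above and that the component's `parent` holds the running pipeline's ID; `pick_previous_pipeline` is a hypothetical helper, not part of ClearML:

```python
from typing import List, Optional


def pick_previous_pipeline(
    pipeline_ids: List[str], current_pipeline_id: Optional[str]
) -> Optional[str]:
    """Return the first pipeline ID in the list that is not the current one.

    Assumes pipeline_ids is ordered newest first (hypothetical helper).
    """
    for pipeline_id in pipeline_ids:
        if pipeline_id != current_pipeline_id:
            return pipeline_id
    # No earlier pipeline run exists yet.
    return None
```

Inside a component this could be called as `pick_previous_pipeline(pipelines_ids, Task.current_task().parent)`, which also covers the first-ever run, where no earlier pipeline exists.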
Is this code running inside the Task that does your data processing? Assuming it is, check this code; it will fetch the pipeline and then the task you need:
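previous_task = Task.get_task(
# Task.get_task takes project_name (not project), so resolve the current Task's project name
project_name=Task.current_task().get_project_name(),
task_name="process_dataset", # use the "process_dataset" step name from pipe.add_step
task_filter={'status': ['completed']})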
Notice that it uses the current Task's project, to make sure you are looking for a component running under the same pipeline.
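Since the same script may run under either name (the `add_step` name in a pipeline, the `Task.init` name standalone), one option is to try both. A rough sketch, assuming `get_task` is `Task.get_task` or anything with the same keyword arguments, and assuming a lookup with no match may either return `None` or raise `ValueError`; `find_completed_task` is a hypothetical helper:

```python
from typing import Any, Callable, Iterable, Optional


def find_completed_task(
    get_task: Callable[..., Any],
    project_name: str,
    candidate_names: Iterable[str],
) -> Optional[Any]:
    """Return the first completed task found under any of the candidate names."""
    for name in candidate_names:
        try:
            found = get_task(
                project_name=project_name,
                task_name=name,
                task_filter={"status": ["completed"]},
            )
        except ValueError:
            # Assumption: a failed lookup may raise instead of returning None.
            found = None
        if found is not None:
            return found
    return None
```

For example: `find_completed_task(Task.get_task, project_name, ["process_dataset", "data_processing"])`.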
Hey there @<1523701205467926528:profile|AgitatedDove14>. So essentially I have a task called "data_processing" that I run in my pipeline. I just want to access the old artifact (a dataframe) of a previous "data_processing" run from inside my current "data_processing" task, append new rows to it, and save the updated dataframe. This was not an issue when I ran the task on its own, but when I run it as part of a pipeline it doesn't seem to find old runs of the task.
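The append-and-save step described here could look roughly like the sketch below. It assumes `previous_task` was already located (or is `None` on the first run), that the artifact was stored under a name you choose (`artifact_name` is a placeholder), and that `combine` is supplied by the caller, for example `lambda old, new: pd.concat([old, new], ignore_index=True)` for dataframes. `update_artifact` is a hypothetical helper; it only relies on `task.artifacts[name].get()` and `task.upload_artifact(...)`:

```python
from typing import Any, Callable, Optional


def update_artifact(
    previous_task: Optional[Any],
    current_task: Any,
    artifact_name: str,
    new_rows: Any,
    combine: Callable[[Any, Any], Any],
) -> Any:
    """Merge new_rows into the previous run's artifact and upload the result."""
    merged = new_rows
    if previous_task is not None and artifact_name in previous_task.artifacts:
        # Load the object stored by the previous completed run.
        old_rows = previous_task.artifacts[artifact_name].get()
        merged = combine(old_rows, new_rows)
    # Store the merged object on the current run so the next run can find it.
    current_task.upload_artifact(name=artifact_name, artifact_object=merged)
    return merged
```

Keeping `combine` as a parameter leaves the merge rule (deduplication, column alignment, and so on) to the caller rather than baking it into the helper.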