@<1523701205467926528:profile|AgitatedDove14> Thank you, I'll try that!
Is this code running inside the Task that does your data processing? Assuming it is, check the code below: it looks up the task you need in the same project as the current Task.
previous_task = Task.get_task(
    project_name=Task.current_task().get_project_name(),
    task_name="process_dataset",  # use the "process_dataset" step name from pipe.add_step
    task_filter={'status': ['completed']},
)
Note that this uses the current Task's project, to make sure you are looking for a component running under the same pipeline.
Hi @<1523701205467926528:profile|AgitatedDove14>. I got Task.get_task to work by using the name passed to pipe.add_step, but not with the task_name set in Task.init in the data_processing.py file. Is there a better way than passing task_name through parameter_override? If not, can you help me understand why the pipeline has to override task_name with the add_step name?
main.py
prefix='Args/'
pipe.add_step(
    name="process_dataset",
    base_task_project=project_name,
    base_task_name="data_processing",
    parameter_override={},  # removed parameters for code clarity
)
src/data_processing/data_processing.py
task_name='data_processing'
task = Task.init(project_name=project_name,
                 task_name=task_name, task_type='data_processing')
# access the artifact of the previous successful run
# this doesn't work when running as a pipeline, but works when run independently
previous_task = Task.get_task(
    project_name=project_name,
    task_name=task_name,
    task_filter={'status': ['completed']})
# this works when running in the pipeline
previous_task = Task.get_task(
    project_name=project_name,
    task_name="process_dataset",  # use the "process_dataset" name from pipe.add_step
    task_filter={'status': ['completed']})
Oh I see, try the following to get a list of all pipelines, then with the selected pipeline you can locate the component:
pipeline_project = Task.current_task().project
pipelines_ids = Task.query_tasks(task_filter=dict(
    project=[pipeline_project],
    type=["controller"],
    system_tags=["pipeline"],
    order_by=["-last_change"],
    search_hidden=True,
))
# take the second most recently updated one (because the most recent one is us)
pipeline_id = pipelines_ids[1]
# get all components, or select the one that is you based on name?
components_ids = Task.query_tasks(task_filter=dict(
    project=[pipeline_project], parent=pipeline_id,
))
I might have some typos here, but it should generally work.
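One caveat with taking index 1: it assumes the currently running pipeline is always first in the list. Here is a minimal sketch of a more defensive pick, assuming the IDs come back newest-first and that the current pipeline's ID is available from Task.current_task().parent inside a component. The helper name pick_previous_pipeline is made up for illustration and is not part of ClearML.

```python
def pick_previous_pipeline(pipeline_ids, current_pipeline_id):
    """Return the first ID in pipeline_ids that is not the current pipeline run.

    pipeline_ids is expected newest-first (e.g. ordered by "-last_change").
    Returns None when there is no earlier pipeline run.
    """
    for pipeline_id in pipeline_ids:
        if pipeline_id != current_pipeline_id:
            return pipeline_id
    return None


# Hypothetical usage inside a pipeline component (requires clearml):
#   current_pipeline_id = Task.current_task().parent
#   pipeline_id = pick_previous_pipeline(pipelines_ids, current_pipeline_id)
```

This also covers the first-ever run, where there is no previous pipeline and indexing with [1] would raise an IndexError.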
Hey there @<1523701205467926528:profile|AgitatedDove14>. So essentially I have a task called "data_processing" that I run in my pipeline. I just want to access the old artifact (a dataframe) of my "data_processing" task from inside my current "data_processing" run, append new rows to it, and save the updated dataframe. This was not an issue when I ran the task on its own, but when I run it as part of a pipeline it doesn't seem to find old runs of the task.
Hi @<1631826770770530304:profile|GracefulHamster67>
If you want your current task:
task = Task.current_task()
If you need the pipeline Task from within a pipeline component:
pipeline = Task.get_task(Task.current_task().parent)
Where are you trying to get the pipelines from? I'm not sure I understand the use case.
From what I understand from looking at the ClearML UI, pipelines don't run directly under projects but under .pipelines,
so it would look like MyProject/.pipelines/Pipeline Demo.
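If that layout holds, a task running under a pipeline may report a project name that differs from the plain project name, which could be one reason a lookup by project name behaves differently. A minimal sketch of recovering the parent project from such a path, assuming the "/.pipelines/" separator seen in the UI. The helper base_project_name is hypothetical, not a ClearML API.

```python
# Assumed separator, based only on the path shown in the ClearML UI.
PIPELINES_MARKER = "/.pipelines/"


def base_project_name(project_name):
    """Strip a trailing '/.pipelines/<pipeline name>' part, if present."""
    head, sep, _tail = project_name.partition(PIPELINES_MARKER)
    # partition() returns an empty separator when the marker is not found
    return head if sep else project_name


print(base_project_name("MyProject/.pipelines/Pipeline Demo"))
print(base_project_name("MyProject"))
```

Both calls print MyProject, so the same lookup code could be used whether the task runs on its own or under a pipeline.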