BTW: the new pipeline decorator interface example is here:
https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py
okay but still I want to take only a row of each artifact
What do you mean?
How do I get from the node to the task object?
pipeline_task = Task.get_task(task_id=Task.current_task().parent)
is it possible to access the children tasks of the pipeline from the pipeline object?
is this already available or only on github?
I want to collect the dataframes from the red tasks, and display them in the pipeline task
I think I got it, I'll ping her again if it won't succeed
WackyRabbit7 , isn't this what you need?
https://clear.ml/docs/latest/docs/references/sdk/automation_controller_pipelinecontroller#get_running_nodes
If this is the case I would do:
` # Add the collector steps (i.e. the 10 Tasks)
pipe.add_step(...
    post_execute_callback=Collector.collect_me
)
pipe.start()
pipe.wait()
Collector.process_results(pipe) `
wdyt?
So if I'm collecting from the middle ones, shouldn't the callback be attached to them?
I'm using pipe.start_locally
so I imagine I don't have to .wait()
right?
Well this will have to wait a bit... my clearml-server is causing problems
So could you re-explain assuming my pipeline object is created by
pipeline = PipelineController(...)
?
` pipe.add_step(name='stage_train', parents=['stage_process', ],
    monitor_artifact=['my_created_artifact'],
    base_task_project='examples', base_task_name='pipeline step 3 train model',
    parameter_override={'General/dataset_task_id': '${stage_process.id}'}) `
This will put the artifact names "my_created_artifact" from the step Task, on the main pipeline, automatically
Okay, looks interesting but actually there is no final task, this is the pipeline layout
AgitatedDove14 worked like a charm, thanks a lot!
and then how would I register the final artifact to the pipeline? AgitatedDove14 ⬆
Hi WackyRabbit7
I have a pipeline controller task, which launches 30 tasks. Semantically there are 10 applications, and I run 3 tasks for each (those 3 are sequential, so in the UI it looks like 10 lines of 3 tasks).
👍
In one of those 3 tasks that run for every app, I save a dataframe under the name "my_dataframe".
I'm assuming as an artifact:
What I want to achieve is once all tasks are over, to collect all those "my_dataframe" artifacts (10 in number), extract a single line from each, and concatenate them to a new dataframe. I want to register this new dataframe to the pipeline task. It's kind of a summary of the most important detail in the process.
I see, that makes sense to me.
Notice that "monitor_artifact" does not change the artifact itself, so this is not a perfect match to your use case.
So the question is, how do I get a "callback" so that I collect those artifacts.
From the top of my head (an initial design, might have a bit of typos 🙂 ):
` from clearml import Task

class Collector(object):
    _tasks_to_collect = []

    @classmethod
    def collect_me(cls, a_pipeline, a_node):
        cls._tasks_to_collect.append(a_node.executed)

    @classmethod
    def process_results(cls, a_pipeline, a_node=None):
        result = []
        for task_id in cls._tasks_to_collect:
            df = Task.get_task(task_id).artifacts['my_artifact'].get()
            # do something
            result.append('something here?!')
        # this will return the pipeline Task
        Task.current_task().upload_artifact(name='processed results', artifact_object=result)

# Add the collector steps (i.e. the 10 Tasks)
pipe.add_step(...
    post_execute_callback=Collector.collect_me
)

# Add to the final?! step of the pipeline (I'm assuming there is one)
pipe.add_step(...
    post_execute_callback=Collector.process_results
) `
wdyt?
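The callback mechanics above can be sanity-checked outside of ClearML. A minimal runnable sketch, where `FakeNode` / `FakePipeline` are hypothetical stand-ins for the objects ClearML passes into `post_execute_callback` (the `executed` attribute holding the executed Task's id is the only part the pattern relies on):

```python
# FakeNode / FakePipeline are hypothetical stubs, NOT ClearML classes --
# they only mimic the bits the callback pattern touches.
class FakeNode:
    def __init__(self, executed_task_id):
        self.executed = executed_task_id  # id of the Task this node executed


class FakePipeline:
    pass


class Collector:
    _tasks_to_collect = []

    @classmethod
    def collect_me(cls, a_pipeline, a_node):
        # called once per finished step; remember which Task it ran
        cls._tasks_to_collect.append(a_node.executed)

    @classmethod
    def process_results(cls, a_pipeline, a_node=None):
        # in the real version you would fetch each Task's artifact here;
        # this stub just returns the collected ids
        return list(cls._tasks_to_collect)


pipe = FakePipeline()
for task_id in ("task-a", "task-b", "task-c"):
    Collector.collect_me(pipe, FakeNode(task_id))

collected = Collector.process_results(pipe)
print(collected)  # ['task-a', 'task-b', 'task-c']
```

The class-level `_tasks_to_collect` list is what lets the per-step callbacks and the final aggregation step share state without a real queue.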
AgitatedDove14
So nope, this doesn't solve my case, I'll explain the full use case from the beginning.
I have a pipeline controller task, which launches 30 tasks. Semantically there are 10 applications, and I run 3 tasks for each (those 3 are sequential, so in the UI it looks like 10 lines of 3 tasks).
In one of those 3 tasks that run for every app, I save a dataframe under the name "my_dataframe".
What I want to achieve is once all tasks are over, to collect all those "my_dataframe" artifacts (10 in number), extract a single line from each, and concatenate them to a new dataframe. I want to register this new dataframe to the pipeline task. It's kind of a summary of the most important detail in the process.
Now, first of all I'm not sure how to get to all those children tasks, but that might be solvable by using monitor_artifact
- but then I wonder: if they are all called the same, how will I be able to extract the single row I need from each of them?
Then, assuming we solved that, I want to concatenate those rows and save them as a new dataframe to the pipeline task...
Hope this makes it clearer and you can tell me if this is possible, and if so, how to do it
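For the "single line from each dataframe" part: once the 10 dataframes are in hand (e.g. via `Task.get_task(task_id).artifacts['my_dataframe'].get()`), the row extraction and concatenation is plain pandas. A sketch, where the `selector` criterion below (highest score) is an assumption for illustration, adapt it to whatever identifies your row:

```python
import pandas as pd


def summarize(dataframes, selector):
    """Take one row from each dataframe and stack them into a summary frame.

    `selector` maps a dataframe to the single row you care about,
    returned as a 1-row DataFrame (e.g. the row with the best metric).
    """
    rows = [selector(df) for df in dataframes]
    return pd.concat(rows, ignore_index=True)


# Toy stand-ins for the 10 "my_dataframe" artifacts (3 rows per app)
dfs = [
    pd.DataFrame({"app": ["app%d" % i] * 3,
                  "score": [0.1 * i, 0.5, 0.9 - 0.01 * i]})
    for i in range(3)
]

# Example criterion: keep the highest-scoring row of each dataframe
summary = summarize(dfs, lambda df: df.loc[[df["score"].idxmax()]])
print(summary)
```

The resulting `summary` dataframe is what you would then hand to `Task.current_task().upload_artifact(...)` inside the final collector callback so it lands on the pipeline task.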