Hi WackyRabbit7
I have a pipeline controller task, which launches 30 tasks. Semantically there are 10 applications, and I run 3 tasks for each (those 3 are sequential, so in the UI it looks like 10 lines of 3 tasks).
👍
In one of those 3 tasks that run for every app, I save a dataframe under the name "my_dataframe".
I'm assuming you save it as an artifact?
What I want to achieve is once all tasks are over, to collect all those "my_dataframe" artifacts (10 in number), extract a single line from each, and concatenate them into a new dataframe. I want to register this new dataframe on the pipeline task. It's a kind of summary of the most important details of the process.
I see, that makes sense to me.
Notice that "monitor_artifact" does not change the artifact itself, so it is not a perfect match for your use case.
So the question is, how do I get a "callback" so that I can collect those artifacts?
From the top of my head (an initial design, might have a bit of typos 🙂 ):
` from clearml import Task

class Collector(object):
    _tasks_to_collect = []

    @classmethod
    def collect_me(cls, a_pipeline, a_node):
        # remember the Task ID of the step that just finished
        cls._tasks_to_collect.append(a_node.executed)

    @classmethod
    def process_results(cls, a_pipeline, a_node):
        result = []
        for task_id in cls._tasks_to_collect:
            df = Task.get_task(task_id).artifacts['my_dataframe'].get()
            # do something
            result.append('something here?!')
        # Task.current_task() here will return the pipeline Task
        Task.current_task().upload_artifact(name='processed results', artifact_object=result)

# Add the collector callback to each of the collector steps (i.e. the 10 Tasks):
pipe.add_step(...,
    post_execute_callback=Collector.collect_me
)

# Add to the final?! step of the pipeline (I'm assuming there is one):
pipe.add_step(...,
    post_execute_callback=Collector.process_results
) `
wdyt?