Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Unanswered
In Pipelinev2, Is It Possible To Register Artifacts To The Pipeline Task? I See There Is A Private Variable


Hi WackyRabbit7

I have a pipeline controller task, which launches 30 tasks. Semantically there are 10 applications, and I run 3 tasks for each (those 3 are sequential, so in the UI it looks like 10 lines of 3 tasks).

👍

In one of those 3 tasks that run for every app, I save a dataframe under the name "my_dataframe".

I'm assuming as an artifact:

What I want to achieve is once all tasks are over, to collect all those "my_dataframe" artifacts (10 in number), extract a single line from each, and concatenate them to a new dataframe. I want to register this new dataframe to the pipeline task. It's kind of a summary of the most important detail in the process.

I see that makes sense to me.
Notice that "monitor_arrtifact" does not change the artifact itself, so this is not a perfect match to your use case.
So the question is, how do I get a "callback" so that I collect those artifacts.

From the top of my head (an initial design, might have a bit of typos 🙂 ):
` class Collector(object):
_tasks_to_collect = []

@classmethod
def collect_me(cls, a_pipeline, a_node):
cls._tasks_to_collect.append(a_node.executed)

@classmethod
def process_results(cls, a_pipeline, a_node:
result = []
for task_id in cls._tasks_to_collect:
df = Task.get_task(task_id).artifacts['my_artifact'].get()
# do something
result.append('something here?!')
# this will return the pipeline Task
Task.current_task().upload_artifact(name='processed results', result)

Add the collector steps (i.e. the 10 Tasks

pipe.add_task(...
post_execute_callback=Collector.collect_me
)

Add to the final?! step of the pipeline (I'm assuming there is one)

pipe.add_task(...
post_execute_callback=Collector.process_results
) `
wdyt?

  
  
Posted 2 years ago
86 Views
0 Answers
2 years ago
one year ago