Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Hello Everyone! I'M Currently Trying To Set Up A Pipeline, And Am A Bit Confused At A Few Things. Some Questions I Have:

Hello everyone! I'm currently trying to set up a Pipeline, and am a bit confused at a few things. Some questions I have:
What does the intended workflow for making a "pipeline from tasks" look like? (I don't think I can use "pipeline from functions" due to how our codebase is arranged, and I assume also for decorators). Currently, I have a script which does some Task.create 's, adds them to a PipelineController with add_step , then runs the pipeline. However, since these tasks get cloned to run in the pipeline, I'm just left with some extra junk Draft tasks in my experiments section every time a pipeline is launched. I understand you can re-use tasks and just override parameters, but will this pull in new code changes to the repo between when the initial task was run and the pipeline? I am attempting to use the pre_execute_callback in add_step to create an input argument to this step of the pipeline. In the logic to create this, I need one of the existing input arguments. I was thinking that I could do this by using the PipelineController.Node input to the callback using node.job.task.get_parameter/set_parameter . (This is what the http://Allows%20a%20user%20to%20modify%20the%20Task%20before%20launch.%20Use%20node.job%20to%20access%20the%20ClearmlJob%20object,%20or%20node.job.task%20to%20directly%20access%20the%20Task%20object. ). However, I am finding that in the callback, node.job is None . Why is this?

  
  
Posted 2 years ago
Votes Newest

Answers 8


I'm a bit confused between the distinction / how to use these appropriately --

Task.init

does not have

repo

/

branch

args to set what code the task should be running.

It detects it automatically at run time 🙂 based on what is actually being used

My ideal is that I do exactly what

Task.create

does, but the task only goes into the pipeline section rather than making a new one in the experiments section.

Do you mean create the Task at pipeline run time?

Maybe it would make sense to have an "initialization function" calling the code and decorate it? This will allow you to both use existing codebase and be able to return values / artifacts to next steps (the main difference is that usually a standalone script will not have a return value, but this is exactly what you want in a pipeline). Does that make sense ?

BTW: SteadySeagull18 did you check the pipeline decorator?
https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py

But it is a bit confusing that the docs suggest accessing

node.job.task

even though

node.job

is being set to

None

node.job is the runtime object of the Job (after being created) in the pre-callback the Job hasn't been created yet

  
  
Posted 2 years ago

I guess I'm just a bit confused by what the correct mental model is here. If I'm interpreting this correctly, I need to have essentially "template tasks" in my Experiments section whose sole purpose is to be copied for use in the Pipeline? When I'm setting up my Pipeline, I can't go "here are some brand new tasks, please run them", I have to go "please run existing task A with these modifications, then task B with these modifications, then task C with these modifications?" And when the pipeline is run, it will automagically modify repo / branch on those branches to the correct values? Can I manually set these somewhere to be certain?

I looked into the decorator and add_function_step options, but it seemed that they required modifications of our code to put all of the import statements into the beginning of the wrapped functions to get namespaces initialized, which is not what we want to do with our existing scripts. I'm not sure the wrapped setup function will work either, import statements in the outer function won't propagate namespaces to functions it calls.

(To be fair though, I have not actually tried using the decorator. I was trying to get add_function_step to work for a while, then ran into the above namespace issue, and switched back to using tasks)

As for the node, this confusing bit is that this is text from the docs which seems to suggest that the node will be fully initialized before the callback:

pre_execute_callback

(

Optional

[

Callable

[

[

PipelineController

,

PipelineController.Node

,

dict

]

,

bool

]

]

noqa

) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

  
  
Posted 2 years ago

When I'm setting up my Pipeline, I can't go "here are some brand new tasks, please run them",

I think this is the main point. Can you create those Tasks via Task.create and get what you want? If so, then sure you can do that:
` def create_step_task(a_node):
task = Task.create(...)
return task

pipe.add_step(
name="stage_process",
parents=["stage_data"],
base_task_factor=create_step_task
) `wdyt?

As for the node, this confusing bit is that this is text from the docs which seems to suggest that the node will be fully initialized before the callback:

yes the node but not the Job, so that you can have control 🙂

  
  
Posted 2 years ago

Hi AgitatedDove14 , thanks for the response!

I'm a bit confused between the distinction / how to use these appropriately -- Task.init does not have repo / branch args to set what code the task should be running. Ideally, when I run the pipeline I run the current branch of whoever is launching the pipeline which I can do with Task.create . It also seems like Task.init will still make new tasks if artifacts are recorded?

My ideal is that I do exactly what Task.create does, but the task only goes into the pipeline section rather than making a new one in the experiments section.

As for the callback, that will work for setting the new parameter! As for accessing an existing parameter, it seems the node only have the overridden parameters set, not any of the previously existing parameters. (I am not overriding any parameters, and it is an empty dictionary)

  
  
Posted 2 years ago

Oooo I didn't notice the base_task_factory argument before, that seems exactly like what I'd want. I will try that now! Thank you.

I think the docstring is just a bit confusing since it seems to directly recommend accessing node.job.task to access/modify things. I believe I have found a workaround for my specific case though by using pipeline.get_processed_nodes() to grab some relevant info from the previously completed step.

  
  
Posted 2 years ago

SteadySeagull18 btw: in post-callback the node.job will be completed
because it is a called after the Task is completed

  
  
Posted 2 years ago

Hi SteadySeagull18

What does the intended workflow for making a "pipeline from tasks" look like?

The idea is if you have existing Tasks in the system and you want to launch them one after the other with control over inputs (or outputs of them) you can do that, without writing any custom code.

Currently, I have a script which does some

Task.create

's,

Notice that your script should do Task.init - Not Task.create, as Task create is designed to create additional auxiliary Tasks not connect the running script, does that make sense?

I am attempting to use the

pre_execute_callback

in

add_step

to create an input argument to this step of the pipeline.

I think you want to change the defined arguments instead, basically if you have:
pipe.add_step( name="stage_process", parents=["stage_data"], base_task_project="examples", base_task_name="Pipeline step 2 process dataset", parameter_override={ "General/dataset_url": "${stage_data.artifacts.dataset.url}", "General/test_size": 0.25, }, pre_execute_callback=pre_execute_callback_example, post_execute_callback=post_execute_callback_example, )you can change the parameter_override to a value that you want:
def pre_execute_callback_example(a_pipeline, a_node, current_param_override): a_node.parameters["General/dataset_url"] = "my new value here"What do you think?

  
  
Posted 2 years ago

But it is a bit confusing that the docs suggest accessing node.job.task even though node.job is being set to None

  
  
Posted 2 years ago
1K Views
8 Answers
2 years ago
one year ago
Tags
Similar posts