Hi SteadySeagull18
What does the intended workflow for making a "pipeline from tasks" look like?
The idea is that if you have existing Tasks in the system and want to launch them one after the other, with control over their inputs (or outputs), you can do that without writing any custom code.
Currently, I have a script which does some `Task.create`s,
Notice that your script should call `Task.init`, not `Task.create`: `Task.create` is designed to create additional auxiliary Tasks, not to connect the running script. Does that make sense?
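To make the distinction concrete, here is a minimal sketch. The project and task names and the `repo_url`, `branch` and `script_path` values are placeholders, and `clearml` is imported inside each function only so the sketch loads without it installed.

```python
def connect_current_script():
    """Attach *this* running process to a new Task (what a step script should do)."""
    from clearml import Task  # lazy import: only needed when actually called

    # Repository, branch, commit and uncommitted changes are detected at run time.
    return Task.init(project_name="examples", task_name="my step script")


def create_auxiliary_task(repo_url, branch, script_path):
    """Register a *separate* draft Task that points at code in a repository.

    Nothing about the calling process is connected to the new Task.
    """
    from clearml import Task

    return Task.create(
        project_name="examples",
        task_name="auxiliary task",
        repo=repo_url,
        branch=branch,
        script=script_path,
    )
```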
I am attempting to use the `pre_execute_callback` in `add_step` to create an input argument to this step of the pipeline.
I think you want to change the defined arguments instead. Basically, if you have:
```python
pipe.add_step(
    name="stage_process",
    parents=["stage_data"],
    base_task_project="examples",
    base_task_name="Pipeline step 2 process dataset",
    parameter_override={
        "General/dataset_url": "${stage_data.artifacts.dataset.url}",
        "General/test_size": 0.25,
    },
    pre_execute_callback=pre_execute_callback_example,
    post_execute_callback=post_execute_callback_example,
)
```
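you can change the `parameter_override` to a value that you want:
```python
def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # a_node.parameters holds this step's parameter overrides; edits here apply to this run
    a_node.parameters["General/dataset_url"] = "my new value here"
```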
What do you think?
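The documented signature of `pre_execute_callback` ends in `bool`; as far as I know from the ClearML docs, returning `False` skips the step (and the steps that depend on it), while other values let it run. A sketch of the same callback with an explicit return value, plus a throwaway stand-in node (`_DemoNode`, hypothetical) so it can be exercised without a pipeline:

```python
class _DemoNode:
    """Hypothetical stand-in for PipelineController.Node, for local testing only."""

    def __init__(self, name, parameters):
        self.name = name
        self.parameters = dict(parameters)
        self.job = None  # no Job exists yet when the pre-callback runs


def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # Overwrite (or add) an override for this step before its Job is created.
    a_node.parameters["General/dataset_url"] = "my new value here"
    # Returning False would skip this step; True lets it run.
    return True


demo = _DemoNode("stage_process", {"General/test_size": 0.25})
should_run = pre_execute_callback_example(None, demo, dict(demo.parameters))
```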
Hi AgitatedDove14, thanks for the response!
I'm a bit confused about the distinction between these and how to use them appropriately: `Task.init` does not have `repo` / `branch` args to set what code the task should be running. Ideally, running the pipeline would use the current branch of whoever is launching it, which I can do with `Task.create`. It also seems like `Task.init` will still make new tasks if artifacts are recorded?
My ideal is that I do exactly what `Task.create` does, but the task only goes into the pipeline section rather than creating a new one in the Experiments section.
As for the callback, that will work for setting the new parameter! As for accessing an existing parameter, it seems the node only has the overridden parameters set, not any of the previously existing parameters. (I am not overriding any parameters, so it is an empty dictionary.)
But it is a bit confusing that the docs suggest accessing `node.job.task` even though `node.job` is being set to `None`.
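Because `a_node.parameters` only contains the overrides, one way to see the full effective set is to read the template Task's own parameters and layer the overrides on top. A sketch, assuming the node exposes the template's ID as `a_node.base_task_id` and using `Task.get_task(...).get_parameters()`, which returns flat `"Section/name"` keys; the merge helper itself is plain Python:

```python
def effective_parameters(base_params, overrides):
    """Template parameters with the step's overrides layered on top."""
    merged = dict(base_params)
    merged.update(overrides)
    return merged


def pre_execute_callback_with_defaults(a_pipeline, a_node, current_param_override):
    from clearml import Task  # lazy import so the helper above is usable on its own

    # Assumption: the node carries the template Task's ID as base_task_id.
    base_params = {}
    if getattr(a_node, "base_task_id", None):
        base_params = Task.get_task(task_id=a_node.base_task_id).get_parameters()
    params = effective_parameters(base_params, a_node.parameters)
    # An existing value is now visible even though it was never overridden.
    print(f"{a_node.name}: effective test_size = {params.get('General/test_size')}")
    return True
```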
I'm a bit confused between the distinction / how to use these appropriately -- `Task.init` does not have `repo` / `branch` args to set what code the task should be running.
It detects them automatically at run time 🙂 based on the repository and branch the code is actually running from.
My ideal is that I do exactly what `Task.create` does, but the task only goes into the pipeline section rather than making a new one in the experiments section.
Do you mean create the Task at pipeline run time?
Maybe it would make sense to have an "initialization function" that calls the code, and decorate it? This will allow you to both use the existing codebase and return values / artifacts to the next steps (the main difference is that a standalone script usually has no return value, but a return value is exactly what you want in a pipeline). Does that make sense?
BTW: SteadySeagull18 did you check the pipeline decorator?
https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py
But it is a bit confusing that the docs suggest accessing `node.job.task` even though `node.job` is being set to `None`
`node.job` is the runtime object of the Job (once it has been created); in the pre-callback, the Job hasn't been created yet.
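A toy model of that ordering, in plain Python and not ClearML's actual implementation (`ToyNode`, `ToyJob` and `toy_run_step` are made up for illustration): the node is complete when the pre-callback runs, the Job is only built afterwards from the possibly edited parameters, and the post-callback sees the finished Job.

```python
class ToyNode:
    """Toy stand-in for a pipeline node (not ClearML's PipelineController.Node)."""

    def __init__(self, name, parameters):
        self.name = name
        self.parameters = dict(parameters)
        self.job = None  # the runtime Job object does not exist yet


class ToyJob:
    """Toy stand-in for the Job created from the node's final parameters."""

    def __init__(self, parameters):
        self.parameters = dict(parameters)
        self.status = "completed"


def toy_run_step(node, pre_execute_callback=None, post_execute_callback=None):
    # 1. Pre-callback: the node is fully populated, but node.job is still None.
    if pre_execute_callback and pre_execute_callback(node) is False:
        return "skipped"
    # 2. Only now is the Job built, from the (possibly edited) parameters.
    node.job = ToyJob(node.parameters)
    # 3. Post-callback: the Job exists and has finished.
    if post_execute_callback:
        post_execute_callback(node)
    return node.job.status


seen = {}


def pre(node):
    seen["job_in_pre"] = node.job
    node.parameters["General/test_size"] = 0.3
    return True


def post(node):
    seen["params_in_post"] = node.job.parameters


status = toy_run_step(ToyNode("stage_process", {"General/test_size": 0.25}), pre, post)
```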
I guess I'm just a bit confused about what the correct mental model is here. If I'm interpreting this correctly, I need to have essentially "template tasks" in my Experiments section whose sole purpose is to be copied for use in the Pipeline? When I'm setting up my Pipeline, I can't go "here are some brand new tasks, please run them"; I have to go "please run existing task A with these modifications, then task B with these modifications, then task C with these modifications"? And when the pipeline is run, it will automagically set `repo` / `branch` on those tasks to the correct values? Can I manually set these somewhere to be certain?
I looked into the decorator and `add_function_step` options, but it seemed that they required modifying our code to put all of the `import` statements at the beginning of the wrapped functions to get namespaces initialized, which is not what we want to do with our existing scripts. I'm not sure the wrapped setup function will work either: `import` statements in the outer function won't propagate namespaces to the functions it calls.
(To be fair though, I have not actually tried using the decorator. I was trying to get `add_function_step` to work for a while, then ran into the above namespace issue, and switched back to using tasks.)
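On the namespace concern: in Python, a function resolves global names in the module where it was defined, not in its caller. So a wrapper only needs to import the existing module inside its own body; that module's top-level imports run when it is imported and stay visible to its own functions. A self-contained sketch, where `existing_script` is a hypothetical module built in memory to stand in for an unmodified script. Whether that module is importable when ClearML actually executes the step depends on the step's environment having your code available, which this sketch does not cover.

```python
import sys
import types

# Hypothetical stand-in for an unmodified existing script (e.g. my_project/train.py),
# built in memory so the sketch is self-contained.
_source = '''
import math  # module-level import, left exactly where it is


def helper(x):
    return math.sqrt(x)


def main(x):
    return helper(x) * 2
'''
existing = types.ModuleType("existing_script")
exec(_source, existing.__dict__)
sys.modules["existing_script"] = existing


def pipeline_step(x):
    # Only the wrapper's own import lives inside the function body...
    import existing_script

    # ...helper() and math resolve through existing_script's globals, not the wrapper's.
    return existing_script.main(x)
```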
As for the node, the confusing bit is that this text from the docs seems to suggest that the node will be fully initialized before the callback:
`pre_execute_callback` (`Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]`) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. `parameters` are the configuration arguments passed to the ClearmlJob.
When I'm setting up my Pipeline, I can't go "here are some brand new tasks, please run them",
I think this is the main point. Can you create those Tasks via Task.create and get what you want? If so, then sure you can do that:
```python
from clearml import Task

def create_step_task(a_node):
    task = Task.create(...)
    return task

pipe.add_step(
    name="stage_process",
    parents=["stage_data"],
    base_task_factory=create_step_task,
)
```
wdyt?
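A slightly fuller sketch of such a factory that uses the launcher's checked-out branch, which was the original goal. It assumes `base_task_factory` calls the function with the node, as in the snippet above; the repository URL, script path and project name are placeholders, and the branch lookup shells out to `git rev-parse --abbrev-ref HEAD`. Presumably the factory runs in the pipeline controller's process, so this matches the launcher's branch only when the controller runs from the launcher's working copy.

```python
import subprocess


def current_git_branch(cwd="."):
    """Return the branch checked out in `cwd`, or None if it cannot be determined."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError):
        return None  # git missing, bad directory, or not a repository
    return result.stdout.strip() or None


def step_task_kwargs(step_name, repo, branch, script):
    """Keyword arguments for Task.create (placeholder project name)."""
    return dict(project_name="examples", task_name=step_name,
                repo=repo, branch=branch, script=script)


def create_step_task(a_node, repo, script):
    from clearml import Task  # lazy import; only needed when the pipeline runs

    return Task.create(**step_task_kwargs(a_node.name, repo, current_git_branch(), script))


# Usage inside the pipeline definition (repo and script are placeholders):
# from functools import partial
# pipe.add_step(
#     name="stage_process",
#     parents=["stage_data"],
#     base_task_factory=partial(
#         create_step_task,
#         repo="https://github.com/your-org/your-repo.git",
#         script="process_dataset.py",
#     ),
# )
```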
As for the node, the confusing bit is that this text from the docs seems to suggest that the node will be fully initialized before the callback:
Yes, the node is, but not the Job, so that you still have control 🙂
Oooo I didn't notice the `base_task_factory` argument before, that seems exactly like what I'd want. I will try that now! Thank you.
I think the docstring is just a bit confusing since it seems to directly recommend accessing `node.job.task` to access/modify things. I believe I have found a workaround for my specific case though, by using `pipeline.get_processed_nodes()` to grab some relevant info from the previously completed step.
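A sketch of that workaround, assuming `get_processed_nodes()` returns the already-processed node objects and that a node's `executed` attribute holds the ID of the Task it ran; `"General/upstream_task_id"` is a hypothetical parameter name, and `_DemoPipeline` is a hypothetical stand-in so the callback runs without ClearML:

```python
from types import SimpleNamespace


def pre_execute_callback_from_history(a_pipeline, a_node, current_param_override):
    # Index the steps that already ran by name.
    finished = {node.name: node for node in a_pipeline.get_processed_nodes()}
    upstream = finished.get("stage_data")
    if upstream is not None and upstream.executed:
        # Hand the upstream Task ID to this step (hypothetical parameter name).
        a_node.parameters["General/upstream_task_id"] = upstream.executed
    return True


class _DemoPipeline:
    """Hypothetical stand-in exposing only get_processed_nodes()."""

    def __init__(self, nodes):
        self._nodes = nodes

    def get_processed_nodes(self):
        return list(self._nodes)


demo_pipe = _DemoPipeline([SimpleNamespace(name="stage_data", executed="task-id-1")])
demo_node = SimpleNamespace(name="stage_process", parameters={})
ran = pre_execute_callback_from_history(demo_pipe, demo_node, {})
```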
SteadySeagull18 BTW: in the post-callback, `node.job` will be completed, because it is called after the Task has completed.
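So reading the finished Task belongs in the post-callback. A sketch, assuming `node.job.task` is the step's `Task` (as the docstring above describes) and using its `id` and `artifacts` properties; the stand-in objects at the bottom are hypothetical and only let it run locally:

```python
from types import SimpleNamespace


def post_execute_callback_example(a_pipeline, a_node):
    # Called after the step's Task has finished, so node.job is populated.
    task = a_node.job.task
    summary = {"step": a_node.name, "task_id": task.id, "artifacts": sorted(task.artifacts)}
    print(summary)
    return summary


# Hypothetical stand-ins so the callback can be exercised without a pipeline.
demo_task = SimpleNamespace(id="task-id-1", artifacts={"dataset": None})
demo_node = SimpleNamespace(name="stage_data", job=SimpleNamespace(task=demo_task))
summary = post_execute_callback_example(None, demo_node)
```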