Is there anything stopping you from using regular conditions life 'if' ?
CostlyOstrich36 This didn't work, the value is -1 however the pipe didn't stop.
Tagging AgitatedDove14 SuccessfulKoala55 For anyone available right now to help out.
Another question, in the parents sequence in pipe.add_step, we have to pass in the name of the step right?
Wait, so the pipeline step only runs if the pre execute callback returns True? It'll stop if it doesn't run?
VexedCat68 , what if you simply add pip.stop()
? Does it not stop the pipeline? Can you maybe add a print to verify that during the run the value is indeed -1? Also looking from your code it looks like you're comparing the 'merged_dataset_id' to -1
I did what you said, and got the pipeline DAG and then the executed of the step to use as ID. Thank you it worked fine.
VexedCat68 both are valid. In case the step was cached (i.e. already executed) the node.job will be None, so it is probably safer to get the Task based on the "executed" field which stores the Task ID used.
If there aren't N datasets, the function step doesn't Squash the datasets and instead just returns -1.
Thus if I get -1, I want the pipeline execution to end or the proceeding task to be skipped.
I have checked in the args, the value is indeed -1. Unless there is some other way for conditional pipeline steps execution.
since I've either added add_functional_step or add_step
After the step which gets the merged dataset, I should use pipe.stop if it returned -1?
If I understood this correctly, so in case where we have defined steps in order as a parent child. If the parent had a pre execute callback return False, will all subsequent children nodes/steps not execute or will they just ignore it and still execute?
In another answer, I was shown that I can access it like this. How can I go about accessing the value of merged_dataset_id which was returned by merge_n_datasets and stored as an artifact.
Wait, so the pipeline step only runs if the pre execute callback returns True? It'll stop if it doesn't run?
Only if you have a Callback function, and that callback function returns False, then it will skip it (otherwise it will process it)
Another question, in the parents sequence in pipe.add_step, we have to pass in the name of the step right?
Correct, the step name is a unique identifier for the pipeline
how would I access the artifact of a previous step within the pre execute callback? Can you share an example?
Basically you can do:def pre_execute_callback_example(a_pipeline, a_node, current_param_override): # type (PipelineController, PipelineController.Node, dict) -> bool nodes = a_pipeline.get_pipeline_dag() parent_task_id = nodes[a_node.parents[0]].executed Task.get_task(parent_task_id).artifacts[...]
Is the only possible way to get a specific node
See the following (it is a dictionary where the key is the step/node unique name)pipeline.get_pipeline_dag()
Okay so I read the docs and the above questions are cleared now thank you. I just have one other question, how would I access the artifact of a previous step within the pre execute callback? Can you share an example?
Not sure myself. I have a pipeline step now, that'll return either clearml dataset id or -1. I want to stop the pipeline execution if I get -1 in the output of that step but I'm not sure how to achieve that
Hmmmm this looks like what you're looking for:
https://clear.ml/docs/latest/docs/references/sdk/automation_controller_pipelinecontroller#stop-1
Tell me if this helps 🙂
Hi VexedCat68
(sorry I just saw the message)
I wanted to ask, how to run pipeline steps conditionally? E.g if step returns a specific value, exit the pipeline or run another step instead of the sequential step
So do do so you can do:
` def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
# if we want to skip this node (and subtree of this node) we return False
...
# ew decided to skip so we return False
return False
pipe.add_step(name='stage_process', parents=['stage_data', ],
base_task_project='examples', base_task_name='step 2',
pre_execute_callback=pre_execute_callback_example,
...
) `Reference
https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py
regrading the actual artifact access, this is the usual Task.artifacts access: see example here:
https://github.com/allegroai/clearml/blob/master/examples/reporting/artifacts_retrieval.py
Is the only possible way to get a specific node, is to use one of the get_running_nodes or get_processed_nodes, and then checking every node in the list to see if the name matches the one we're looking for?