pipe.add_step(
    name='stage_process',
    parents=['stage_data'],
    base_task_project='examples',
    base_task_name='pipeline step 2 process dataset',
    parameter_override={
        'General/dataset_url': '${stage_data.artifacts.dataset.url}',
        'General/test_size': 0.25,
    },
    pre_execute_callback=pre_execute_callback_example,
    post_execute_callback=post_execute_callback_example,
)
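The `${stage_data.artifacts.dataset.url}` value in `parameter_override` is a reference that the pipeline controller fills in from the `stage_data` step before `stage_process` launches. As a rough mental model only, here is a hypothetical toy resolver (not ClearML's actual implementation). The `COMPLETED_STEPS` dict and the S3 URL are made up for illustration.

```python
import re

# Hypothetical stand-in for completed steps: step name -> artifact name -> URL.
# In ClearML these values come from the executed parent task, not a dict.
COMPLETED_STEPS = {
    "stage_data": {"dataset": "s3://bucket/stage_data/dataset.pkl"},
}

# Matches references of the form ${<step>.artifacts.<artifact>.url}
REF_PATTERN = re.compile(r"\$\{(\w+)\.artifacts\.(\w+)\.url\}")


def resolve_overrides(overrides):
    """Return a copy of `overrides` with artifact-URL references substituted."""
    resolved = {}
    for key, value in overrides.items():
        if isinstance(value, str):
            # Replace each reference with the URL recorded for that step's artifact
            value = REF_PATTERN.sub(
                lambda m: COMPLETED_STEPS[m.group(1)][m.group(2)], value
            )
        resolved[key] = value
    return resolved


params = resolve_overrides({
    "General/dataset_url": "${stage_data.artifacts.dataset.url}",
    "General/test_size": 0.25,
})
print(params)
# {'General/dataset_url': 's3://bucket/stage_data/dataset.pkl', 'General/test_size': 0.25}
```

Non-string values such as `test_size` pass through untouched; only strings containing a reference are rewritten.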
I initially wasn't able to get the value this way.
In the pre_execute_callback, you can actually access any task in the pipeline. You can either access a node (task) in the pipe directly, like the example above, or go through the node's parent like this: pipe._nodes[a_node.parents[0]].job.task.artifacts
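Both lookups can be sketched with stand-in objects so the attribute path is visible without a ClearML server. The `(pipeline, node, parameters)` callback signature follows my reading of the ClearML docs; `make_node`, the `pipe` stand-in, and the callback body are hypothetical and imitate only `_nodes`, `parents`, and `job.task.artifacts`.

```python
from types import SimpleNamespace


def make_node(parents, artifacts):
    # Only the attribute path used in the thread is imitated:
    # node.parents and node.job.task.artifacts
    task = SimpleNamespace(artifacts=artifacts)
    return SimpleNamespace(parents=parents, job=SimpleNamespace(task=task))


# Hypothetical pipeline stand-in with the two steps from the add_step example
pipe = SimpleNamespace(_nodes={
    "stage_data": make_node([], {"dataset": "<artifact object>"}),
    "stage_process": make_node(["stage_data"], {}),
})


def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # Option 1: look the parent up directly by its step name
    by_name = a_pipeline._nodes["stage_data"].job.task.artifacts
    # Option 2: go through the current node's first parent
    by_parent = a_pipeline._nodes[a_node.parents[0]].job.task.artifacts
    print("parent artifacts:", sorted(by_parent.keys()))
    # In this stand-in both lookups reach the same dict
    assert by_name is by_parent
    return True  # True lets the step run


pre_execute_callback_example(pipe, pipe._nodes["stage_process"], {})
# parent artifacts: ['dataset']
```

In a real pipeline the function is passed as `pre_execute_callback=` to `add_step`, as in the snippet above. Going through `parents[0]` avoids hard-coding the parent's step name.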
AnxiousSeal95 Basically, it's a function step's return value. If I call artifacts.keys(), there are no keys, even though the step before it does return the output.
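One possible cause of an empty `artifacts.keys()`: as I understand the ClearML docs, `PipelineController.add_function_step` only stores a function's return values as artifacts when you name them via its `function_return` argument (for example `function_return=['merged_dataset_id']`). The toy below imitates that behaviour in plain Python. `run_function_step` and `merge_step` are hypothetical, not ClearML APIs, so check the real parameter against the docs.

```python
def run_function_step(func, function_return=None, **kwargs):
    """Toy emulation: call `func` and store results as artifacts only if named."""
    result = func(**kwargs)
    artifacts = {}
    if function_return:
        if len(function_return) == 1:
            # Single name: store the whole return value under that name
            artifacts[function_return[0]] = result
        else:
            # Several names: assume the function returned a matching tuple
            artifacts.update(zip(function_return, result))
    return artifacts


def merge_step():
    return "abc123"  # hypothetical dataset ID


print(list(run_function_step(merge_step).keys()))
# []
print(list(run_function_step(merge_step, function_return=["merged_dataset_id"]).keys()))
# ['merged_dataset_id']
```

The function returns the same value both times; only naming the output decides whether anything shows up in `artifacts`.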
I'm not using decorators. I have a bunch of function steps followed by a normal task step, to which I've passed a base_task_id.
I want to check the value returned by one of the function steps: if it holds true, I want to execute the task step; otherwise I want the pipeline to end there, since the task step is the last one.
AnxiousSeal95 I'm trying to access the specific value. I checked the type of task.artifacts and it's a ReadOnlyDict. Given that the return value I'm looking for is called merged_dataset_id, how would I go about doing that?
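`task.artifacts` maps artifact names to artifact objects, not to the raw values. In ClearML, the stored object is retrieved with the artifact's `.get()` method, e.g. `task.artifacts['merged_dataset_id'].get()`. The sketch below uses a hypothetical `FakeArtifact` class and `MappingProxyType` in place of the real `ReadOnlyDict` so it runs without a server; the `"abc123"` value is made up.

```python
from types import MappingProxyType


class FakeArtifact:
    """Hypothetical stand-in for a ClearML Artifact; only .get() is imitated."""

    def __init__(self, value):
        self._value = value

    def get(self):
        # A real Artifact.get() retrieves and deserializes the stored object
        return self._value


# Read-only mapping, similar in spirit to the ReadOnlyDict from task.artifacts
artifacts = MappingProxyType({"merged_dataset_id": FakeArtifact("abc123")})

if "merged_dataset_id" in artifacts:
    merged_dataset_id = artifacts["merged_dataset_id"].get()
else:
    merged_dataset_id = None  # nothing was stored under that name
print(merged_dataset_id)  # abc123
```

Checking membership first gives a clear `None` instead of a `KeyError` when the step did not store the artifact.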
If you're using method decorators like https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py, calling the steps is just like calling functions (the pipeline code translates them into tasks). The pipeline is then logic you write yourself, so you can add whatever branching you need. Makes sense?
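With decorators, the "run the last step only if the check passes" requirement becomes ordinary Python control flow. The sketch below uses undecorated functions so it runs anywhere; in a real pipeline the steps would carry `@PipelineDecorator.component` and the controller function `@PipelineDecorator.pipeline`. The step names and the condition are hypothetical.

```python
def merge_datasets():
    # Hypothetical step: returns a merged dataset ID, or None if nothing merged
    return "abc123"


def train_model(dataset_id):
    # Hypothetical final step that should only run when the check passes
    return f"trained on {dataset_id}"


def pipeline_logic():
    merged_dataset_id = merge_datasets()
    # Plain control flow decides whether the last step runs at all
    if merged_dataset_id:
        return train_model(merged_dataset_id)
    return None  # the pipeline simply ends here


print(pipeline_logic())  # trained on abc123
```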
This gets me the artifact that I return in step1
I think this is what you wanted
Now in step2, I add a pre_execute_callback
AnxiousSeal95 I just have one question: can you share an example of accessing an artifact from a previous step in the pre_execute_callback?
VexedCat68, do you mean the artifact from the previous step is called "merged_dataset_id"? Is it an artifact or a parameter? And what issues are you having with accessing it?
And in the pre_execute_callback, I can access this: a_pipeline._nodes[a_node.parents[0]].job.task.artifacts['data_frame']
pipe._nodes['stage_data'].job.task.artifacts
Thank you, this is a big help. I'll give this a go now.
So I'm looking at the example on GitHub; this is step1:
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to use to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    pickle_data_url = \
        pickle_data_url or \
        '...'  # default URL omitted in the original message
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame
I checked that the value is being returned, but I'm having trouble accessing merged_dataset_id in the pre_execute_callback the way you showed me.
I'm both printing it and writing it to a file
I then did what MartinB suggested, got the ID of the task from the pipeline DAG, and then it worked.
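Going through the task ID can be sketched as follows. In ClearML, `Task.get_task(task_id=...)` fetches a task by its ID. Here a hypothetical `TASK_REGISTRY` dict stands in for the server and `nodes` stands in for the pipeline's DAG nodes; the IDs and values are made up. One plausible reason this helps is that a freshly fetched task reflects artifacts uploaded after the in-memory object was created, but that is a guess the thread does not confirm.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the ClearML server: task ID -> task object
TASK_REGISTRY = {
    "task-42": SimpleNamespace(id="task-42",
                               artifacts={"merged_dataset_id": "abc123"}),
}


def get_task(task_id):
    # Stand-in for Task.get_task(task_id=...): fetch a task by its ID
    return TASK_REGISTRY[task_id]


# Stand-in DAG: step name -> node whose job points at the executed task
nodes = {"merge_step": SimpleNamespace(job=SimpleNamespace(task=TASK_REGISTRY["task-42"]))}


def artifacts_via_task_id(dag_nodes, step_name):
    task_id = dag_nodes[step_name].job.task.id  # ID read from the DAG node
    fresh_task = get_task(task_id)  # re-fetch that task by ID
    return fresh_task.artifacts


print(artifacts_via_task_id(nodes, "merge_step")["merged_dataset_id"])  # abc123
```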
If you return False from a pre_execute_callback (or 0, not 100% sure 🙂), the step just won't run.
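Combining that with the parent lookup from earlier in the thread gives a gate for the final task step. This is a sketch under assumptions: `should_continue` is a hypothetical artifact name, the pipeline and node objects are stand-ins, and the artifacts mapping is assumed to be dict-like. Because it is unconfirmed whether `0` behaves like `False`, the gate always returns a real `bool`.

```python
from types import SimpleNamespace


def make_pipeline(flag_value):
    # Hypothetical stand-ins: the parent's artifact exposes .get() like a ClearML Artifact
    artifact = SimpleNamespace(get=lambda: flag_value)
    parent_task = SimpleNamespace(artifacts={"should_continue": artifact})
    nodes = {
        "check_step": SimpleNamespace(parents=[], job=SimpleNamespace(task=parent_task)),
        "final_task_step": SimpleNamespace(parents=["check_step"], job=None),
    }
    return SimpleNamespace(_nodes=nodes)


def pre_execute_gate(a_pipeline, a_node, current_param_override):
    parent = a_pipeline._nodes[a_node.parents[0]]
    artifact = parent.job.task.artifacts.get("should_continue")
    if artifact is None:
        return False  # nothing to check: skip rather than run blindly
    # Returning False asks the controller to skip this step; True lets it run
    return bool(artifact.get())


run_pipe = make_pipeline(True)
skip_pipe = make_pipeline(False)
print(pre_execute_gate(run_pipe, run_pipe._nodes["final_task_step"], {}))    # True
print(pre_execute_gate(skip_pipe, skip_pipe._nodes["final_task_step"], {}))  # False
```

In a real pipeline this would be passed as `pre_execute_callback=pre_execute_gate` on the final `add_step` call. Since that step is the last one, skipping it ends the pipeline.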
Makes sense?