Hi ReassuredOwl55, can you please elaborate on your use case or exactly what you're trying to achieve?
E.g., pseudocode for illustration only:
```
def get_list(dataset_id):
    from clearml import Dataset
    ds = Dataset.get(dataset_id=dataset_id)
    ds_dir = ds.get_local_copy()
    # etc...
    return list_of_objs  # one for each file, for example

def pipeline(dataset_id):
    list_of_obj = get_list(dataset_id)
    list_of_results = []
    for obj in list_of_obj:
        list_of_results.append(step(obj))
    combine(list_of_results)
```
Where `combine`, `get_list` and `step` are Pipeline steps and `pipeline` is the controller.
One benefit is being able to make use of the Pipeline caching, so if new data were added, adding elements to `list_of_obj`, we’d be able to use the cache of the `step` Task for the old objs. The caching is the main thing, but even being able to use the Pipeline interface for this kind of job would be nice, as the Pipeline has a lot of nice lineage features.
I get an error about incorrect Task IDs. In the above pseudocode it would be the ID of the `step` Task that was displayed in the error.
Not exactly sure what is going wrong without an exact error or reproducible example.
However, passing around the Dataset object is not ideal, because passing info from one step to another in a pipeline requires ClearML to pickle said object, and I'm not exactly sure a Dataset object is picklable.
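A quick way to check whether an object would survive being pickled is to try it locally. This is a minimal sketch using only the standard library; `is_picklable` is a hypothetical helper name, and whether a ClearML `Dataset` object passes this check is not something confirmed in this thread.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        # pickle raises different exception types depending on the object
        # (PicklingError, TypeError, AttributeError, ...), so catch broadly.
        return False
    return True


# A plain string ID pickles fine; an object holding OS resources
# (here a thread lock, used only as a stand-in) does not.
id_ok = is_picklable("some-dataset-id")
lock_ok = is_picklable(threading.Lock())
```

Passing a plain string ID between steps sidesteps the question entirely.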
In addition, running get_local_copy() in the first step does not guarantee that you can access that data from the other step. Both might be executed in different Docker containers or even on different machines.
So for starters I would not pass the Dataset object through, but the dataset_id, and then get a local copy of it only in step(). The cache should still work with dataset_id as an argument too.
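As a rough sketch of that suggestion, the step below takes only the ID and fetches its own local copy. It uses the same `Dataset.get` and `get_local_copy` calls as the pseudocode earlier in the thread; the function body (listing the files) is a made-up placeholder for whatever the real step does.

```python
def step(dataset_id: str) -> list:
    """Hypothetical step: resolve the dataset inside the step itself."""
    # Imports live inside the function, as in the pseudocode above, so the
    # step carries everything it needs wherever it ends up running.
    from pathlib import Path
    from clearml import Dataset

    ds = Dataset.get(dataset_id=dataset_id)
    local_dir = Path(ds.get_local_copy())  # local to *this* step's machine
    # Placeholder work: return the relative paths of the files in the dataset.
    return sorted(
        str(p.relative_to(local_dir)) for p in local_dir.rglob("*") if p.is_file()
    )
```

Because the only argument is a string, nothing beyond the ID has to be serialized between steps.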
I also think there might be limitations to using a for-loop to build a DAG. I think it might not work if you clone the pipeline and change the number of iterations, but I wouldn't expect an error, just a wrong DAG.
The Dataset object itself is not being passed around. The point of showing you that was to say that the Dataset may change, and therefore the number of objects (loaded from the Dataset, e.g. a number of pandas DataFrames that were CSVs in the dataset) could change.
I have already tested that the for loop does work, including caching, when spinning out multiple Tasks.
As I say, the issue is grouping the results of the tasks into a list and passing them into another step
So the DAG is getting confused on bringing the results of the Tasks together
If that's true, the error should be on the combine function, no? Do you have a more detailed error log or minimal reproducible example?
Producing it now — thanks for your help, won’t be a few mins
Ahh okay.
I’m an absolute numpty.
I had enabled caching on the Pipeline Task that was grabbing a load of ClearML IDs and so it was trying to “get” datasets that had since been deleted.
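The failure mode described here can be reproduced in miniature without ClearML. In this illustrative sketch, `functools.lru_cache` stands in for step caching and a plain dict stands in for the dataset store; all names and IDs are invented.

```python
from functools import lru_cache

# Hypothetical stand-in for the dataset store: ID -> contents.
datasets = {"ds-a": "train.csv", "ds-b": "test.csv"}


@lru_cache(maxsize=None)
def list_dataset_ids() -> tuple:
    """Cached 'step' that collects the IDs currently in the store."""
    return tuple(sorted(datasets))


ids_before = list_dataset_ids()   # first call actually reads the store
del datasets["ds-b"]              # a dataset is deleted afterwards
ids_after = list_dataset_ids()    # cached result: still lists the deleted ID

# IDs that the cached step returns but that can no longer be fetched.
stale = [i for i in ids_after if i not in datasets]
```

Any step whose output depends on external state that can change (such as which datasets exist) is a poor candidate for caching.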
Thanks for the nudge to minimal test – silly I didn’t do it before asking!
Appreciate your help.
(including caching, even if the number of elements in the list of vals changes)
Oohh interesting! Thanks for the minimal example though. We might want to add it to the docs as an example of dynamic DAG creation 🙂
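The fan-out/fan-in pattern discussed in this thread, with per-element caching, can be sketched in plain Python. This is not ClearML code and not the example shared in the thread: the `cached` decorator is a hypothetical stand-in for step caching keyed on the step's argument, and `calls` only exists to show which elements were actually computed.

```python
calls = []  # records every value that step() really had to compute


def cached(fn):
    """Stand-in for step caching: reuse the result for a previously seen argument."""
    store = {}

    def wrapper(arg):
        if arg not in store:
            store[arg] = fn(arg)
        return store[arg]

    return wrapper


@cached
def step(val: int) -> int:
    calls.append(val)
    return val * 2


def combine(results: list) -> int:
    return sum(results)


def pipeline(vals: list) -> int:
    # Fan out: one step() call per element. Fan in: combine the list of results.
    return combine([step(v) for v in vals])


first = pipeline([1, 2, 3])       # computes step() for 1, 2 and 3
second = pipeline([1, 2, 3, 4])   # only 4 is new; 1, 2 and 3 come from the cache
```

When the list grows, only the new element triggers a fresh `step` call, which is the behaviour the caching question was after.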