Answered
[Pipeline] Am I right in saying a Pipeline controller can’t include a data-dependent for-loop? The issue is not spinning up the Tasks, it’s collecting the results at the end. I was trying to append the outputs of each iteration of the for-loop and pass the list into another step.
Anyone got any good suggestions for this?

  
  
Posted one year ago

Answers 13


The Dataset object itself is not being passed around. The point of showing you that was to say that the Dataset may change, and therefore the number of objects loaded from it (e.g. a number of pandas DataFrames that were CSVs in the dataset) could change.

  
  
Posted one year ago

Oohh interesting! Thanks for the minimal example though. We might want to add it to the docs as an example of dynamic DAG creation 🙂

  
  
Posted one year ago

For reference, this works as expected:

  
  
Posted one year ago

(including caching, even if the number of elements in the list of vals changes)

  
  
Posted one year ago

I get an error about incorrect Task IDs – in the above pseudo code, the ID displayed in the error would be that of the step Task

  
  
Posted one year ago

Ahh okay.

I’m an absolute numpty.

I had enabled caching on the Pipeline Task that was grabbing a load of ClearML IDs and so it was trying to “get” datasets that had since been deleted.
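For anyone hitting the same thing, here is a rough plain-Python model of that failure mode. It does not use the ClearML API: `existing`, `_cached_ids` and `list_dataset_ids` are hypothetical stand-ins for the dataset store, the cached step output and the ID-listing step.

```python
existing = {"ds1", "ds2", "ds3"}   # hypothetical store of live dataset IDs
_cached_ids = None                 # stand-in for a cached step output


def list_dataset_ids(use_cache):
    """Return dataset IDs, optionally served from a previous (cached) run."""
    global _cached_ids
    if use_cache and _cached_ids is not None:
        return _cached_ids
    _cached_ids = sorted(existing)
    return _cached_ids


first = list_dataset_ids(use_cache=True)   # populates the cache
existing.discard("ds2")                    # a dataset is deleted afterwards

stale = list_dataset_ids(use_cache=True)   # still lists the deleted ID
fresh = list_dataset_ids(use_cache=False)  # recomputed, reflects the deletion
missing = [i for i in stale if i not in existing]
print(stale, fresh, missing)
```

The cached call keeps returning the ID that has since been deleted, which is why a step that lists external state is usually a poor candidate for caching.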

Thanks for the nudge to minimal test – silly I didn’t do it before asking!

Appreciate your help.

  
  
Posted one year ago

So the DAG is getting confused on bringing the results of the Tasks together

  
  
Posted one year ago

Hi ReassuredOwl55 , can you please elaborate on your use case or exactly what you're trying to achieve?

  
  
Posted one year ago

If that's true, the error should be on the combine function, no? Do you have a more detailed error log or minimal reproducible example?

  
  
Posted one year ago

Not exactly sure what is going wrong without an exact error or reproducible example.

However, passing around the dataset object is not ideal, because passing info from one step to another in a pipeline requires ClearML to pickle said object and I'm not exactly sure a Dataset obj is picklable.

Next to that, running get_local_copy() in the first step does not guarantee that you can access that data from the other step. Both might be executed in different docker containers or even on different machines.

So for starters I would not pass through the dataobj, but the dataset_id and then get a local copy of it only in step(). The cache should still work with dataset_id as argument too.
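To illustrate the pickling concern, a minimal sketch. `FakeDataset` is a hypothetical stand-in that holds a live handle; it is not ClearML's `Dataset` class, and this says nothing definite about whether the real one pickles. It only shows how to check, and why a plain ID string is the safer thing to hand between steps.

```python
import pickle
import threading


class FakeDataset:
    """Hypothetical stand-in for an object that owns a live resource."""

    def __init__(self, dataset_id):
        self.dataset_id = dataset_id
        self._lock = threading.Lock()  # locks cannot be pickled


def is_picklable(obj):
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


ds = FakeDataset("abc123")
print(is_picklable(ds))             # the object with a live handle
print(is_picklable(ds.dataset_id))  # the plain string ID
```

Running the same check on whatever you intend to return from a step is a quick way to find out before the pipeline does.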

I also think there might be limitations to using a for-loop to build a DAG. I think it might not work if you clone the pipeline and change the number of iterations, but I wouldn't expect an error, just a wrong DAG.

  
  
Posted one year ago

I have already tested that the for loop does work, including caching, when spinning out multiple Tasks.

As I say, the issue is grouping the results of the tasks into a list and passing them into another step

  
  
Posted one year ago

e.g. pseudo for illustration only
```python
def get_list(dataset_id):
    from clearml import Dataset
    ds = Dataset.get(dataset_id=dataset_id)
    ds_dir = ds.get_local_copy()
    # etc...
    return list_of_objs  # one for each file, for example

def pipeline(dataset_id):
    list_of_obj = get_list(dataset_id)
    list_of_results = []
    for obj in list_of_obj:
        list_of_results.append(step(obj))
    combine(list_of_results)
```
One benefit is being able to make use of the Pipeline caching so if new data were added, adding elements to the list_of_obj, we’d be able to use the cache of the `step` Task for the old objs. The caching is the main thing but even being able to use the Pipeline interface for this kind of job would be nice as the Pipeline has a lot of nice lineage features.

Where `combine`, `get_list` and `step` are Pipeline steps and `pipeline` is the controller
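The caching behaviour being asked for can be modelled in plain Python. This is only a sketch of the idea, not the ClearML API: `_cache` is an assumed stand-in for the Pipeline's step cache keyed on the step input, and `calls` exists purely to show which elements were actually recomputed.

```python
calls = []   # records which inputs were actually computed
_cache = {}  # stand-in for the pipeline's step cache, keyed by step input


def step(obj):
    """Process one element, reusing the cached result for a known input."""
    if obj not in _cache:
        calls.append(obj)
        _cache[obj] = obj.upper()
    return _cache[obj]


def combine(results):
    """Fan-in: merge the per-element results into one value."""
    return ",".join(results)


def pipeline(list_of_obj):
    """Controller: fan out one step call per element, then combine."""
    list_of_results = [step(obj) for obj in list_of_obj]
    return combine(list_of_results)


first = pipeline(["a.csv", "b.csv"])
second = pipeline(["a.csv", "b.csv", "c.csv"])  # one new file added
print(first, second, calls)
```

On the second run only the new element goes through `step`; the two old elements are served from the cache, and `combine` still receives the full list.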

  
  
Posted one year ago

Producing it now — thanks for your help, won’t be a few mins

  
  
Posted one year ago