Answered
Is It Good Practice To Call A Function Decorated By PipelineDecorator In A For Loop?

Is it good practice to call a function decorated by PipelineDecorator in a for loop? I tried it in a real-world example and didn't get the results I expected. In the toy example below, the function step_two is called in a loop to obtain different results within the same pipeline:

```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(
    return_values=["result_step_one"], cache=True, task_type=Task.TaskTypes.custom
)
def step_one(data_uri):
    # import step requirements
    # ...
    # data reading and preprocessing code
    result = ...
    return result


@PipelineDecorator.component(
    return_values=["result_step_two"], cache=True, task_type=Task.TaskTypes.custom,
)
def step_two(preprocessed_data, model_name):
    # import step requirements
    # ...
    # model-specific preprocessing code
    result = ...
    return result


@PipelineDecorator.pipeline(
    name="Pipeline experiment",
    project="myproject",
    version="XXXX",
    pipeline_execution_queue="myqueue",
)
def executing_pipeline():

    print("launch step one")
    preprocessed_data = step_one("/usr/data")

    # let's operate on the resulting data
    preprocessed_data_1 = preprocessed_data * 0.5
    preprocessed_data_2 = preprocessed_data ** (1 / 3)

    # call the same component once per model, expecting one step_two task per call
    model_specific_data = {}
    for name, data in zip(
        ["model_1", "model_2"], [preprocessed_data_1, preprocessed_data_2]
    ):
        print(f"launch step two - model: {name.title()}")
        model_specific_data[name] = step_two(preprocessed_data=data, model_name=name)

    print("Data is now prepared for model feeding. Check it out:")
    print(model_specific_data)

    print("Pipeline completed!")


if __name__ == "__main__":

    PipelineDecorator.set_default_execution_queue("myqueue")

    # start the pipeline execution logic.
    executing_pipeline()
```
Posted 3 years ago

Answers 9


Hi AgitatedDove14,
I have built a mock test that is roughly similar to the pipeline we are developing, and the same problem arises: a task is created only for the first set of parameters in the for loop. Here, the configuration text file is created only for user_1. Can you reproduce it?
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(
    return_values=["admin_config_path"], cache=False, task_type=Task.TaskTypes.custom,
)
def admin_config_creation(config):
    # import step requirements
    import json
    from pathlib import Path

    # step code
    config_filename = Path(".") / "admin_config.txt"
    with config_filename.open("w") as config_file:
        config_file.write(json.dumps(config, indent="\t"))
    return config_filename


@PipelineDecorator.component(
    return_values=["user_config_path"], cache=False, task_type=Task.TaskTypes.custom,
)
def user_config_creation(user_name, user_pass, admin_config_filename):
    # import step requirements
    from pathlib import Path

    # step code
    if user_pass.replace(" ", "").lower() != "imnotarobot":
        raise ValueError("Wrong password. The configuration could not be created.")
    user_config = f"(admin: {Path(admin_config_filename).stem}) Configuration created for {user_name}:\n"
    user_config += (
        "    - ML Framework: Keras\n    - Model: CNN2D\n    - Problem type: Regression"
    )
    config_filename = Path(".") / f"{user_name}_config.txt"
    with config_filename.open("w") as config_file:
        config_file.write(user_config)
    config_filename = str(config_filename)
    print("User configuration filename casted to str?")
    print(f"{config_filename} ({type(config_filename)})")
    return config_filename


@PipelineDecorator.pipeline(
    name="Testing pipeline's component in for loop",
    project="myproject",
    version="0.0.1",
)
def executing_pipeline():

    # run step 1
    print("Step in progress -- Admin configuration setup")
    admin_config_path = admin_config_creation(
        config={"name": "Guido van Rossum", "pass": "benevolent_dictator_for_life_@123"}
    )

    # process step 1 results
    admin_config_path_1 = str(admin_config_path).replace("admin", "Gandalf")
    admin_config_path_2 = str(admin_config_path).replace("admin", "Elrond")

    # run step 2 once per user
    users = {
        "user_1": ("Im Not A Robot", admin_config_path_1),
        "user_2": ("im not A robOt", admin_config_path_2),
    }
    users_config_filenames = {}

    for name, (password, new_admin_config_path) in users.items():
        print(f"Step in progress -- {name} configuration setup")
        users_config_filenames[name] = user_config_creation(
            user_name=name, user_pass=password, admin_config_filename=new_admin_config_path
        )
        print(users_config_filenames[name])
        print(type(users_config_filenames[name]))
    print("Users configuration files have been created. Check them out at:")
    print(users_config_filenames)  # I explicitly cast the user configuration paths to str, but they're still pathlib objects :O How?!

    print("Pipeline completed!")


if __name__ == "__main__":

    # run the pipeline steps as subprocesses on the current machine, for debugging purposes.
    PipelineDecorator.debug_pipeline()

    # start the pipeline execution logic.
    executing_pipeline()
```
Posted 3 years ago

I wonder, does it launch all "step two" instances in parallel?
In theory it should, but in practice, since these are the same "template", I'm not sure what would happen.
One last note: you can call PipelineDecorator.debug_pipeline() to debug the pipeline locally. It behaves exactly the same, except that the steps run as subprocesses.
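To make the "in parallel" question concrete, here is a framework-free sketch in plain Python: each loop iteration submits an independent call to a thread pool, so the two "step two" calls can overlap instead of running one after the other. `step_two` and `run_pipeline` below are hypothetical stand-ins for the decorated component and pipeline, and `concurrent.futures` is only an analogy for how a controller might dispatch steps; it is not how ClearML schedules tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def step_two(preprocessed_data, model_name):
    # Hypothetical stand-in for the decorated component: tag the data with the model name.
    return {"model": model_name, "data": preprocessed_data}


def run_pipeline(preprocessed_data):
    # Same per-model transforms as in the question's pipeline.
    inputs = {
        "model_1": preprocessed_data * 0.5,
        "model_2": preprocessed_data ** (1 / 3),
    }
    with ThreadPoolExecutor() as pool:
        # Submit every loop iteration first, so the calls can run concurrently...
        futures = {
            name: pool.submit(step_two, preprocessed_data=data, model_name=name)
            for name, data in inputs.items()
        }
        # ...then collect exactly one result per call, keyed by model name.
        return {name: future.result() for name, future in futures.items()}


if __name__ == "__main__":
    print(run_pipeline(8.0))
```

The point of the sketch is that every iteration produces its own independent call and its own result; if a real pipeline shows only one step for several loop iterations, the calls are being merged somewhere before they are dispatched.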

Posted 3 years ago

I tested cache=False and I still get the same error 😕 In the dashboard, the task corresponding to step_two does not appear duplicated, so I assume the task is being launched sequentially. I'm going to prepare a more elaborate example to see what happens. Currently I can't run PipelineDecorator.debug_pipeline() because I need at least two devices: one to read some data and another to process it.

Posted 3 years ago

Oh, I see. In the meantime, I will duplicate the function and rename it so I can work with a different configuration. I really appreciate your effort, as well as the continuous feedback that keeps improving this wonderful library!

Posted 3 years ago

Thanks GiganticTurtle0!
I will try to reproduce it with the example you provided. Regardless, I already took a look at the code, and I'm pretty sure I know what the issue is. We will be pushing a few fixes after the weekend; I'm hoping this one will be included as well 🙂
  
Posted 3 years ago

Great! This feature would make the work much easier than having to clone the task and launch it with different parameters. It could even be considered more Pythonic. Do you have an immediate workaround in mind so I can keep moving forward before the new release is ready? :)

Posted 3 years ago

The issue is the name of the function (bottom line: it has to be unique for every call). So the only (very ugly) hack is to copy-paste the function X times?! 😞
(I'll see if we can push the fix to GitHub sooner.)
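A toy model of why a step registry keyed on the function name collapses repeated calls: if each call is recorded under the function's name alone, the second loop iteration lands on the first iteration's entry, so only one task (and one output) survives, which matches the "only user_1's file is created" symptom. Keying each call by name plus a per-name call counter keeps them apart. `StepRegistry`, `unique_per_call`, and `make_pipeline` are hypothetical illustrations, not ClearML internals; copy-pasting the function under a new name sidesteps the collision for the same reason.

```python
import functools
from collections import defaultdict


class StepRegistry:
    """Hypothetical toy controller that runs one "task" per step id."""

    def __init__(self, unique_per_call):
        self.unique_per_call = unique_per_call
        self.tasks = {}  # step id -> result of the task that ran for it
        self._calls = defaultdict(int)  # function name -> number of calls so far

    def step(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            name = func.__name__
            if self.unique_per_call:
                # Give every call its own id: "<name>_1", "<name>_2", ...
                self._calls[name] += 1
                step_id = f"{name}_{self._calls[name]}"
            else:
                # Id derived from the function name only: later calls collide.
                step_id = name
            if step_id not in self.tasks:
                self.tasks[step_id] = func(*args, **kwargs)
            # On a collision, the earlier task's result is silently reused.
            return self.tasks[step_id]

        return wrapper


def make_pipeline(registry):
    @registry.step
    def user_config_creation(user_name):
        return f"{user_name}_config.txt"

    return [user_config_creation(name) for name in ("user_1", "user_2")]


if __name__ == "__main__":
    print(make_pipeline(StepRegistry(unique_per_call=False)))
    # ['user_1_config.txt', 'user_1_config.txt']
    print(make_pipeline(StepRegistry(unique_per_call=True)))
    # ['user_1_config.txt', 'user_2_config.txt']
```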

Posted 3 years ago

> I assume the task is being launched sequentially. I'm going to prepare a more elaborate example to see what happens.

Let me know if you can produce a mock test; I would love to make sure we support the use case. This is a great example of using pipeline logic 🙂

Posted 3 years ago

Hi GiganticTurtle0
The main issue is cache=True: it causes the second call of the function to essentially reuse the Task, ending with the same result.
Can you test with cache=False in the decorator?
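For reference, this is how a step cache is commonly expected to behave: results are keyed on the step name and its arguments, so cache=True only reuses a previous result when the same step is called again with identical inputs, while cache=False always re-runs. The `component` decorator and its `runs` counter below are a hypothetical in-memory sketch (assuming hashable arguments), not ClearML's implementation. Under this model, two loop iterations with different inputs never share a cached result, which is consistent with the report that switching to cache=False did not help.

```python
import functools


def component(cache=True):
    """Hypothetical step decorator with an in-memory result cache."""

    def decorator(func):
        results = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Key on the function name plus its (hashable) arguments.
            key = (func.__name__, args, tuple(sorted(kwargs.items())))
            if cache and key in results:
                return results[key]  # cache hit: reuse the earlier result
            wrapper.runs += 1  # count real executions
            result = func(*args, **kwargs)
            if cache:
                results[key] = result
            return result

        wrapper.runs = 0
        return wrapper

    return decorator


if __name__ == "__main__":

    @component(cache=True)
    def step_two(preprocessed_data, model_name):
        return f"{model_name}: {preprocessed_data}"

    step_two(preprocessed_data=4.0, model_name="model_1")
    step_two(preprocessed_data=2.0, model_name="model_2")  # different inputs: runs again
    step_two(preprocessed_data=4.0, model_name="model_1")  # identical inputs: cache hit
    print(step_two.runs)  # 2
```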

Posted 3 years ago