Hi AgitatedDove14,
I have put together a mock test that is roughly similar to the pipeline we are developing, and the same problem arises: a task is created only for the first set of parameters in the for loop. Here, only the configuration text file for user 1 is created. Can you reproduce it?
```
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(
    return_values=["admin_config_path"], cache=False, task_type=Task.TaskTypes.custom,
)
def admin_config_creation(config):
    # import step requirements
    import json
    from pathlib import Path

    # step code
    config_filename = Path(".") / "admin_config.txt"
    with config_filename.open("w") as config_file:
        config_file.write(json.dumps(config, indent="\t"))
    return config_filename


@PipelineDecorator.component(
    return_values=["user_config_path"], cache=False, task_type=Task.TaskTypes.custom,
)
def user_config_creation(user_name, user_pass, admin_config_filename):
    # import step requirements
    from pathlib import Path

    # step code
    if user_pass.replace(" ", "").lower() != "imnotarobot":
        raise ValueError("Wrong password. The configuration could not be created.")
    user_config = f"(admin: {Path(admin_config_filename).stem}) Configuration created for {user_name}:\n"
    user_config += (
        " - ML Framework: Keras\n - Model: CNN2D\n - Problem type: Regression"
    )
    config_filename = Path(".") / f"{user_name}_config.txt"
    with config_filename.open("w") as config_file:
        config_file.write(user_config)
    config_filename = str(config_filename)
    print("User configuration filename casted to str?")
    print(f"{config_filename} ({type(config_filename)})")
    return config_filename


@PipelineDecorator.pipeline(
    name="Testing pipeline's component in for loop",
    project="myproject",
    version="0.0.1",
)
def executing_pipeline():
    # run step 1
    print("Step in progress -- Admin configuration setup")
    admin_config_path = admin_config_creation(
        config={"name": "Guido van Rossum", "pass": "benevolent_dictator_for_life_@123"}
    )

    # process step 1 results
    admin_config_path_1 = str(admin_config_path).replace("admin", "Gandalf")
    admin_config_path_2 = str(admin_config_path).replace("admin", "Elrond")

    # run step 2 once per user, calling the same component inside the loop
    users = {
        "user_1": ("Im Not A Robot", admin_config_path_1),
        "user_2": ("im not A robOt", admin_config_path_2),
    }
    users_config_filenames = {}
    for name, (password, new_admin_config_path) in users.items():
        print(f"Step in progress -- {name} configuration setup")
        users_config_filenames[name] = user_config_creation(
            user_name=name, user_pass=password, admin_config_filename=new_admin_config_path
        )
        print(users_config_filenames[name])
        print(type(users_config_filenames[name]))

    print("Users configuration files have been created. Check them out at:")
    print(users_config_filenames)  # I explicitly cast user configuration paths to str but they're still pathlib objects :O How?!
    print("Pipeline completed!")


if __name__ == "__main__":
    # run the pipeline steps as subprocess on the current machine, for debugging purposes.
    PipelineDecorator.debug_pipeline()
    # start the pipeline execution logic.
    executing_pipeline()
```
I wonder, does it launch all "step two" instances in parallel?
In theory it should, but in practice, since these are the same "template", I'm not sure what would happen.
One last note: you can call `PipelineDecorator.debug_pipeline()` to debug the pipeline locally. It will have the exact same behavior, only it will run the steps as subprocesses.
I tested `cache=False` and I still get the same error 😕 In the dashboard, the task corresponding to `step_two` does not appear duplicated, so I assume the task is being launched sequentially. I'm going to prepare a more elaborate example to see what happens. Currently I can't run `PipelineDecorator.debug_pipeline()` because I need at least two devices: one to read some data and another to process it.
Oh, I see. In the meantime I will duplicate the function and rename it so I can work with a different configuration. I really appreciate your effort, as well as the continuous feedback that keeps improving this wonderful library!
Thanks GiganticTurtle0!
I will try to reproduce with the example you provided. Regardless, I already took a look at the code, and I'm pretty sure I know what the issue is. We will be pushing a few fixes after the weekend, and I'm hoping this one will be included as well 🙂
So great! It would be a feature that would make the work much easier instead of having to clone the task and launch it with different parameters. It could even be considered more pythonic. Do you have an immediate solution in mind to keep moving forward before the new release is ready? :)
The issue itself is the name of the function (bottom line: it has to be unique for every call). So the only, very ugly, hack is to copy-paste the function X times?! 😞
(I'll see if we can push the fix to GitHub sooner)
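To make the "unique name per call" point concrete, here is a toy sketch in plain Python. It is not ClearML's implementation: `ToyPipeline`, `run_step`, and `step_name` are hypothetical names invented for illustration. It only assumes, as described above, that steps are tracked by function name, so a second call under the same name does not produce a new step.

```python
from typing import Any, Callable, Dict, Optional


class ToyPipeline:
    """Hypothetical stand-in that tracks one step per unique name (not ClearML code)."""

    def __init__(self) -> None:
        self.steps: Dict[str, Any] = {}

    def run_step(
        self, func: Callable[..., Any], step_name: Optional[str] = None, **kwargs: Any
    ) -> Any:
        # By default the step is keyed by the function name, as in the issue described.
        name = step_name or func.__name__
        if name in self.steps:
            # A step with this name already exists, so no new step is created
            # and the first result is handed back, whatever the new arguments are.
            return self.steps[name]
        self.steps[name] = func(**kwargs)
        return self.steps[name]


def user_config_creation(user_name: str) -> str:
    return f"{user_name}_config.txt"


users = ("user_1", "user_2")

# Same name for every call: only the first set of parameters produces a step.
same_name_pipeline = ToyPipeline()
same_name = [same_name_pipeline.run_step(user_config_creation, user_name=u) for u in users]

# Unique name per call (the effect of copy-pasting and renaming the function).
unique_name_pipeline = ToyPipeline()
unique_names = [
    unique_name_pipeline.run_step(
        user_config_creation, step_name=f"user_config_creation_{u}", user_name=u
    )
    for u in users
]

print(same_name)
print(unique_names)
```

In this toy model the first list repeats the `user_1` file name, while the second contains one file name per user, which mirrors why renaming the duplicated function works as a stopgap.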
I assume the task is being launched sequentially. I'm going to prepare a more elaborate example to see what happens.
Let me know if you can produce a mock test, I would love to make sure we support the use case, this is a great example of using pipeline logic 🙂
Hi GiganticTurtle0
The main issue is the `cache=True`: it will cause the second call to the function to essentially reuse the Task, ending with the same result.
Can you test with `cache=False` in the decorator?