I tried specifying helper functions but it still gives the same error. If I define a component through the following code:
```python
from typing import Optional

from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(...)
def step_data_loading(path: str, target_dir: Optional[str] = None):
    pass
```
Then in the automatically created script I find the following code:
```python
from clearml.automation.controller import PipelineDecorator


def step_data_loading(path: str, target_dir: Optional[str] = None):
    pass


if __name__ == '__main__':
    task = Task.init()
    kwargs = {'path': None, 'target_dir': None}
    task.connect(kwargs, name='kwargs')
    function_input_artifacts = {}
    params = task.get_parameters() or dict()
    for k, v in params.items():
        if not v or not k.startswith('kwargs_artifacts/'):
            continue
        k = k.replace('kwargs_artifacts/', '', 1)
        task_id, artifact_name = v.split('.', 1)
        kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get()
    results = step_data_loading(**kwargs)
    result_names = []
    if result_names:
        if not isinstance(results, (tuple, list)) or (len(result_names) == 1 and len(results) != 1):
            results = [results]
        for name, artifact in zip(result_names, results):
            task.upload_artifact(name=name, artifact_object=artifact)
```
Which shows that the type hints are retained. However, despite having imported the required types from the `typing` library in the script where the function decorated with `PipelineDecorator.component` is defined, the generated script does not import the `typing` library outside the scope of the function.
Actually, the typing part is not passed to the "created step", because there are no global imports. For example:

    def step(a: pd.DataFrame):
        import pandas as pd
        pass

The imports are inside the function `step` (we cannot move them outside without very fragile code analysis), so the created function Task will not have pandas imported in the global scope and you will end up with an exception: "pd not defined".
Make sense?
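The failure described above can be reproduced without ClearML. The sketch below is plain Python, not ClearML's generated code: `GENERATED` is a hypothetical stand-in for a standalone step script whose only import sits inside the function body. It also shows one generic Python workaround, `from __future__ import annotations` (PEP 563), which stores annotations as strings; this is an illustration and not necessarily how the ClearML fix works.

```python
# Hypothetical stand-in for a generated step script: the import lives inside
# the function body, so no module-level name `Optional` exists.
GENERATED = '''
def step(path: str, target_dir: Optional[str] = None):
    from typing import Optional
    return path
'''

# Same source, but annotations are kept as strings and never evaluated.
DEFERRED = "from __future__ import annotations\n" + GENERATED


def try_define(source: str) -> str:
    """Run `source` in a fresh namespace and report whether the hints resolve."""
    namespace = {}
    try:
        exec(source, namespace)
        # Python 3.14+ evaluates annotations lazily, so the error would only
        # surface on access; touching them keeps the outcome version-independent.
        namespace["step"].__annotations__
    except NameError as exc:
        return f"NameError: {exc}"
    return "ok"


eager_result = try_define(GENERATED)
deferred_result = try_define(DEFERRED)
print(eager_result)
print(deferred_result)
```

Importing the type inside the function body does not help, because the annotation in the signature is evaluated outside that body.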
GiganticTurtle0 the fix was not applied in 1.1.2 (which was a hotfix after the pyjwt interface changed and broke compatibility).
The type hint fix is in the latest RC: `pip install clearml==1.1.3rc0`
I just verified with your example.
Apologies for the confusion, we will release 1.1.3 soon (we just need to make sure all tests pass with a few PRs that were merged).
Hi AgitatedDove14 , great, glad it was fixed quickly!
By the way, before releasing version 1.1.3 you might want to take a look at this mock example. I'm trying to run the same pipeline (with different configurations) in a single for loop, as you can see below:
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue1")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue2")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue3")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue4")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.pipeline(
    name="orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_orchestrator(config: dict):
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print(msg)
    print("Victory!", end="\n")


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline()

    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]
    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        execute_orchestrator(config)
        print(f"Pipeline finished for {player_name}", end="\n\n")
```
But I'm getting the following error:

    Traceback (most recent call last):
      File "/home/user/myproject/concurrent_pipelines_lab.py", line 61, in <module>
        execute_orchestrator(config)
      File "/home/user/anaconda3/envs/myenv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2213, in internal_decorator
        func(**pipeline_kwargs)
      File "/home/user/myproject/concurrent_pipelines_lab.py", line 40, in execute_orchestrator
        msg = step_1(msg)
      File "/home/user/anaconda3/envs/myenv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2058, in wrapper
        _node = cls._singleton._nodes[_name]
    KeyError: 'step_1'

Can you reproduce this example? Are the new style pipelines designed to work like this? Because it would be amazing if they could.
The reason I want to achieve this is to allow several pipelines to run simultaneously (to give a real world example, imagine I want to develop several models concurrently for different configurations)
BTW, it looks like a lot of users really like the idea of running pipeline steps as subprocesses (which frankly I really cannot understand, as Python `Process` is such an amazing tool to do just that).
Anyhow, we will have `PipelineDecorator.debug_pipeline()`, which will run the pipeline steps as functions, and `PipelineDecorator.execute_locally()`, which will run the pipeline steps as subprocesses.
wdyt?
Sure, converting pipelines into components also works for me (ignoring still having to fix the problem with `LazyEvalWrapper` return values). But this way some interesting features of the pipeline are missing, such as displaying the step execution DAG in the PLOTS tab.
I tried specifying helper functions but it still gives the same error.
What's the error you are getting?
GiganticTurtle0 we had this discussion in the wrong thread, I moved it here.
Moved from the wrong thread
Martin.B [1:55 PM]
GiganticTurtle0 the sample mock pipeline seems to be running perfectly on the latest code from GitHub, can you verify ?
Martin.B [1:55 PM]
Spoke too soon, sorry 🙂 issue is reproducible, give me a minute here
Alejandro C [1:59 PM]
Oh, and which approach do you suggest to achieve the same goal (simultaneously running the same pipeline with different configurations using a single for loop)?
Alejandro C [2:00 PM]
Unless there is a straightforward way to support it...
Martin.B [2:01 PM]
So why wouldn't you have:
```python
@PipelineDecorator.component(return_values=["msg"], execution_queue="services", helper_functions=[step_one, ..., step_four])
def execute_orchestrator(config: dict):
    pass  # stuff
    return str(msg)


@PipelineDecorator.pipeline(...)
def main_pipeline():
    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]
    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        execute_orchestrator(config)
        print(f"Pipeline finished for {player_name}", end="\n\n")
```
Alejandro C [2:10 PM]
Mmm that is a very good alternative, this way I can leverage the newly introduced nested components. However, I think it would be reasonable (plus natural) to concurrently run the same pipeline with different configurations. For example, suppose a single agent asynchronously orchestrates all of them (so that it is not necessary to spin up an agent for each instance of the executed pipeline). I would be happy to have that feature if it were not overly complicated to implement (I mean concurrent pipelines; I know the "asynchronous agent" is already available through the `--services-mode` CLI option).
Martin.B [2:14 PM]
Mmm that is a very good alternative, this way I can leverage the newly-introduced nested components.
It actually worked out of the box (almost: you have to cast the "msg" return value). This is so cool!
However, I think it would be reasonable (plus natural) to concurrently run the same pipeline with different configurations.
A pipeline is a Task. The idea is that you have one pipeline Task with the decorator, and that pipeline Task then triggers the decorated pipelines with different arguments.
What do you think?
(It might be solvable to have nested decorated pipelines, but it would put the pipeline inside the pipeline function, which I'm not sure looks pretty...)
Alejandro C [2:19 PM]
I see. And is it the same for the `PipelineController`? I mean, can I create several instances of `PipelineController`, store them in a list and call the `start` method for each instance in a for loop? Would that work? Or does it follow the same rules as `PipelineDecorator`?
Martin.B [3:06 PM]
Hmm, `PipelineController` follows the same logic (singleton), kind of like `Task.init`.
What I'm thinking is something like this example:
https://github.com/allegroai/clearml/blob/0a543d55d0055c9499b8cefdf669135740de9ce6/examples/pipeline/pipeline_from_functions.py#L72
where the function itself is a self-contained pipeline decorator. wdyt? Is this clean enough?
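Since the controller is described above as a per-process singleton, one possible workaround (an assumption, not something confirmed in this thread) is to give each configuration its own Python interpreter. The sketch below uses only the standard library; `CHILD_SCRIPT` and `launch` are hypothetical names, and the child merely echoes its configuration where a real script would import and call the decorated pipeline.

```python
import json
import subprocess
import sys

# Hypothetical child script. A real one would import the module that defines
# the decorated pipeline and call it with `config`; this one only echoes the
# configuration so the sketch runs without ClearML installed.
CHILD_SCRIPT = """
import json, os, sys
config = json.loads(sys.argv[1])
print(json.dumps({"pid": os.getpid(), "player": config["player_name"]}))
"""


def launch(config: dict) -> subprocess.Popen:
    """Start one interpreter for this configuration and return without waiting."""
    return subprocess.Popen(
        [sys.executable, "-c", CHILD_SCRIPT, json.dumps(config)],
        stdout=subprocess.PIPE,
        text=True,
    )


configs = [
    {"player_name": "Frank", "player_identity": "Renegade"},
    {"player_name": "Alexander", "player_identity": "Observer"},
    {"player_name": "John", "player_identity": "Lazy"},
]

# Launch everything first, then collect: the children run side by side.
processes = [launch(config) for config in configs]
reports = [json.loads(proc.communicate()[0]) for proc in processes]
return_codes = [proc.returncode for proc in processes]
print(reports)
```

Each child has its own process ID and therefore its own module-level state, so no singleton is shared between configurations.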
I'm getting a NameError because 'Optional' type hint is not defined in the global scope
I have found it is not possible to start a pipeline B after a pipeline A. Following the previous example, I have added one more pipeline to the script:
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.pipeline(
    name="orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_orchestrator(config: dict):
    task = Task.current_task()
    print("Pipeline task ID:", task.id)
    print("Pipeline task status:", task.status)
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print("Victory!", end="\n")


@PipelineDecorator.pipeline(
    name="bad_orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_bad_orchestrator(config: dict):
    task = Task.current_task()
    print("Pipeline task ID:", task.id)
    print("Pipeline task status:", task.status)
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_2(msg)
    msg = step_1(msg)
    msg = step_4(msg)
    msg = step_3(msg)
    print("Ha, you lost!", end="\n")


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline(execute_steps_as_functions=False)

    player_name = "Frank"
    player_identity = "Renegade"
    print(f"Executing pipeline for {player_name}")
    config = dict()
    config["player_name"] = player_name
    config["player_identity"] = player_identity
    execute_orchestrator(config)

    # I also tried removing these two lines and it did not work.
    pipeline_task = Task.current_task()
    pipeline_task.mark_completed()

    execute_bad_orchestrator(config)
```
I can't launch different pipelines in the same Python process either? Logic tells me when a pipeline execution is finished, the main task should be marked as completed and a new main task should be started, shouldn't it?
Mmm that's weird. Because I can see the type hints in the function's arguments of the automatically generated script. So, maybe I'm doing something wrong or it's a bug, since they have been passed to the created step (I'm using clearml version 1.1.2 and clearml-agent version 1.1.0).
Yes, I like it! I had already gotten used to the `execute_steps_as_functions` argument of `PipelineDecorator.debug_pipeline()`, but I find your proposal more intuitive.
(ignoring still having to fix the problem with `LazyEvalWrapper` return values).
The fix will be pushed post weekend 🙂
such as displaying the step execution DAG in the PLOTS tab.
Wait, what are you getting on the DAG plot? I think we "should" be able to see all the steps.
BTW, how can I run 'execute_orchestrator' concurrently? That is, launch it for several configurations at the same time? The way it's implemented now, it doesn't start the next configuration until the current one is finished.
Looks great, let me see if I can understand what's missing, because it should have worked ...
GiganticTurtle0 this one worked for me 🙂
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue1")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue2")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue3")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue4")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="services",
                             helper_functions=[step_1, step_2, step_3, step_4])
def execute_orchestrator(config: dict):
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print(msg)
    print("Victory!", end="\n")
    return str(msg)


@PipelineDecorator.pipeline(
    name="master_orchestrator_pipe_2",
    project="debug",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def main_pipeline():
    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]
    results = []
    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        results.append(execute_orchestrator(config))
        print(f"Pipeline finished for {player_name}")
    print('Done Master PIPELINE\n\n')


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline(execute_steps_as_functions=False)
    main_pipeline()
```
Hmm could it be this is on the "helper functions" ?
BTW, how can I run 'execute_orchestrator' concurrently?
It is launching simultaneously (i.e., if you are not processing the output of the pipeline step function, the execution will not wait for its completion). Notice that the call itself might take a few seconds, as it creates a task and enqueues it or runs it as a subprocess, but it is not waiting for it.
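A rough standard-library analogy for the behaviour described above (this is not how ClearML implements it, and `run_config` is a hypothetical stand-in for a pipeline step): `submit()` hands back a placeholder immediately, and the caller only blocks when it asks for the value.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_config(player_name: str) -> str:
    """Hypothetical stand-in for a long-running pipeline step."""
    time.sleep(0.2)
    return f"{player_name} finished"


players = ["Frank", "Alexander", "John"]
start = time.perf_counter()

with ThreadPoolExecutor(max_workers=len(players)) as pool:
    # submit() returns right away with a Future, much like calling a
    # component returns before the step has actually completed.
    pending = [pool.submit(run_config, name) for name in players]
    launched_after = time.perf_counter() - start

    # Only here, when the values are actually used, does the caller wait.
    results = [future.result() for future in pending]

total = time.perf_counter() - start
print(results)
print(f"launched in {launched_after:.3f}s, finished in {total:.3f}s")
```

In the same spirit, the loop in `main_pipeline` only appends the returned placeholders to `results`, so it never waits on any single configuration.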