Regarding The New Version 1.1.2, I Have Noticed Type Hints Are Now Included In The Script Generated By PipelineDecorator.component

Regarding the new version 1.1.2, I have noticed that type hints are now included in the function arguments of the script generated by PipelineDecorator.component. However, despite having imported the required types from the typing library in the script where the decorated function is defined, the typing library is not imported outside the scope of the function in the generated script.

  
  
Posted 2 years ago

Answers 18


I have found it is not possible to start a pipeline B after a pipeline A. Following the previous example, I have added one more pipeline to the script:
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="model_trainings")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.pipeline(
    name="orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_orchestrator(config: dict):
    task = Task.current_task()
    print("Pipeline task ID:", task.id)
    print("Pipeline task status:", task.status)
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print("Victory!", end="\n")


@PipelineDecorator.pipeline(
    name="bad_orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_bad_orchestrator(config: dict):
    task = Task.current_task()
    print("Pipeline task ID:", task.id)
    print("Pipeline task status:", task.status)
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_2(msg)
    msg = step_1(msg)
    msg = step_4(msg)
    msg = step_3(msg)
    print("Ha, you lost!", end="\n")


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline(execute_steps_as_functions=False)

    player_name = "Frank"
    player_identity = "Renegade"

    print(f"Executing pipeline for {player_name}")
    config = dict()
    config["player_name"] = player_name
    config["player_identity"] = player_identity
    execute_orchestrator(config)

    # I also tried removing these two lines and it did not work.
    pipeline_task = Task.current_task()
    pipeline_task.mark_completed()

    execute_bad_orchestrator(config)
```
So I can't launch different pipelines in the same Python process either? Logic tells me that when a pipeline execution finishes, the main task should be marked as completed and a new main task started, shouldn't it?
  
  
Posted 2 years ago

BTW, how can I run 'execute_orchestrator' concurrently? That is, launch it for several configurations at the same time? The way it's implemented now, it doesn't start the next configuration until the current one is finished.

  
  
Posted 2 years ago

I tried specifying helper functions but it still gives the same error.

What's the error you are getting?

  
  
Posted 2 years ago

Sure, converting pipelines into components also works for me (ignoring still having to fix the problem with LazyEvalWrapper return values). But this way some interesting features of the pipeline are missing, such as displaying the step execution DAG in the PLOTS tab.

  
  
Posted 2 years ago

GiganticTurtle0 we had this discussion in the wrong thread, I moved it here.
Moved from the wrong thread

Martin.B   [1:55 PM]
GiganticTurtle0 the sample mock pipeline seems to be running perfectly on the latest code from GitHub, can you verify?

Martin.B   [1:55 PM]
Spoke too soon, sorry  🙂  issue is reproducible, give me a minute here

Alejandro C   [1:59 PM]
Oh, and which approach do you suggest to achieve the same goal (simultaneously running the same pipeline with different configurations using a single for loop)?

Alejandro C   [2:00 PM]
Unless there is a straightforward way to support it...

Martin.B   [2:01 PM]
So why wouldn't you have:
```python
@PipelineDecorator.component(return_values=["msg"], execution_queue="services",
                             helper_functions=[step_one, ...., step_four])
def execute_orchestrator(config: dict):
    pass  # ... run the steps here, producing msg ...
    return str(msg)


@PipelineDecorator.pipeline(...)
def main_pipeline():
    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]

    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        execute_orchestrator(config)
        print(f"Pipeline finished for {player_name}", end="\n\n")
```

Alejandro C   [2:10 PM]
Mmm, that is a very good alternative; this way I can leverage the newly introduced nested components. However, I think it would be reasonable (plus natural) to concurrently run the same pipeline with different configurations. For example, suppose there is a single agent that asynchronously orchestrates all of them (so that it is not necessary to spin up an agent for each instance of the executed pipeline). I would be happy to have that feature if it were not overly complicated to implement (I mean concurrent pipelines; I know the "asynchronous agent" is already available through the --services-mode CLI option).

Martin.B   [2:14 PM]

 Mmm that is a very good alternative, this way I can leverage the newly-introduced nested components.

It actually worked out of the box (almost: you have to cast the "msg" return value), this is so cool!

 However, I think it would be reasonable (plus natural) to concurrently run the same pipeline with different configurations.

A pipeline is a Task; the idea is you have one pipeline Task with the decorator, and that pipeline Task triggers the decorated pipelines with different arguments.
What do you think?
(It might be solvable to have nested decorated pipelines, but it would make the pipeline nested inside the pipeline function, which I'm not sure looks pretty ...)

Alejandro C   [2:19 PM]
I see. And is it the same for the PipelineController? I mean, can I create several instances of PipelineController, store them in a list, and call the 'start' method for each instance in a for loop? Would that work? Or does it follow the same rules as PipelineDecorator?

Martin.B   [3:06 PM]
Hmm, PipelineController follows the same logic (singleton), kind of like Task.init.
What I'm thinking is something like this example:
https://github.com/allegroai/clearml/blob/0a543d55d0055c9499b8cefdf669135740de9ce6/examples/pipeline/pipeline_from_functions.py#L72
where the function itself is a self-contained decorated pipeline. Wdyt? Is this clean enough?
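
For reference, here is a minimal sketch of the pattern in that linked example, assuming the stock PipelineController / add_function_step API; the function name, step name, and queue are made up for illustration:
```python
from clearml.automation.controller import PipelineController

def run_survival_game(player_name: str, player_identity: str):
    # Self-contained: this function runs as its own Task, so everything it
    # needs (imports included) must live inside the function body
    msg = f"{player_name.title()} ({player_identity}) playing survival step game"
    return msg

# Build a controller pipeline where the whole game is one function step
pipe = PipelineController(name="controller_pipe", project="Untitled", version="1.0.0")
pipe.add_function_step(
    name="game_frank",  # hypothetical step name
    function=run_survival_game,
    function_kwargs=dict(player_name="Frank", player_identity="Renegade"),
    function_return=["msg"],
)
pipe.start(queue="pipeline_controllers")
```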

  
  
Posted 2 years ago

 (ignoring still having to fix the problem with LazyEvalWrapper return values)

fix will be pushed post weekend 🙂

 such as displaying the step execution DAG in the PLOTS tab

Wait, what are you getting on the DAG plot? I think we "should" be able to see all the steps.

  
  
Posted 2 years ago

BTW, it looks like a lot of users really like the idea of running pipeline steps as subprocesses (which frankly I really cannot understand, as Python's Process is such an amazing tool for doing just that).
Anyhow, we will have PipelineDecorator.debug_pipeline(), which will run the pipeline steps as functions, and PipelineDecorator.execute_locally(), which will run the pipeline steps as subprocesses.
wdyt?
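
To make the two modes concrete, here is a sketch of how they would be called, assuming the names above ship as described (the component and pipeline here are placeholders):
```python
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=["msg"])
def step_1(msg: str):
    return msg + " -> step 1"

@PipelineDecorator.pipeline(name="debug_demo", project="Untitled", version="1.0.0")
def my_pipeline():
    print(step_1("start"))

if __name__ == "__main__":
    # Run every step as a plain function call inside this process (easiest to debug)
    PipelineDecorator.debug_pipeline()
    # PipelineDecorator.execute_locally()  # would run each step as a subprocess instead
    my_pipeline()
```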

  
  
Posted 2 years ago

Hmm, could it be this is on the "helper functions"?

  
  
Posted 2 years ago

GiganticTurtle0 the fix was not applied in 1.1.2 (which was a hotfix after the pyjwt interface changed and broke compatibility).
The type hint fix is on the latest RC:
pip install clearml==1.1.3rc0
I just verified with your example.
Apologies for the confusion; we will release 1.1.3 soon (we just need to make sure all tests pass with a few PRs that were merged).

  
  
Posted 2 years ago

I'm getting a NameError because the 'Optional' type hint is not defined in the global scope.

  
  
Posted 2 years ago

GiganticTurtle0 this one worked for me 🙂
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue1")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue2")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue3")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue4")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="services",
                             helper_functions=[step_1, step_2, step_3, step_4])
def execute_orchestrator(config: dict):
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print(msg)
    print("Victory!", end="\n")
    return str(msg)


@PipelineDecorator.pipeline(
    name="master_orchestrator_pipe_2",
    project="debug",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def main_pipeline():
    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]
    results = []
    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        results.append(execute_orchestrator(config))
        print(f"Pipeline finished for {player_name}")
    print('Done Master PIPELINE\n\n')


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline(execute_steps_as_functions=False)
    main_pipeline()
```
  
  
Posted 2 years ago

 However, despite having imported the required types from the typing library in the script where the function decorated with PipelineDecorator.component is defined, later in the generated script the typing library is not imported outside the scope of the function

Actually the typing part is not passed to the "created step", because there are no global imports. For example:
```python
def step(a: pd.DataFrame):
    import pandas as pd
    pass
```
The imports are inside the function step (we cannot move them outside without very fragile code analysis), so the created function Task will not have pandas imported in the global scope, and you will end up with an exception like "pd is not defined".
Make sense?
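
In other words, the working pattern is to put every import a component needs inside its body. A minimal sketch (queue name and CSV path are hypothetical):
```python
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=["n_rows"], execution_queue="default")
def count_rows(csv_path: str):
    # Imported here rather than at module level, so the generated standalone
    # script carries the import along with the function body
    import pandas as pd
    return len(pd.read_csv(csv_path))
```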

  
  
Posted 2 years ago

BTW, how can I run 'execute_orchestrator' concurrently?

It is launched simultaneously (i.e. if you are not processing the output of the pipeline step function, the execution will not wait for its completion; notice that the call itself might take a few seconds, as it creates a Task and enqueues/sub-processes it, but it is not waiting for it).
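
A sketch of that launch-now / wait-on-use behavior, assuming the decorator API from this thread (component, queue, and names are made up): each step call returns immediately with a lazy handle, and only consuming the value blocks:
```python
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=["msg"], execution_queue="services")
def play(player_name: str):
    return f"{player_name} survived!"

@PipelineDecorator.pipeline(name="concurrent_pipe", project="Untitled", version="1.0.0")
def main_pipeline():
    # Each call returns right after its step Task is created and enqueued
    handles = [play(name) for name in ["Frank", "Alexander", "John"]]
    # All three steps are now in flight; str() on a result is what waits for it
    for handle in handles:
        print(str(handle))
```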

  
  
Posted 2 years ago

Looks great, let me see if I can understand what's missing, because it should have worked ...

  
  
Posted 2 years ago

I tried specifying helper functions but it still gives the same error. If I define a component through the following code:
```python
from typing import Optional
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(...)
def step_data_loading(path: str, target_dir: Optional[str] = None):
    pass
```
Then in the automatically created script I find the following code:
```python
from clearml.automation.controller import PipelineDecorator


def step_data_loading(path: str, target_dir: Optional[str] = None):
    pass


if __name__ == '__main__':
    task = Task.init()
    kwargs = {'path': None, 'target_dir': None}
    task.connect(kwargs, name='kwargs')
    function_input_artifacts = {}
    params = task.get_parameters() or dict()
    for k, v in params.items():
        if not v or not k.startswith('kwargs_artifacts/'):
            continue
        k = k.replace('kwargs_artifacts/', '', 1)
        task_id, artifact_name = v.split('.', 1)
        kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get()
    results = step_data_loading(**kwargs)
    result_names = []
    if result_names:
        if not isinstance(results, (tuple, list)) or (len(result_names) == 1 and len(results) != 1):
            results = [results]
        for name, artifact in zip(result_names, results):
            task.upload_artifact(name=name, artifact_object=artifact)
```
Which shows that the type hints are retained (even though typing is never imported in the generated script).
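
Until the 1.1.3 fix, one workaround consistent with this behavior might be a string (deferred) annotation, so the generated script never has to evaluate Optional at definition time. This is an untested sketch, not an official recommendation:
```python
from clearml.automation.controller import PipelineDecorator

# Hypothetical workaround: a string annotation is never evaluated when the
# function is defined, so the generated script cannot raise NameError on
# `Optional` even though it does not import typing
@PipelineDecorator.component(return_values=["data"], execution_queue="default")
def step_data_loading(path: str, target_dir: "Optional[str]" = None):
    pass
```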

  
  
Posted 2 years ago

Hi AgitatedDove14, great, glad it was fixed quickly!

By the way, before releasing version 1.1.3 you might want to take a look at this mock example. I'm trying to run the same pipeline (with different configurations) in a single for loop, as you can see below:
```python
from clearml import Task
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue1")
def step_1(msg: str):
    msg += "\nI've survived step 1!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue2")
def step_2(msg: str):
    msg += "\nI've also survived step 2!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue3")
def step_3(msg: str):
    msg += "\nI can't believe I survived step 3 too!"
    return msg


@PipelineDecorator.component(return_values=["msg"], execution_queue="myqueue4")
def step_4(msg: str):
    msg += "\nAmazing, I survived the final step!"
    return msg


@PipelineDecorator.pipeline(
    name="orchestrator_pipe",
    project="Untitled",
    version="1.0.0",
    pipeline_execution_queue="pipeline_controllers",
)
def execute_orchestrator(config: dict):
    print("Player name:", config["player_name"])
    print("Identity:", config["player_identity"])
    msg = f"{config['player_name'].title()} playing survival step game"
    msg = msg + "\n" + "-" * len(msg)
    msg = step_1(msg)
    msg = step_2(msg)
    msg = step_3(msg)
    msg = step_4(msg)
    print(msg)
    print("Victory!", end="\n")


if __name__ == "__main__":
    PipelineDecorator.debug_pipeline()

    PLAYERS_NAMES = ["Frank", "Alexander", "John"]
    PLAYERS_IDENTITIES = ["Renegade", "Observer", "Lazy"]

    for player_name, player_identity in zip(PLAYERS_NAMES, PLAYERS_IDENTITIES):
        print(f"Executing pipeline for {player_name}")
        config = dict()
        config["player_name"] = player_name
        config["player_identity"] = player_identity
        execute_orchestrator(config)
        print(f"Pipeline finished for {player_name}", end="\n\n")
```

But I'm getting the following error:
```
Traceback (most recent call last):
  File "/home/user/myproject/concurrent_pipelines_lab.py", line 61, in <module>
    execute_orchestrator(config)
  File "/home/user/anaconda3/envs/myenv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2213, in internal_decorator
    func(**pipeline_kwargs)
  File "/home/user/myproject/concurrent_pipelines_lab.py", line 40, in execute_orchestrator
    msg = step_1(msg)
  File "/home/user/anaconda3/envs/myenv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2058, in wrapper
    _node = cls._singleton._nodes[_name]
KeyError: 'step_1'
```
Can you reproduce this example? Are the new style pipelines designed to work like this? Because it would be amazing if they could.
The reason I want to achieve this is to allow several pipelines to run simultaneously (to give a real-world example, imagine I want to develop several models concurrently for different configurations).

  
  
Posted 2 years ago

Yes, I like it! I was already getting used to the 'execute_steps_as_functions' argument of PipelineDecorator.debug_pipeline(), but I find your proposal more intuitive.

  
  
Posted 2 years ago

Mmm, that's weird, because I can see the type hints in the function arguments of the automatically generated script. So maybe I'm doing something wrong, or it's a bug, since they have been passed to the created step (I'm using clearml version 1.1.2 and clearml-agent version 1.1.0).

  
  
Posted 2 years ago