Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Hi - Quick Question. I Am Using The Pipelinecontroller With Abort_On_Failure Set To False. I Have A Pipe With A First Task That Branch Out In 3 Branches.

Hi - quick question.
I am using the PipelineController with abort_on_failure set to False.
I have a pipe with a first task that branch out in 3 branches.

  • If one of the task in each branch fail - the remaining task in that branch are not schedule as expected.
  • However if one of the task in each branch is aborted (by the user for instance), the next task is that branch is still scheduled for execution. Is there a way to consider aborted task as failure from the point of view of the PipelineController ?
    Thxx!
  
  
Posted 2 months ago
Votes Newest

Answers 22


Hi @<1523715429694967808:profile|ThickCrow29> , thank you for pinging!
We fixed the issue (hopefully) can you verify with the latest RC? 1.14.0rc0 ?

  
  
Posted one month ago

Hi, any chance you got some time to look if you could replicate on your side ?

  
  
Posted one month ago

Hey, finally got to try it, sorry about the delay.
However, I tried on 1.14.1 but i still get the same behavior

  
  
Posted one month ago

And same behavior if I make the dependance explicty via the retunr of the first one

#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController
def step_one(a=1):
    import time
    print("Step 1")
    time.sleep(120)
    return True
def step_two(a=1):
    import time
    print("Step 2")
    time.sleep(120)
    return True
def launch(
    tenant: str = "demo",
    loc_id: str = "common",
    tag: str = "test",
    pipeline_id: Optional[str] = None,
    queue: str = "default",
    skip_done: bool = False,
    abort_on_failure: bool = False,
    project: str = "CustomPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
    **kwargs,
):
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        function_return=["b"],
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        function_kwargs=dict(a="${step_one.b}"),
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")
if __name__ == "__main__":
    fire.Fire(launch)
  
  
Posted 2 months ago

Okay let me check if we can reproduce, definitely not the way it is supposed to work 😞

  
  
Posted 2 months ago

Ok - good to know this is odd 🙂
It’s created like this (I remove some bits for readability)

def _run(pipeline_id, step):
    from pipeline_broker import pipeline

    pipeline.run_step(pipeline_id=pipeline_id, step=step)


def launch(
    cfg,
    queue: str = "default",
    abort_on_failure: bool = False,
    project: str = "TrainingPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
):
    ...

    pipe = PipelineController(
        project=project,
        name=project,
        version="1.0",
        add_pipeline_tags=True,
        abort_on_failure=abort_on_failure,
        pool_frequency=0.25,
        docker=remote_exec_docker_image,
    )
    task = Task.current_task()
    task.connect(cfg.to_dict(), name="main")
    pipe.set_default_execution_queue(queue)

    pipe.add_parameter(
        name="pipeline_id",
        description="ID of the pipeline",
        default=pipeline_id,
    )

    for step in cfg.steps:
        execution_queue = queue
        if cfg[step].get("clearml_execution_queue"):
            execution_queue = cfg[step]["clearml_execution_queue"]

        logging.info(f"Adding step: {step}, execution queue: {execution_queue}")
        task = cfg[step].get("task", None)
        parents = cfg.get_parents(step)
        pipe.add_function_step(
            name=step,
            task_name=task_name,
            project_name=task if task is not None else step,
            parents=parents,
            function=_run,
            function_kwargs=dict(pipeline_id="${pipeline.pipeline_id}", step=step),
            cache_executed_step=False,
            task_type=TaskTypes.custom,
            execution_queue=execution_queue,
            docker=remote_exec_docker_image,
        )
    if start_locally:
        pipe.start_locally()
        return
    pipe.start()

So it’s a pipeline from functions contrary to what I said ealier. I’ll try to reproduce with the example provided in your documentation.

  
  
Posted 2 months ago

  • It’s a pipeline from Tasks.
  • clearml==1.13.2
  • For instance, in this pipeline, if the first task failed - then the remaining task are not schedule for execution which is what I expect. I am just surprised that if the first task is aborted instead by the user, the following task is still schedule for execution (and will fail cause it’s dependant on the first one to complete).
    image
  
  
Posted 2 months ago

if the first task failed - then the remaining task are not schedule for execution which is what I expect.

agreed

I'm just surprised that if the first task is

aborted

instead by the user,

How is that different from failed? The assumption is if a component depends on another one it needs its output, if it does not then they can run in parallel. What am i missing?

  
  
Posted 2 months ago

@<1523715429694967808:profile|ThickCrow29> this is odd... how did you create the pipeline? can you provide code sample?

  
  
Posted 2 months ago

yes

  
  
Posted 2 months ago

And same behavior if I make the dependance explicty via the retunr of the first one

Wait, are you saying that in the code above, when you abort "step_a" , then "step_b" is executed ?

  
  
Posted 2 months ago

So I can confirm I have the same behavior with this minomal example

#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController
def step_one(a=1):
    print("Step 1")
    time.sleep(120)
    return True
def step_two(a=1):
    print("Step 2")
    time.sleep(120)
    return True
def launch():
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")
if __name__ == "__main__":
    fire.Fire(launch)
  
  
Posted 2 months ago

Hi @<1523715429694967808:profile|ThickCrow29>

I am using the PipelineController with abort_on_failure set to False.

Is this a pipeline from code or from Tasks?
What is the clearml version?
Lastly, if a component fails, and another components is dependent on it's output, how would it run? if it is not dependent, why is it a child component?

  
  
Posted 2 months ago

Yes, I agree, it should be considered as failed and the PipelineController should not trigger the following task which depends on the first one. My problem is that it’s not the behavior I observe, the second task still get scheduled for execution. Is there a way to specify that to the PipelineController logic ?

  
  
Posted 2 months ago

Step 1 was aborted, but the second still was scheduled
image

  
  
Posted 2 months ago

thx

  
  
Posted 2 months ago

Thx working now on 1.14.2 🙌

  
  
Posted 18 days ago

Hi @<1523715429694967808:profile|ThickCrow29> ! We identified the issue. We will soon release a fix for it

  
  
Posted one month ago

Hi @<1523715429694967808:profile|ThickCrow29> ! What do you think of this behavior when using pipelines from decorators: suppose we have the following controller:

a = step_1() # step_1 gets aborted/failed
b = step_2(a)
c = step_3() 

In this case, if abort_on_failure is set to False, then step_2 will be skipped.
If the controller uses a , doing something like:

a = step_1() # step_1 gets aborted/failed
print(a)

then an exception will be thrown.
step_3 will run because it doesn't depend on a

In case of pipelines from functions/tasks, we will skip the children of aborted/failed tasks

Does this sound good to you?

  
  
Posted one month ago

Yep - sounds perfect 😉

  
  
Posted one month ago

great, thanks !

  
  
Posted one month ago

Thx

  
  
Posted one month ago
184 Views
22 Answers
2 months ago
17 days ago
Tags