Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Hi - Quick Question. I Am Using The Pipelinecontroller With Abort_On_Failure Set To False. I Have A Pipe With A First Task That Branch Out In 3 Branches.

Hi - quick question.
I am using the PipelineController with abort_on_failure set to False.
I have a pipe with a first task that branch out in 3 branches.

  • If one of the task in each branch fail - the remaining task in that branch are not schedule as expected.
  • However if one of the task in each branch is aborted (by the user for instance), the next task is that branch is still scheduled for execution. Is there a way to consider aborted task as failure from the point of view of the PipelineController ?
    Thxx!
  
  
Posted one year ago
Votes Newest

Answers 22


And same behavior if I make the dependance explicty via the retunr of the first one

Wait, are you saying that in the code above, when you abort "step_a" , then "step_b" is executed ?

  
  
Posted one year ago

So I can confirm I have the same behavior with this minomal example

#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController
def step_one(a=1):
    print("Step 1")
    time.sleep(120)
    return True
def step_two(a=1):
    print("Step 2")
    time.sleep(120)
    return True
def launch():
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")
if __name__ == "__main__":
    fire.Fire(launch)
  
  
Posted one year ago

Okay let me check if we can reproduce, definitely not the way it is supposed to work 😞

  
  
Posted 12 months ago

Hey, finally got to try it, sorry about the delay.
However, I tried on 1.14.1 but i still get the same behavior

  
  
Posted 11 months ago

Thx working now on 1.14.2 🙌

  
  
Posted 10 months ago

Hi @<1523715429694967808:profile|ThickCrow29> ! We identified the issue. We will soon release a fix for it

  
  
Posted 11 months ago

Hi @<1523715429694967808:profile|ThickCrow29> ! What do you think of this behavior when using pipelines from decorators: suppose we have the following controller:

a = step_1() # step_1 gets aborted/failed
b = step_2(a)
c = step_3() 

In this case, if abort_on_failure is set to False, then step_2 will be skipped.
If the controller uses a , doing something like:

a = step_1() # step_1 gets aborted/failed
print(a)

then an exception will be thrown.
step_3 will run because it doesn't depend on a

In case of pipelines from functions/tasks, we will skip the children of aborted/failed tasks

Does this sound good to you?

  
  
Posted 11 months ago

Thx

  
  
Posted 11 months ago

Yep - sounds perfect 😉

  
  
Posted 11 months ago

Hi @<1523715429694967808:profile|ThickCrow29> , thank you for pinging!
We fixed the issue (hopefully) can you verify with the latest RC? 1.14.0rc0 ?

  
  
Posted 11 months ago

  • It’s a pipeline from Tasks.
  • clearml==1.13.2
  • For instance, in this pipeline, if the first task failed - then the remaining task are not schedule for execution which is what I expect. I am just surprised that if the first task is aborted instead by the user, the following task is still schedule for execution (and will fail cause it’s dependant on the first one to complete).
    image
  
  
Posted one year ago

Hi @<1523715429694967808:profile|ThickCrow29>

I am using the PipelineController with abort_on_failure set to False.

Is this a pipeline from code or from Tasks?
What is the clearml version?
Lastly, if a component fails, and another components is dependent on it's output, how would it run? if it is not dependent, why is it a child component?

  
  
Posted one year ago

Yes, I agree, it should be considered as failed and the PipelineController should not trigger the following task which depends on the first one. My problem is that it’s not the behavior I observe, the second task still get scheduled for execution. Is there a way to specify that to the PipelineController logic ?

  
  
Posted one year ago

great, thanks !

  
  
Posted 11 months ago

@<1523715429694967808:profile|ThickCrow29> this is odd... how did you create the pipeline? can you provide code sample?

  
  
Posted one year ago

Ok - good to know this is odd 🙂
It’s created like this (I remove some bits for readability)

def _run(pipeline_id, step):
    from pipeline_broker import pipeline

    pipeline.run_step(pipeline_id=pipeline_id, step=step)


def launch(
    cfg,
    queue: str = "default",
    abort_on_failure: bool = False,
    project: str = "TrainingPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
):
    ...

    pipe = PipelineController(
        project=project,
        name=project,
        version="1.0",
        add_pipeline_tags=True,
        abort_on_failure=abort_on_failure,
        pool_frequency=0.25,
        docker=remote_exec_docker_image,
    )
    task = Task.current_task()
    task.connect(cfg.to_dict(), name="main")
    pipe.set_default_execution_queue(queue)

    pipe.add_parameter(
        name="pipeline_id",
        description="ID of the pipeline",
        default=pipeline_id,
    )

    for step in cfg.steps:
        execution_queue = queue
        if cfg[step].get("clearml_execution_queue"):
            execution_queue = cfg[step]["clearml_execution_queue"]

        logging.info(f"Adding step: {step}, execution queue: {execution_queue}")
        task = cfg[step].get("task", None)
        parents = cfg.get_parents(step)
        pipe.add_function_step(
            name=step,
            task_name=task_name,
            project_name=task if task is not None else step,
            parents=parents,
            function=_run,
            function_kwargs=dict(pipeline_id="${pipeline.pipeline_id}", step=step),
            cache_executed_step=False,
            task_type=TaskTypes.custom,
            execution_queue=execution_queue,
            docker=remote_exec_docker_image,
        )
    if start_locally:
        pipe.start_locally()
        return
    pipe.start()

So it’s a pipeline from functions contrary to what I said ealier. I’ll try to reproduce with the example provided in your documentation.

  
  
Posted one year ago

And same behavior if I make the dependance explicty via the retunr of the first one

#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController
def step_one(a=1):
    import time
    print("Step 1")
    time.sleep(120)
    return True
def step_two(a=1):
    import time
    print("Step 2")
    time.sleep(120)
    return True
def launch(
    tenant: str = "demo",
    loc_id: str = "common",
    tag: str = "test",
    pipeline_id: Optional[str] = None,
    queue: str = "default",
    skip_done: bool = False,
    abort_on_failure: bool = False,
    project: str = "CustomPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
    **kwargs,
):
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        function_return=["b"],
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        function_kwargs=dict(a="${step_one.b}"),
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")
if __name__ == "__main__":
    fire.Fire(launch)
  
  
Posted one year ago

Hi, any chance you got some time to look if you could replicate on your side ?

  
  
Posted 11 months ago

thx

  
  
Posted 12 months ago

if the first task failed - then the remaining task are not schedule for execution which is what I expect.

agreed

I'm just surprised that if the first task is

aborted

instead by the user,

How is that different from failed? The assumption is if a component depends on another one it needs its output, if it does not then they can run in parallel. What am i missing?

  
  
Posted one year ago

Step 1 was aborted, but the second still was scheduled
image

  
  
Posted one year ago

yes

  
  
Posted 12 months ago
843 Views
22 Answers
one year ago
10 months ago
Tags