Step 1 was aborted, but the second step was still scheduled
And I get the same behavior if I make the dependency explicit via the return value of the first one
Wait, are you saying that in the code above, when you abort "step_a", "step_b" is still executed?
Hi @<1523715429694967808:profile|ThickCrow29> , thank you for pinging!
We fixed the issue (hopefully). Can you verify with the latest RC, 1.14.0rc0?
Hi @<1523715429694967808:profile|ThickCrow29> ! We identified the issue. We will soon release a fix for it
Hi, any chance you got some time to look if you could replicate on your side ?
- It’s a pipeline from Tasks.
- clearml==1.13.2
- For instance, in this pipeline, if the first task fails, the remaining tasks are not scheduled for execution, which is what I expect. I am just surprised that if the first task is instead aborted by the user, the following task is still scheduled for execution (and will fail because it depends on the first one completing).
Ok - good to know this is odd 🙂
It’s created like this (I removed some bits for readability)
def _run(pipeline_id, step):
    from pipeline_broker import pipeline

    pipeline.run_step(pipeline_id=pipeline_id, step=step)


def launch(
    cfg,
    queue: str = "default",
    abort_on_failure: bool = False,
    project: str = "TrainingPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
):
    ...
    pipe = PipelineController(
        project=project,
        name=project,
        version="1.0",
        add_pipeline_tags=True,
        abort_on_failure=abort_on_failure,
        pool_frequency=0.25,
        docker=remote_exec_docker_image,
    )
    task = Task.current_task()
    task.connect(cfg.to_dict(), name="main")
    pipe.set_default_execution_queue(queue)
    pipe.add_parameter(
        name="pipeline_id",
        description="ID of the pipeline",
        default=pipeline_id,
    )
    for step in cfg.steps:
        execution_queue = queue
        if cfg[step].get("clearml_execution_queue"):
            execution_queue = cfg[step]["clearml_execution_queue"]
        logging.info(f"Adding step: {step}, execution queue: {execution_queue}")
        task = cfg[step].get("task", None)
        parents = cfg.get_parents(step)
        pipe.add_function_step(
            name=step,
            task_name=task_name,
            project_name=task if task is not None else step,
            parents=parents,
            function=_run,
            function_kwargs=dict(pipeline_id="${pipeline.pipeline_id}", step=step),
            cache_executed_step=False,
            task_type=TaskTypes.custom,
            execution_queue=execution_queue,
            docker=remote_exec_docker_image,
        )
    if start_locally:
        pipe.start_locally()
        return
    pipe.start()
So it’s a pipeline from functions, contrary to what I said earlier. I’ll try to reproduce with the example provided in your documentation.
@<1523715429694967808:profile|ThickCrow29> this is odd... How did you create the pipeline? Can you provide a code sample?
if the first task failed - then the remaining tasks are not scheduled for execution, which is what I expect.
agreed
I'm just surprised that if the first task is aborted instead by the user,
How is that different from failed? The assumption is that if a component depends on another one, it needs its output; if it does not, then they can run in parallel. What am I missing?
Yes, I agree, it should be considered as failed and the PipelineController should not trigger the following task, which depends on the first one. My problem is that this is not the behavior I observe: the second task still gets scheduled for execution. Is there a way to specify that in the PipelineController logic?
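To make the expected rule concrete, here is a minimal sketch in plain Python of a guard that only lets a step launch when all of its parents completed. It is not ClearML code: `may_launch`, `RUNNABLE_PARENT_STATUSES` and the `statuses` dict are hypothetical names used for illustration, and how the parent statuses are looked up is deliberately left out. As far as I know, add_function_step accepts a pre_execute_callback, and returning False from it skips the step, so a check like this could possibly be called from there.

```python
from typing import Iterable, Mapping

# Assumption: only a parent that finished successfully should unblock its children.
# ClearML reports an aborted task with the status "stopped".
RUNNABLE_PARENT_STATUSES = {"completed"}


def may_launch(parent_names: Iterable[str], status_by_step: Mapping[str, str]) -> bool:
    """Return True only if every parent step has a runnable status.

    A parent with no recorded status is treated as not completed.
    """
    return all(
        status_by_step.get(name) in RUNNABLE_PARENT_STATUSES
        for name in parent_names
    )


# Hypothetical statuses: step_one was aborted by the user.
statuses = {"step_one": "stopped"}
print(may_launch(["step_one"], statuses))  # False: the parent was aborted
print(may_launch([], statuses))            # True: no parents to wait for
```

With this rule, an aborted parent blocks its children exactly like a failed one, which is the behavior expected above.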
Hi @<1523715429694967808:profile|ThickCrow29>
I am using the PipelineController with abort_on_failure set to False.
Is this a pipeline from code or from Tasks?
What is the clearml version?
Lastly, if a component fails and another component depends on its output, how would it run? If it is not dependent, why is it a child component?
Hi @<1523715429694967808:profile|ThickCrow29> ! What do you think of this behavior when using pipelines from decorators: suppose we have the following controller:
a = step_1() # step_1 gets aborted/failed
b = step_2(a)
c = step_3()
In this case, if abort_on_failure is set to False, then step_2 will be skipped.
If the controller uses a, doing something like:
a = step_1() # step_1 gets aborted/failed
print(a)
then an exception will be thrown.
step_3 will run because it doesn't depend on a.
In case of pipelines from functions/tasks, we will skip the children of aborted/failed tasks
Does this sound good to you?
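The skipping rule proposed above can be sketched in plain Python. This is only an illustration of the rule, not how PipelineController is implemented: the `plan` function, the status strings and the `dag` dict are hypothetical, and the sketch assumes the steps are listed with parents before children.

```python
from typing import Dict, List, Set


def plan(parents: Dict[str, List[str]], bad: Set[str]) -> Dict[str, str]:
    """Return a status per step: 'run', 'skipped' or 'aborted/failed'.

    parents maps each step to the steps it depends on.
    bad is the set of steps that were aborted or failed.
    Assumes the dict lists every parent before its children.
    """
    status: Dict[str, str] = {}
    for step, deps in parents.items():
        if any(status[dep] != "run" for dep in deps):
            # A parent did not complete, so this step never starts.
            status[step] = "skipped"
        elif step in bad:
            status[step] = "aborted/failed"
        else:
            status[step] = "run"
    return status


# Same shape as the controller above: step_2 depends on step_1, step_3 is independent.
dag = {"step_1": [], "step_2": ["step_1"], "step_3": []}
print(plan(dag, bad={"step_1"}))
# {'step_1': 'aborted/failed', 'step_2': 'skipped', 'step_3': 'run'}
```

Skipping propagates down the graph, so a grandchild of an aborted step is skipped as well, while unrelated steps still run.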
Hey, finally got to try it, sorry about the delay.
However, I tried on 1.14.1 but I still get the same behavior
Okay let me check if we can reproduce, definitely not the way it is supposed to work 😞
So I can confirm I have the same behavior with this minimal example
#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController


def step_one(a=1):
    # Imported inside the step because the function is converted into a standalone task
    import time

    print("Step 1")
    time.sleep(120)
    return True


def step_two(a=1):
    # Imported inside the step because the function is converted into a standalone task
    import time

    print("Step 2")
    time.sleep(120)
    return True


def launch():
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        cache_executed_step=False,
    )
    # step_two only declares step_one as a parent; it does not consume its output
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")


if __name__ == "__main__":
    fire.Fire(launch)
And I get the same behavior if I make the dependency explicit via the return value of the first one
#!/usr/bin/env python3
import fire
from typing import Optional
import time
from clearml import PipelineController


def step_one(a=1):
    import time

    print("Step 1")
    time.sleep(120)
    return True


def step_two(a=1):
    import time

    print("Step 2")
    time.sleep(120)
    return True


def launch(
    tenant: str = "demo",
    loc_id: str = "common",
    tag: str = "test",
    pipeline_id: Optional[str] = None,
    queue: str = "default",
    skip_done: bool = False,
    abort_on_failure: bool = False,
    project: str = "CustomPipeline",
    start_locally: bool = False,
    task_regex: str = ".*",
    **kwargs,
):
    pipe = PipelineController(
        project="TEST",
        name="Pipeline demo",
        version="1.1",
        add_pipeline_tags=False,
    )
    pipe.set_default_execution_queue("cpu")
    pipe.add_function_step(
        name="step_one",
        function=step_one,
        function_kwargs={},
        function_return=["b"],
        cache_executed_step=False,
    )
    pipe.add_function_step(
        name="step_two",
        parents=["step_one"],
        function=step_two,
        function_kwargs=dict(a="${step_one.b}"),
        cache_executed_step=False,
    )
    pipe.start()
    print("pipeline completed")


if __name__ == "__main__":
    fire.Fire(launch)