Unanswered
Hi - Quick Question.
I Am Using The Pipelinecontroller With Abort_On_Failure Set To False.
I Have A Pipe With A First Task That Branch Out In 3 Branches.
Ok - good to know this is odd 🙂
It’s created like this (I remove some bits for readability)
def _run(pipeline_id, step):
from pipeline_broker import pipeline
pipeline.run_step(pipeline_id=pipeline_id, step=step)
def launch(
cfg,
queue: str = "default",
abort_on_failure: bool = False,
project: str = "TrainingPipeline",
start_locally: bool = False,
task_regex: str = ".*",
):
...
pipe = PipelineController(
project=project,
name=project,
version="1.0",
add_pipeline_tags=True,
abort_on_failure=abort_on_failure,
pool_frequency=0.25,
docker=remote_exec_docker_image,
)
task = Task.current_task()
task.connect(cfg.to_dict(), name="main")
pipe.set_default_execution_queue(queue)
pipe.add_parameter(
name="pipeline_id",
description="ID of the pipeline",
default=pipeline_id,
)
for step in cfg.steps:
execution_queue = queue
if cfg[step].get("clearml_execution_queue"):
execution_queue = cfg[step]["clearml_execution_queue"]
logging.info(f"Adding step: {step}, execution queue: {execution_queue}")
task = cfg[step].get("task", None)
parents = cfg.get_parents(step)
pipe.add_function_step(
name=step,
task_name=task_name,
project_name=task if task is not None else step,
parents=parents,
function=_run,
function_kwargs=dict(pipeline_id="${pipeline.pipeline_id}", step=step),
cache_executed_step=False,
task_type=TaskTypes.custom,
execution_queue=execution_queue,
docker=remote_exec_docker_image,
)
if start_locally:
pipe.start_locally()
return
pipe.start()
So it’s a pipeline from functions contrary to what I said ealier. I’ll try to reproduce with the example provided in your documentation.
107 Views
0
Answers
11 months ago
11 months ago