Hi @<1834401593374543872:profile|SmoggyLion3> ! There are a few things I can think of:
- If you need to continue a task that is marked as completed, you can call `clearml.Task.get_task(ID).mark_stopped(force=True)` to mark it as stopped. You can do this either in the job that picks up the task and wants to continue it, before calling `Task.init` (see the sketch after the pipeline example below), or in a `post_execute_callback` in the pipeline itself, so the pipeline step marks itself as aborted. For example:
```python
from clearml import PipelineDecorator


# post_execute_callback runs after the step finishes; force-mark the
# step's task as stopped (aborted) so it can be continued later
def callback(pipeline, node):
    node.job.task.mark_stopped(force=True)


@PipelineDecorator.component(post_execute_callback=callback)
def step_1():
    print("hello")
    return 1


@PipelineDecorator.component()
def step_2(arg):
    print(arg)


@PipelineDecorator.pipeline(name="example", project="example")
def pipe():
    ret = step_1()
    step_2(ret)


PipelineDecorator.run_locally()
pipe()
```
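The other route from the first bullet, marking the task stopped from the job that resumes it, would look roughly like this (a minimal sketch; the task ID is a placeholder):
```python
from clearml import Task

COMPLETED_TASK_ID = "abc123"  # hypothetical ID of the completed task

# flip the completed task back to a stopped (aborted) state, then
# resume it in the current job
Task.get_task(task_id=COMPLETED_TASK_ID).mark_stopped(force=True)
task = Task.init(continue_last_task=COMPLETED_TASK_ID)
```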
- The "submitting" task could actually either:
- create the task which we want to be continued with Task.create, enqueue the task and wait for it to be aborted
- create the task which we want to be continued in a subprocess. To do that, first you should remove from your env
CLEARML_PROC_MASTER_ID
andCLEARML_TASK_ID
. For example (pseudocode)
```python
import argparse
import os
from multiprocessing import Process

from clearml import Task


def do_work(should_continue=None):
    if not should_continue:
        Task.init(project_name="project", task_name="name")
    else:
        # resume the previously aborted task by its ID
        Task.init(continue_last_task=should_continue)
    # ... some work ...
    # if <condition>: abort here - the task is continued elsewhere


def pipe_step(should_continue):
    if should_continue:
        do_work(should_continue)
        return
    # multiprocessing.Process has no `env` argument, so drop the ClearML
    # variables from os.environ before spawning; the child inherits the
    # modified environment and creates its own task
    os.environ.pop("CLEARML_PROC_MASTER_ID", None)
    os.environ.pop("CLEARML_TASK_ID", None)
    worker = Process(target=do_work)
    worker.start()
    worker.join()


if __name__ == "__main__":
    # argparse arguments can be fetched remotely without initializing the task
    parser = argparse.ArgumentParser()
    parser.add_argument("--should-continue", default=None)
    args = parser.parse_args()
    pipe_step(args.should_continue)
```
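And for the first option (`Task.create` + enqueue + wait), a minimal sketch, assuming a queue named "default" and placeholder project/repo/script names:
```python
from clearml import Task

# create the task that will later be continued (names/repo are placeholders)
task = Task.create(
    project_name="example",
    task_name="to-be-continued",
    repo="https://github.com/me/my-repo.git",  # hypothetical repository
    script="train.py",  # hypothetical entry point
)
Task.enqueue(task, queue_name="default")

# block until the task is aborted; it can then be continued elsewhere
task.wait_for_status(status=(Task.TaskStatusEnum.stopped,), check_interval_sec=30)
print(f"Task {task.id} stopped - ready to be continued")
```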