Hello all,
I'm trying to queue a task in Python, but I'd like to reuse the prior task ID. In the web app you can Reset and Enqueue a task and it'll reuse the task ID. I'm trying to do the same thing with the Python SDK (in the code example below, I'd like to reuse the task ID when reuse_task = True).
Code:
from typing import Optional, Union

from clearml import Task
from omegaconf import DictConfig, OmegaConf

# get_most_recent_task and logger are defined elsewhere (not shown here)


def run_task(
    task_name: str,
    project_name: str,
    execution_queue_name: str,
    production: bool = False,
    reuse_task: bool = False,
    parameter_overrides: Optional[Union[DictConfig, dict]] = None,
):
    # check that parameter_overrides is a DictConfig or dict
    if parameter_overrides:
        assert isinstance(
            parameter_overrides, (DictConfig, dict)
        ), "parameter_overrides must be a DictConfig or dict."
        # if parameter_overrides is a DictConfig convert it to a dict
        if isinstance(parameter_overrides, DictConfig):
            parameter_overrides = OmegaConf.to_container(
                parameter_overrides, resolve=True
            )
    # get the most recent task
    task_id = get_most_recent_task(
        task_name=task_name, project_name=project_name, production=production
    )
    if task_id is None:
        logger.error(f"Task {task_name} not found in ClearML.")
        return
    # grab the correct task
    source_task: Task = Task.get_task(task_id=task_id)
    if reuse_task:
        source_task.reload()
        # if reuse_task is True, pipe to the same task
        target_task: Task = source_task
    else:
        # Clone the task to pipe to. This creates a task with status Draft whose parameters can be modified.
        target_task: Task = Task.clone(
            source_task=source_task, comment="Scheduled Task."
        )
    # if parameter_overrides is not None, update the parameters of the target task
    if parameter_overrides:
        # Get the original parameters of the Task, apply the overrides,
        # and set the parameters on the target Task
        target_task_parameters: dict = target_task.get_parameters()
        target_task_parameters.update(parameter_overrides)
        target_task.set_parameters(target_task_parameters)
    # remove all tags from the task
    target_task.set_tags([])
    # Enqueue the target task (reused or cloned) to the queue.
    logger.info(f"Enqueueing [{task_name}] to: [{execution_queue_name}] queue.")
    target_task.execute_remotely(queue_name=execution_queue_name)
I've tried using source_task.reload() and source_task.reset(). I've also tried using target_task.execute_remotely(queue_name=execution_queue_name) and Task.enqueue(task=target_task.id, queue_name=execution_queue_name). I've also tried using force=True in reset() and in enqueue. What am I missing?
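For reference, here is a minimal sketch of the Reset-then-Enqueue flow I'm trying to reproduce, written so it can be exercised without a ClearML server. The helper name reset_and_enqueue is hypothetical, and the enqueue argument is meant to be Task.enqueue. Passing set_started_on_success=False is an assumption on my part: as far as I can tell from the SDK docs, reset() may otherwise mark the task as started again right after resetting it, which could be what blocks the enqueue. I have not confirmed that this is the cause.

```python
from typing import Any, Callable


def reset_and_enqueue(task: Any, enqueue: Callable[..., Any], queue_name: str) -> Any:
    """Reset an existing task and enqueue it again under the same task ID.

    `task` is expected to behave like a clearml.Task, and `enqueue` like
    Task.enqueue (hypothetical wiring, passed in to keep the sketch testable).
    """
    # Assumption: without set_started_on_success=False the task may be flipped
    # back to a started state after the reset, so it would not stay in Draft.
    # force=True asks for the reset even if the task already completed.
    task.reset(set_started_on_success=False, force=True)
    # Enqueue the same task object, so no clone (and no new ID) is created.
    return enqueue(task, queue_name=queue_name)


# Intended usage inside run_task when reuse_task is True (requires clearml):
#   source_task = Task.get_task(task_id=task_id)
#   reset_and_enqueue(source_task, Task.enqueue, execution_queue_name)
```

In this version the reuse branch would not call execute_remotely at all, since my understanding is that execute_remotely is aimed at the task that is currently running rather than at an arbitrary existing task.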