Unanswered
Hi, I Encounter A Weird Behavior: I Have A Task A That Schedules A Task B. Task B Is Executed On An Agent, But With An Old Commit
here is the function used to create the task:
` def schedule_task(parent_task: Task,
task_type: str = None,
entry_point: str = None,
force_requirements: List[str] = None,
queue_name="default",
working_dir: str = ".",
extra_params=None,
wait_for_status: bool = False,
raise_on_status: Iterable[Task.TaskStatusEnum] = (Task.TaskStatusEnum.failed, Task.TaskStatusEnum.stopped),
task_name: str = None,
reuse_if_exists: bool = False) -> Task:
"""
Schedule a Task to be executed remotely in a trains-agent.
Args:
parent_task (Task): Parent Task of the task to be scheduled.
task_type (str, optional): Type of the task to be scheduled. Defaults to None.
entry_point (str, optional): Entry point of the task to be scheduled. Defaults to None.
working_dir (str, optional): Working directory of the task to be scheduled. Defaults to None.
force_requirements (List[str], optional): List of requirements overwriting the ones from the parent task. Defaults to None.
queue_name (str, optional): Queue where to send the task to be scheduled. Defaults to "default".
extra_params (dict, optional): Extra parameters sent to the task using connect().
wait_for_status (bool, optional): Wait for task to finish. Defaults to False.
raise_on_status (Tuple[Task.TaskStatusEnum], optional): Task status to consider for raising an error. Defaults to (failed, stopped)
task_name (str, optional): Name of the task. Defaults to parent task name.
reuse_if_exists (bool, optional): Return the task if it already exists and is completed instead of running it.
Returns:
Task: The task successfully enqueued.
"""
if reuse_if_exists and task_name is not None:
task: Task = Task.get_task(project_name=parent_task.get_project_name(), task_name=task_name)
if task is not None and task.status == Task.TaskStatusEnum.completed:
return task
# TODO: Extend and use trains.automation.TrainsJob class
cloned_task = Task.clone(parent_task, name=task_name, parent=parent_task.task_id)
if task_type is not None:
cloned_task._edit(type=task_type)
script = cloned_task.data.script
if entry_point is not None:
script.entry_point = entry_point
cloned_task._update_script(script)
if working_dir is not None:
script.working_dir = working_dir
cloned_task._update_script(script)
if force_requirements is not None:
# Trains correctly guess environment, except from this package itself.
# Therefore requirements are overwritten.
# Explicitly setting torch version here will let trains pick the right wheel
# Based on trains-agent CUDA version.
cloned_task._wait_for_repo_detection()
cloned_task._update_requirements(force_requirements)
if extra_params is not None:
cloned_task.set_parameters(**extra_params)
Task.enqueue(cloned_task, queue_name=queue_name)
parent_task.get_logger().report_text(f"Successfully scheduled {task_type} Task (id={cloned_task.task_id})")
if wait_for_status:
cloned_task.wait_for_status(raise_on_status=raise_on_status)
return cloned_task `
162 Views
0
Answers
4 years ago
one year ago