Unanswered
Hope Everyone'S Having A Nice Holiday Period.
I'Ve Been Debating Between Cron And The Clearml Taskscheduler Cron Is The Solution I'M Currently Using But I Wanted To Understand The Advantages To Using The Taskscheduler. Right Now I'M Using The Classic Cro
Yes, I'm experimenting with this. I actually wrote my own process to do this so I just had to adapt it as a callable to pass to the scheduler. However, I'm running into an issue and I don't think this is a user error this time. When I start the scheduler, it starts running, shows up in the web-app, but then an error message in the web-app pops up Fetch parents failed
and the Scheduler task disappears from the web-app. I can't even see an error log because the task is gone.
I'm running the following code:
@hydra.main(config_path="../configs", config_name="task_scheduler", version_base="1.3")
def main(cfg: DictConfig):
assert cfg.get("schedule"), "No tasks found to schedule"
# if there's a task scheduler already running we want to stop it and remove it (we don't want to run two schedulers)
running_schedulers: List[Task] = Task.get_tasks(
project_name="DevOps",
task_name="Scheduler",
task_filter={"status": ["in_progress"], "order_by": ["-last_update"]},
)
if running_schedulers:
for scheduler in running_schedulers:
scheduler.mark_completed()
# wait for 10 seconds before closing the scheduler
sleep(10)
scheduler.delete()
logger.info(f"Closed and removed running scheduler: {scheduler.id}")
# create a new scheduler
schedule: TaskScheduler = TaskScheduler(sync_frequency_minutes=5)
# create a scheduler function that will allow us to be more specific about the tasks we want to run
# (filter by tags, get the most recent task, etc.)
def scheduler_function(task: DictConfig):
"""Creates a callable that will return the correct task id to run at runtime."""
return lambda: prepare_task(
task_name=task.get("name"),
project_name=task.get("target_project"),
**task.get("schedule_function"),
)
###########################################
# Connect the tasks to the TaskScheduler. #
###########################################
for task in cfg.get("schedule"):
logger.info(
f"Adding task to scheduler: {task.get('name')} in project {task.get('target_project')}"
)
schedule.add_task(
schedule_function=scheduler_function(task),
**{k: v for k, v in task.items() if k != "schedule_function"},
)
schedule.start_remotely()
if __name__ == "__main__":
main()
For reference:
def prepare_task(
task_name: str,
project_name: str,
production: bool = False,
reuse_task: bool = False,
):
"""Run task was created to automate running tasks with ClearML. If we schedule a task
to run at a specific time, we run into an issue if the task is updated in the meantime.
This function allows the user to run this, which can select the most recent version of the task,
even filter by tags, override parameters, and run the task in a specific queue.
Args:
task_name (str): The name of the task to run.
project_name (str): The name of the project the task is in.
execution_queue_name (str): The name of the queue to run the task in.
production (bool, optional): Only use tasks with the "Production" tag. Defaults to False.
reuse_task (bool, optional): Whether or not to reuse the task (don't create a new task, reuse the task id). Defaults to False.
parameter_overrides (DictConfig, optional): A DictConfig or dict of parameters to override. Defaults to None.
Returns:
None"""
logger.add("logs/run_task.log", rotation="1 month", retention="1 year")
# get the most recent task
task_id = get_most_recent_task(
task_name=task_name,
project_name=project_name,
production=production,
)
assert task_id is not None, "No task found to run."
# grab the correct task
source_task: Task = Task.get_task(task_id=task_id)
if reuse_task:
source_task.reset()
# if reuse_task is True, pipe to the same task
target_task: Task = source_task
logger.info(f"Reusing: [{task_name}] task.")
else:
# Clone the task to pipe to. This creates a task with status Draft whose parameters can be modified.
target_task: Task = Task.clone(
source_task=source_task, comment="Scheduled Task."
)
# remove all tags from the task (in case we override something that changes the tags)
target_task.set_tags([])
return target_task.id
def get_most_recent_task(
task_name: str, project_name: str = None, production: bool = False
):
"""Gets the most recent task from ClearML based on the task name and project name."""
if production:
tags = ["Production"]
else:
tags = None
task_list = Task.get_tasks(
project_name=project_name,
task_name=task_name,
allow_archived=False,
tags=tags,
task_filter={"status": ["completed"], "order_by": ["-last_update"]},
)
if len(task_list) > 0:
task_id = task_list[0].id
logger.info(f"Found task [{task_name}] with ID {task_id}")
return task_id
else:
logger.error(
f"Task {task_name} not found in ClearML. The task might not be completed. Check the task status."
)
return None
100 Views
0
Answers
9 months ago
9 months ago