That's what I was getting at. It wasn't clear to me from the documentation that it saves the state.
Hi @<1545216070686609408:profile|EnthusiasticCow4>
My biggest concern is what happens if the TaskScheduler instance is shut down.
good question, follow up, what happens to the cron service machine if it fails?!
TaskScheduler instance is shut down.
And yes you are correct if someone stops the TaskScheduler instance
it is the equivalent of stopping the cron service...
btw: we are working on moving some of the cron/triggers capabilities to the backend; it will not be as flexible but will be a lot easier to maintain.
Also, nothing wrong with using an actual cron service for it, the only real downside is visibility
@<1523701205467926528:profile|AgitatedDove14>
And the Task is still running? What's the clearml python version and webui version?
No, the task stops (it's running remote, I haven't tested it running local).
In the debugger I can see that before starting the scheduler the test task is added:
ScheduleJob(name='Snitch-TaskScheduler', base_task_id='', base_function=<function main.<locals>.scheduler_function.<locals>.<lambda> at 0x7f05e1ab3600>, queue='services', target_project='DevOps', single_instance=False, task_parameters=None, task_overrides=None, clone_task=True, _executed_instances=None, execution_limit_hours=None, recurring=True, starting_time=datetime.datetime(2024, 1, 17, 10, 50, 28, 738971), minute=10, hour=None, day=None, weekdays=None, month=None, year=None, _next_run=None, _execution_timeout=None, _last_executed=None, _schedule_counter=0)
(apologies I just got to it now)
First of all, kudos on the video, this is so nice!!!
And thanks to you I think I found it:
we have to call serialize before the execute_remotely
(the reason why sometimes it works is that it syncs in the background, so sometimes it's just fast enough and you get the config object)
Let me check if we can push an RC with a fix (I know there is already one queued, I'll check if we can have that included as well)
Alright, I fixed the issue with the scheduler eating itself. But now I'm still getting the same bug as two days ago. So the Scheduler process starts fine and doesn't "crash." But I don't get the config object in the web-app again. It seems to work if I run it locally.
To answer your earlier question, I'm using the app.clear.ml portal so
- WebApp: 3.20.1-1525
- Server: 3.20.1-1299
- API: 2.28
- And my Python ClearML version: 1.14
Well, if I stop the cron service and start it back up, I don't have to re-register each schedule. If, for instance, I start the TaskScheduler, register a task, and stop the scheduler, how do I restart the TaskScheduler in a way that re-registers the tasks? Because, in theory, they could be registered by several users and I might be unaware of tasks that were previously scheduled. What are the best practices to preserve state?
Hi @<1523701205467926528:profile|AgitatedDove14> . I think I'm misunderstanding something here. I have the scheduler service running. Now that it's running how does one add a new task or remove an existing task from the scheduler? I get that I can add them before starting the scheduler service but once the service is running is there any way to connect to it and change the schedule?
I thought the advantage of this service would be we could schedule tasks just by connecting to the existing task scheduler but I'm not clear how I would do that from the documentation.
Maybe the sleep between scheduler.mark_completed() and scheduler.delete() is too short? But I don't get why deleting the old scheduler task would break the new scheduler. I'm going to try testing by running the scheduler locally.
but then an error message in the web-app pops up ("Fetch parents failed") and the Scheduler task disappears
And the Task is still running? What's the clearml python version and webui version?
Great, please feel free to share your thoughts here 🙂
Yes, I'm experimenting with this. I actually wrote my own process to do this so I just had to adapt it as a callable to pass to the scheduler. However, I'm running into an issue and I don't think this is a user error this time. When I start the scheduler, it starts running, shows up in the web-app, but then an error message in the web-app pops up ("Fetch parents failed") and the Scheduler task disappears from the web-app. I can't even see an error log because the task is gone.
I'm running the following code:
from time import sleep
from typing import List

import hydra
from clearml import Task
from clearml.automation import TaskScheduler
from loguru import logger
from omegaconf import DictConfig

# prepare_task is defined in the "For reference" snippet


@hydra.main(config_path="../configs", config_name="task_scheduler", version_base="1.3")
def main(cfg: DictConfig):
    assert cfg.get("schedule"), "No tasks found to schedule"

    # if there's a task scheduler already running we want to stop it and remove it (we don't want to run two schedulers)
    running_schedulers: List[Task] = Task.get_tasks(
        project_name="DevOps",
        task_name="Scheduler",
        task_filter={"status": ["in_progress"], "order_by": ["-last_update"]},
    )
    if running_schedulers:
        for scheduler in running_schedulers:
            scheduler.mark_completed()
            # wait for 10 seconds before closing the scheduler
            sleep(10)
            scheduler.delete()
            logger.info(f"Closed and removed running scheduler: {scheduler.id}")

    # create a new scheduler
    schedule: TaskScheduler = TaskScheduler(sync_frequency_minutes=5)

    # create a scheduler function that will allow us to be more specific about the tasks we want to run
    # (filter by tags, get the most recent task, etc.)
    def scheduler_function(task: DictConfig):
        """Creates a callable that will return the correct task id to run at runtime."""
        return lambda: prepare_task(
            task_name=task.get("name"),
            project_name=task.get("target_project"),
            **task.get("schedule_function"),
        )

    ###########################################
    # Connect the tasks to the TaskScheduler. #
    ###########################################
    for task in cfg.get("schedule"):
        logger.info(
            f"Adding task to scheduler: {task.get('name')} in project {task.get('target_project')}"
        )
        schedule.add_task(
            schedule_function=scheduler_function(task),
            **{k: v for k, v in task.items() if k != "schedule_function"},
        )

    schedule.start_remotely()


if __name__ == "__main__":
    main()
For reference:
def prepare_task(
    task_name: str,
    project_name: str,
    production: bool = False,
    reuse_task: bool = False,
):
    """Run task was created to automate running tasks with ClearML. If we schedule a task
    to run at a specific time, we run into an issue if the task is updated in the meantime.
    This function allows the user to run this, which can select the most recent version of the task,
    even filter by tags, override parameters, and run the task in a specific queue.

    Args:
        task_name (str): The name of the task to run.
        project_name (str): The name of the project the task is in.
        production (bool, optional): Only use tasks with the "Production" tag. Defaults to False.
        reuse_task (bool, optional): Whether or not to reuse the task (don't create a new task, reuse the task id). Defaults to False.

    Returns:
        str: The ID of the task to run (the reused task or its clone).
    """
    logger.add("logs/run_task.log", rotation="1 month", retention="1 year")

    # get the most recent task
    task_id = get_most_recent_task(
        task_name=task_name,
        project_name=project_name,
        production=production,
    )
    assert task_id is not None, "No task found to run."

    # grab the correct task
    source_task: Task = Task.get_task(task_id=task_id)

    if reuse_task:
        source_task.reset()
        # if reuse_task is True, pipe to the same task
        target_task: Task = source_task
        logger.info(f"Reusing: [{task_name}] task.")
    else:
        # Clone the task to pipe to. This creates a task with status Draft whose parameters can be modified.
        target_task: Task = Task.clone(
            source_task=source_task, comment="Scheduled Task."
        )
        # remove all tags from the task (in case we override something that changes the tags)
        target_task.set_tags([])

    return target_task.id
def get_most_recent_task(
    task_name: str, project_name: str = None, production: bool = False
):
    """Gets the most recent task from ClearML based on the task name and project name."""
    if production:
        tags = ["Production"]
    else:
        tags = None

    task_list = Task.get_tasks(
        project_name=project_name,
        task_name=task_name,
        allow_archived=False,
        tags=tags,
        task_filter={"status": ["completed"], "order_by": ["-last_update"]},
    )

    if len(task_list) > 0:
        task_id = task_list[0].id
        logger.info(f"Found task [{task_name}] with ID {task_id}")
        return task_id
    else:
        logger.error(
            f"Task {task_name} not found in ClearML. The task might not be completed. Check the task status."
        )
        return None
I start the TaskScheduler, register a task, and stop the scheduler, how do I restart the TaskScheduler in a way that re-registers the tasks?
if it's aborted, just re-enqueue it?
(it serializes itself and stores its state on the Task object, so when re-launched it will deserialize from the last state)
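The "just re-enqueue it" advice above could be sketched roughly like this. It is only a sketch under stated assumptions: `relaunch_scheduler` is a hypothetical helper (not part of ClearML), the `enqueue` callable is injected so the logic runs without a server, and of the status strings only `in_progress` appears in this thread; `queued` is an assumption.

```python
from types import SimpleNamespace

# Statuses in which the scheduler task should be left alone.
# "in_progress" appears in this thread; "queued" is an assumption.
ACTIVE_STATUSES = {"in_progress", "queued"}


def relaunch_scheduler(task, enqueue, queue_name="services"):
    """Re-enqueue a stopped scheduler task so it can resume from its stored state.

    `task` only needs a `.status` attribute; `enqueue` is any callable that
    accepts (task, queue_name=...). Returns True if the task was enqueued.
    """
    if task.status in ACTIVE_STATUSES:
        return False  # already running or waiting in a queue
    # No reset or delete here: the point is to keep whatever state the
    # scheduler stored on its own task.
    enqueue(task, queue_name=queue_name)
    return True


# Stand-in objects so the sketch runs without a ClearML server.
calls = []


def fake_enqueue(task, queue_name):
    calls.append((task.id, queue_name))


stopped = SimpleNamespace(id="abc", status="stopped")
running = SimpleNamespace(id="def", status="in_progress")

print(relaunch_scheduler(stopped, fake_enqueue))
print(relaunch_scheduler(running, fake_enqueue))
print(calls)
```

With the real SDK, the fake pieces would presumably be replaced by a task fetched with `Task.get_task(task_id=...)` and by `Task.enqueue`, which accepts a task and a `queue_name`.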
Strange, the code seems to work perfectly when I run it locally. To make it more confusing, the queue that I enqueue it to when I run it remotely is using agents from the same server that I'm running it locally from.
Thanks for checking @<1545216070686609408:profile|EnthusiasticCow4> stable release will be out soon
Hmm I guess we should better state that, I'll pass it on 🙂
It seems that the error is related to this part of the code block. However, when I comment this out I get the error I had 2 days ago with the missing configuration object.
Stability is for wimps. I live on the edge of bringing down production at any moment, like a real developer. But thanks for the update! 🙃
✨ It works ✨
Thanks @<1523701205467926528:profile|AgitatedDove14> 😁
Oh, I get what's happening. That segment of the code is rerun when the task is enqueued remotely. So it's deleting itself. This also explains why it works fine locally. It's an ouroboros, the task is deleting itself.
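One way to avoid the self-deletion described above is to run the cleanup only when the script is launched locally, and to never include the task that is currently executing. A minimal sketch, assuming a hypothetical helper `stale_schedulers` that works on any objects with an `.id` attribute:

```python
from types import SimpleNamespace


def stale_schedulers(running_schedulers, current_task_id, running_locally):
    """Return the scheduler tasks that are safe to stop and delete."""
    if not running_locally:
        # When an agent re-runs the script, the task executing it is itself
        # one of the "running" schedulers, so skip the cleanup entirely.
        return []
    # Even locally, never hand back the task that is running this code.
    return [t for t in running_schedulers if t.id != current_task_id]


# Stand-in tasks so the sketch runs without a ClearML server.
tasks = [SimpleNamespace(id="old-1"), SimpleNamespace(id="me")]

print([t.id for t in stale_schedulers(tasks, "me", running_locally=True)])
print(stale_schedulers(tasks, "me", running_locally=False))
```

In a ClearML script, `running_locally` could probably come from `Task.running_locally()` and the current id from `Task.current_task()`, when one exists; treat that mapping as an assumption rather than a tested recipe.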
@<1523701205467926528:profile|AgitatedDove14> Then it isn't working as intended. To test it I started the scheduler and set a simple dead man snitch process to run once a day. In the web-app (on your site, app.clear.ml), when looking at the scheduler process in the DevOps section, I was able to see a configuration file under artifacts, but it was not at all obvious how you'd change it because it wasn't part of the configuration section; it was just an artifact. So I thought maybe it was because the process was still running, so I aborted and reset the scheduler, as you suggested, but that just cleared the artifact and there was still nothing in the configuration object (see attached).
So I decided to rerun the code that created the scheduler in the first place.
import pyrootutils

root = pyrootutils.setup_root(
    search_from=__file__,
    indicator=[".git", "pyproject.toml"],
    pythonpath=True,
    dotenv=True,
)

from clearml.automation import TaskScheduler
from clearml import Task
from loguru import logger
from src.utils import get_config


@logger.catch
def main():
    logger.add("logs/task_scheduler.log", rotation="1 month", retention="1 year")

    schedule: TaskScheduler = TaskScheduler(sync_frequency_minutes=120)

    # load the project config
    cfg = get_config()
    task_name = cfg["snitch_task_name"]
    queue = cfg["queue"]

    ############################################
    # Connect the snitch to the TaskScheduler. #
    ############################################
    task_list = Task.get_tasks(
        project_name="DevOps",
        task_name=task_name,
        allow_archived=False,
        task_filter={"status": ["completed"], "order_by": ["-last_update"]},
    )

    if len(task_list) > 0:
        task_id = task_list[0].id
        logger.info(f"Found task {task_name} with ID {task_id}")
        schedule.add_task(
            name="Snitch-TaskScheduler",
            schedule_task_id=task_id,
            reuse_task=True,
            queue=queue,
            minute=0,
            hour=7,
            day=1,
        )
    else:
        logger.error(
            f"Task {task_name} not found in ClearML. Make sure to run the snitch.py script first."
        )
        return

    schedule.start_remotely(queue=queue)


if __name__ == "__main__":
    main()
The scheduler process starts, enqueues, and seems to work, but I don't see any config object or artifact this time (see attached). The console shows:
agent.cudnn_version = 0
sdk.storage.cache.default_base_dir = ~/.clearml/cache
sdk.storage.cache.size.min_free_bytes = 10GB
sdk.storage.direct_access.0.url = file://*
sdk.metrics.file_history_size = 100
sdk.metrics.matplotlib_untitled_history_size = 100
sdk.metrics.images.format = JPEG
sdk.metrics.images.quality = 87
sdk.metrics.images.subsampling = 0
sdk.metrics.tensorboard_single_series_per_graph = false
sdk.network.metrics.file_upload_threads = 4
sdk.network.metrics.file_upload_starvation_warning_sec = 120
sdk.network.iteration.max_retries_on_server_error = 5
sdk.network.iteration.retry_backoff_factor_sec = 10
sdk.network.file_upload_retries = 3
sdk.aws.s3.key =
sdk.aws.s3.region = eu-west-1
sdk.aws.boto3.pool_connections = 512
sdk.aws.boto3.max_multipart_concurrency = 16
sdk.log.null_log_propagate = false
sdk.log.task_log_buffer_capacity = 66
sdk.log.disable_urllib3_info = true
sdk.development.task_reuse_time_window_in_hours = 72.0
sdk.development.vcs_repo_detect_async = true
sdk.development.store_uncommitted_code_diff = true
sdk.development.support_stopping = true
sdk.development.default_output_uri =
sdk.development.force_analyze_entire_repo = false
sdk.development.suppress_update_message = false
sdk.development.detect_with_pip_freeze = false
sdk.development.worker.report_period_sec = 2
sdk.development.worker.ping_period_sec = 30
sdk.development.worker.log_stdout = true
sdk.development.worker.report_global_mem_used = false
sdk.development.worker.report_event_flush_threshold = 100
sdk.development.worker.console_cr_flush_period = 10
sdk.apply_environment = false
sdk.apply_files = false
api.version = 1.5
api.verify_certificate = true
api.default_version = 1.5
api.http.max_req_size = 15728640
api.http.retries.total = 240
api.http.retries.connect = 240
api.http.retries.read = 240
api.http.retries.redirect = 240
api.http.retries.status = 240
api.http.retries.backoff_factor = 1.0
api.http.retries.backoff_max = 120.0
api.http.wait_on_maintenance_forever = true
api.http.pool_maxsize = 512
api.http.pool_connections = 512
api.api_server =
api.web_server =
api.files_server =
api.credentials.access_key = S8T2YH1QWZCYNT1KNWP7
api.host =
environment.SOPS_AGE_KEY_FILE = ****
Executing task id [4586de32d5244b76bfd31cc810b2fb48]:
repository = git@github.com:TicketSwap/task-scheduler.git
branch = main
version_num = 7e7f40fe05e453b51207dfe0d735978fa9936634
tag =
docker_cmd =
entry_point = src/main.py
working_dir = .
::: Using Cached environment /home/natephysics/.clearml/venvs-cache/43c3da0e830954e1387980300f6708df.87471c9bd5c92dc7daad6e47efc48aee :::
Using cached repository in "/home/natephysics/.clearml/vcs-cache/task-scheduler.git.90fcaf8f8f73b3fd1d12fd948a2f9c52/task-scheduler.git"
Note: switching to '7e7f40fe05e453b51207dfe0d735978fa9936634'.
You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.
If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:
git switch -c <new-branch-name>
Or undo this operation with:
git switch -
Turn off this advice by setting config variable advice.detachedHead to false
HEAD is now at 7e7f40f feat: :tada: Initial Commit
type: git
url: git@github.com:TicketSwap/task-scheduler.git
branch: HEAD
commit: 7e7f40fe05e453b51207dfe0d735978fa9936634
root: /home/natephysics/.clearml/venvs-builds.8/3.11/task_repository/task-scheduler.git
Applying uncommitted changes
Executing: ('git', 'apply', '--unidiff-zero'): b'<stdin>:7: trailing whitespace.\nclearml==1.14 # MLops platform \nwarning: 1 line adds whitespace errors.\n'
Adding venv into cache: /home/natephysics/.clearml/venvs-builds.8/3.11
2024-01-15 10:51:43
Running task id [4586de32d5244b76bfd31cc810b2fb48]:
[.]$ /home/natephysics/.clearml/venvs-builds.8/3.11/bin/python -u src/main.py
Summary - installed python packages:
pip:
- attrs==23.2.0
- certifi==2023.11.17
- charset-normalizer==3.3.2
- clearml==1.14.0
- Cython==3.0.8
- furl==2.1.3
- idna==3.6
- jsonschema==4.20.0
- jsonschema-specifications==2023.12.1
- loguru==0.7.2
- numpy==1.26.3
- orderedmultidict==1.0.1
- pathlib2==2.3.7.post1
- pillow==10.2.0
- psutil==5.9.7
- PyJWT==2.8.0
- pyparsing==3.1.1
- pyrootutils==1.0.4
- python-dateutil==2.8.2
- python-dotenv==1.0.0
- PyYAML==6.0.1
- referencing==0.32.1
- requests==2.31.0
- rpds-py==0.17.1
- six==1.16.0
- urllib3==2.1.0
Environment setup completed successfully
Starting Task Execution:
ClearML results page:
2024-01-15 09:51:42.520 | INFO | __main__:main:40 - Found task Snitch-TaskScheduler with ID 383d86104e8e44a99bdf9aeabe8296a2
Syncing scheduler
Failed deserializing configuration: the JSON object must be str, bytes or bytearray, not NoneType
Syncing scheduler
2024-01-15 10:51:48
Failed deserializing configuration: the JSON object must be str, bytes or bytearray, not NoneType
Waiting for next run [UTC 2024-01-16 07:00:00], sleeping for 120.00 minutes, until next sync.
I'm not sure why it's even trying to deserialize something, because I'm just starting a new scheduler. It worked when I ran it locally before, so I assume it has something to do with start_remotely(), but I also get it when I start it locally.
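For context on the log line: its wording matches the `TypeError` that Python's `json.loads` raises when it is handed `None`, which suggests (but does not prove) that the scheduler found no config object to read at sync time. The sketch below uses only the standard library; `load_scheduler_state` is a hypothetical helper for illustration, not ClearML's code.

```python
import json


def load_scheduler_state(raw):
    """Parse serialized state, treating a missing config object as empty."""
    if raw is None:
        return {}
    return json.loads(raw)


# Reproduce the error text seen in the console output.
try:
    json.loads(None)
except TypeError as err:
    message = str(err)

print(message)
print(load_scheduler_state(None))
print(load_scheduler_state('{"jobs": []}'))
```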
Thanks so much @<1523701205467926528:profile|AgitatedDove14> !
I made a video of the Scheduler config error. You can see that the same code run locally works and doesn't on remote. (I just uploaded the video so the quality might suffer until YT finishes processing the higher resolution versions).
Hi @<1545216070686609408:profile|EnthusiasticCow4>
Now that it's running how does one add a new task or remove an existing task from the scheduler?
Did you notice the scheduler stores its own configuration as a config object on the Task?
Notice that you can abort/reset the scheduler, change its configuration in the UI and relaunch it (i.e. enqueue it). It will use the configuration from the UI (backend) and not the original code that created it. Does that make sense?
Thanks again for the info. I might experiment with it to see first hand what the advantages are.
Alright, I deleted everything in the ClearML web-app, waited a day, and tried again. It seems to be showing a configuration object in the configuration section of the scheduler task again. I honestly don't know what changed. Maybe some strange caching on the server side that got cleaned up.
@<1523701205467926528:profile|AgitatedDove14> Question: Does the schedule_function option in the TaskScheduler.add_task() method run at the time the task is scheduled to execute? So if I pass a function that pulls the most recent version of a Task, it'll grab the most recent version every time it's scheduled?
So if I pass a function that pulls the most recent version of a Task, it'll grab the most recent version every time it's scheduled?
Basically your function will be called, that's it.
What I'm assuming is that you would want that function to find the latest Task (i.e. query & filter based on project/name/tag etc.), clone the selected Task and enqueue it, is that correct?
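The "find the latest, clone it, enqueue it" flow described above could look roughly like the sketch below. Because the lookup happens inside the returned function, it runs each time the scheduler calls it, not when the schedule is registered. The names `make_schedule_function`, `find_latest_id` and `clone_and_enqueue` are hypothetical, and the two callables are stand-ins so the example runs without a ClearML server.

```python
def make_schedule_function(find_latest_id, clone_and_enqueue):
    """Build a zero-argument callable suitable for passing as a schedule function."""

    def run():
        # Looked up at call time, so every scheduled run sees the newest task.
        task_id = find_latest_id()
        if task_id is None:
            return None  # nothing to launch this time
        return clone_and_enqueue(task_id)

    return run


# Stand-ins: a growing list of "completed" task ids and a launch recorder.
completed = ["v1"]
launched = []

job = make_schedule_function(
    find_latest_id=lambda: completed[-1] if completed else None,
    clone_and_enqueue=lambda task_id: launched.append(task_id) or task_id,
)

job()                    # first scheduled run
completed.append("v2")   # a newer version finishes in the meantime
job()                    # second scheduled run picks up the newer one

print(launched)  # ['v1', 'v2']
```

With ClearML, `find_latest_id` could wrap a `Task.get_tasks` query like the one in `get_most_recent_task`, and `clone_and_enqueue` could call `Task.clone` followed by `Task.enqueue`; that mapping is an assumption, not something confirmed in this thread.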
Hi @<1545216070686609408:profile|EnthusiasticCow4> let me know if this one solves the issue
pip install clearml==1.14.2rc0