yup correct. but the scheduler not created idk why. here my code and the log
from doctest import Example
from clearml.automation import TriggerScheduler, TaskScheduler
from clearml import Task
import json
def open_json(fp):
with open(fp, 'r') as f:
my_dictionary = json.load(f)
return my_dictionary
def trigger_task_func(task_id):
print("trigger running...")
try:
previous_task = Task.get_task(task_id=task_id)
print(previous_task.artifacts)
try:
fp = previous_task.artifacts['latest_condition'].get_local_copy()
params = open_json(fp)
last_index = params.get('last_index')
day_n = params.get('iteration')
print("Success Fetching", params)
except Exception as e:
print("Failed Fetching", e)
last_index = 100
day_n = 10
print("Create Scheduler New")
scheduler = TaskScheduler()
scheduler.add_task(
target_project='Playground/Boy',
name=f"Map Scanning Iteration-{day_n}",
queue='cpu-nomad-preprod-py311',
schedule_task_id=task_id,
minute=10, # runt after 10 minutes
recurring=False,
execute_immediately=False,
task_parameters={
"last_index": last_index,
"iteration": day_n
}
)
print("Starting Scheduler...")
scheduler.start_remotely("services-py311")
print("okay running", scheduler.get_scheduled_tasks())
except Exception as e:
print(f"Error occurred: {str(e)}")
if __name__ == '__main__':
# create the TriggerScheduler object (checking system state every minute)
trigger = TriggerScheduler(
pooling_frequency_minutes=1.0,
)
# Add trigger on Task performance
trigger.add_task_trigger(
name='Trigger Scanning Checkpoint',
schedule_function=trigger_task_func,
trigger_project='Playground/xxx',
trigger_on_tags=['scanning-completed'],
trigger_on_status=['completed'],
schedule_queue="services-py311"
)
# start the trigger daemon (locally/remotely)
# trigger.start()
trigger.start_remotely(queue="services-py311")
Well, one solution could be to say that models can only be exported from main/master and then have devops start a new trigger on PR completion. That would require some logic for stopping the existing TriggerScheduler, but that shouldn't be too difficult.
However, the most flexible solution would be to have some way of triggering the execution of a script in the parent task environment, something along the lines of clearml-agent build ...
. I just can't wrap my head around triggering that kind of logic from a TriggerScheduler.
Also, how do pipelines compare here? Could I make an export pipeline that is triggered by a model publish/tag and then have that depend on the model git version?
Hi @<1523701205467926528:profile|AgitatedDove14> ,
Yes i want to do that, but so far i know Task.enqueue will execute immediately, i need execute task to spesific time, and i see to do that i need scheduler and set recurring False, set time.
I tried that create scheduler, but the scheduler not created when the function executed.
Hi @<1523701601770934272:profile|GiganticMole91>
Do you mean something like a git ops triggered by PR / tag etc ?
hi i have similar case, but can we scheduled new task here?
def trigger_task_func(task_id):
print("trigger running...")
try:
previous_task = Task.get_task(task_id=task_id)
print(previous_task.artifacts)
try:
fp = previous_task.artifacts['latest_condition'].get_local_copy()
params = open_json(fp)
last_index = params.get('last_index')
day_n = params.get('iteration')
print("Success Fetching", params)
except Exception as e:
print("Failed Fetching", e)
last_index = 100
day_n = 10
print("Create Scheduler New")
scheduler = TaskScheduler()
scheduler.add_task(
target_project='Playground/xxx',
name=f"xxx Iteration-{day_n}",
queue='cpu-xxx-preprod-xxx',
schedule_task_id=task_id,
hours=2,
recurring=False,
execute_immediately=False,
task_parameters={
"last_index": last_index,
"iteration": day_n
}
)
print("Starting Scheduler...")
scheduler.start_remotely("services-py311")
print("okay running", scheduler.get_scheduled_tasks())
except Exception as e:
print(f"Error occurred: {str(e)}")
Thanks!
@<1523701601770934272:profile|GiganticMole91> really nice!
but can we scheduled new task here?
@<1523701260895653888:profile|QuaintJellyfish58> do you mean schedule a Task from the scheduled function? if yes, you can do something similar to @<1523701601770934272:profile|GiganticMole91> , you create/clone existing Task, change arguments and push it into an execution queue. wdyt?
Oh I think that I understand what's going on, @<1523701260895653888:profile|QuaintJellyfish58> let me check how to update the cron scheduler while it is running (I really like this idea, so if this is not already supported I'l like us to add this capability 🙂 )
Just wanted to share a workaround for using a TriggerScheduler to execute a script using the latest commit of a given branch, without relying on cloning a Task. Don't know if it has been shown before in here 🙂
from clearml import Model, Task
from clearml.automation import TriggerScheduler
def trigger_model_func(model_id: str):
model = Model(model_id)
print(f"Triggered model export for model '{model.name}' ({model_id})")
# NOTE: To execute from the branch of
# task of the model uncomment the following lines:
# task: Task = Task.get_task(model.task)
# script_info = task.get_script()
# branch = script_info["branch"]
# repo = script_info["repository"]
repo = "git@ssh.dev.azure.com:v3/org/project/repo"
branch = "main"
subtask: Task = Task.create(
project_name="Model export",
task_name=f"Export of {model.name}",
task_type=Task.TaskTypes.service,
repo=repo,
branch=branch,
commit=None, # important to get the latest commit
add_task_init_call=True,
working_directory=".",
script="scripts/export_model.py",
argparse_args=[("model-id", model_id)],
)
Task.enqueue(subtask, "services")
if __name__ == "__main__":
trigger = TriggerScheduler(pooling_frequency_minutes=2)
# Add trigger on model export tag
trigger.add_model_trigger(
name="Model Export Trigger",
schedule_function=trigger_model_func,
trigger_on_tags=["export"],
)
# trigger.start()
trigger.start_remotely(queue="services")
Task.enqueue will execute immediately, i need execute task to spesific time
Oh I see what you mean, trigger -> scheduled (cron alike) -> Task executed.
Is that correct?
Also, how do pipelines compare here?
Pipelines are a type of Task, so like Tasks you can clone and enqueue them, or set them as the target of the trigger.
the most flexible solution would be to have some way of triggering the execution of a script in the parent task environment,
This is the exact idea of the TriggerScheduler None
What am I missing here?
Well, consider the case where you start the trigger scheduler on commit A, then you do some work that defines a new model and commit as commit B, train some model and now you want to export/deploy the model by publishing it and tagging it with some tag that triggers the export, as in your example. The scheduler will then fail, because the model is not implemented at commit A.
Anyways, I think I've solved it, I'll post the workaround when I get around to it 🙂
You can create a task in the trigger_func and enqueue it, and only specify which branch you want to use. Then I'll get a scheduler that is independent from the experiment code and export functionality that follows the code as it develops.
Thanks @<1523701205467926528:profile|AgitatedDove14> , right now i just use trigger to send notification and do it manually. ClearML Superb!