Hi RoughTiger69,
Have you considered maybe cron jobs or using the task scheduler?
Another option is running a dedicated agent just for that - I'm guessing you can make it require very little compute power
RoughTiger69 I think you need the latest version (+1.3.0 with UI support)
If you are using an older version, you need to specify that you are continuing an execution (Change the "Configuration/Args/continue_pipeline" to True)
EDIT: clearml 1.3.x will work with clearml-server 1.2
Any recommended way to make a task/pipeline “pause” until some external condition is met?
RoughTiger69 I would set up a trigger on the Dataset (i.e. on a new version)
https://github.com/allegroai/clearml/blob/df3d3b269acd2df0f31bfe804eb54ddc84d807c0/examples/scheduler/trigger_example.py#L44
wdyt?
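For reference, a rough sketch of what such a dataset trigger could look like, modeled on the linked trigger_example.py. The project name, tag, queue and trigger name are placeholders, and `build_dataset_trigger` / `start_trigger` are helper names made up for this sketch; only `TriggerScheduler` and `add_dataset_trigger` come from clearml.

```python
def build_dataset_trigger(pipeline_task_id, queue="default",
                          project="datasets", tags=("released",)):
    """Collect the keyword arguments for TriggerScheduler.add_dataset_trigger.

    All default values here are placeholders, not taken from a real setup.
    """
    return dict(
        name="launch pipeline on new labeled dataset",
        schedule_task_id=pipeline_task_id,  # Task to launch when the trigger fires
        schedule_queue=queue,               # queue an agent is listening on
        trigger_project=project,            # only watch datasets in this project
        trigger_on_tags=list(tags),         # fire when one of these tags is added
    )


def start_trigger(pipeline_task_id):
    # Imported lazily so the helper above can be used without clearml installed.
    from clearml.automation import TriggerScheduler

    scheduler = TriggerScheduler(pooling_frequency_minutes=3.0)
    scheduler.add_dataset_trigger(**build_dataset_trigger(pipeline_task_id))
    scheduler.start()  # blocks and polls the server
```

Calling `start_trigger("<pipeline task id>")` needs a configured ClearML server; the scheduler process itself is lightweight since it only polls.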
RoughTiger69 what's the clearml version you are using?
btw: you are running it locally, then enqueuing and running it remotely via the agent?
So if you are using the latest clearml (i.e. +1.3), re-enqueuing the pipeline will automatically continue it from where it stopped.
With previous versions (which is your case, I think), you clone the pipeline Task, change the parameter and enqueue it.
(The state itself of the pipeline is stored on the Task, and when you clone it, you are cloning the state as well).
Make sense ?
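A minimal sketch of both options, assuming the parameter is named "Args/continue_pipeline" as quoted earlier in this thread (check the name on your own pipeline Task in the UI). `re_enqueue` and `clone_and_continue` are hypothetical helper names, and the `task_cls` argument only exists so the logic can be tried without a server:

```python
def re_enqueue(pipeline_task_id, queue_name="default", task_cls=None):
    """Newer clearml: put the aborted pipeline Task back in a queue as-is."""
    if task_cls is None:
        from clearml import Task as task_cls  # lazy import, needs clearml installed
    task = task_cls.get_task(task_id=pipeline_task_id)
    task_cls.enqueue(task, queue_name=queue_name)
    return task.id


def clone_and_continue(pipeline_task_id, queue_name="default",
                       param_name="Args/continue_pipeline", task_cls=None):
    """Older clearml: clone the pipeline Task, flip the flag, enqueue the clone."""
    if task_cls is None:
        from clearml import Task as task_cls
    source = task_cls.get_task(task_id=pipeline_task_id)
    # The clone carries the pipeline state stored on the source Task.
    cloned = task_cls.clone(source_task=source, name=source.name + " (continued)")
    cloned.set_parameter(param_name, True)  # assumed parameter name, see above
    task_cls.enqueue(cloned, queue_name=queue_name)
    return cloned.id
```

Both helpers return the ID of the Task that was enqueued, so you can look it up in the UI afterwards.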
You're looking to avoid running an agent this entire time though, correct?
AgitatedDove14
What was important for me was that the user can define the entire workflow and that I can see its status as one ‘pipeline’ in the UI (vs. disparate tasks).
- perform query
- process records into a labeling assignment
- call labeling system API
- wait for an external hook when labels are ready
- clean the labels
- upload them to a dataset
Do you know which specific API I need to call to signal “resume” after “abort”?
Not “reset”, I presume?
not sure I follow.
how can a cronjob solve this for me?
I want to manage the dataset creation task(s) in http://clear.ml .
This flow is triggered say manually whenever I want to create a train/test set for my model.
it just so happens that somewhere in this flow, the code needs to “wait” for days/weeks for the assignment to be ready.
AgitatedDove14 I tried your idea.
See code below.
Once the pipeline exists, I use the ui -> enqueue.
However it does seem to repeat the first task again when I (re) enqueue it.
Any ideas?
```
from time import sleep

from clearml import PipelineDecorator, Task, TaskTypes


@PipelineDecorator.component(execution_queue='default', return_values=['message'], task_type=TaskTypes.data_processing)
def get_dateset_id():
    message = "ccd8a65770e1407394cd3648246e4d25"
    return message


@PipelineDecorator.component(execution_queue='default', return_values=['message2'], task_type=TaskTypes.data_processing)
def after(message):
    message2 = message + "returned!!"
    return message2


@PipelineDecorator.pipeline(name='try-aborting-and-restarting', project='classification-example', version='1.0', default_queue='default')
def logic():
    message = get_dateset_id()
    print(message)
    from clearml import Dataset
    ds = Dataset.get(dataset_id=message, dataset_tags=['released'])
    if not ds or 'released' not in ds.tags:
        print("aborting ourselves")
        Task.current_task().mark_stopped()
        # we will not get here, the agent will make sure we are stopped
        sleep(60)
        # better safe than sorry
        exit(0)
    message2 = after(message)
    print(message2)


if __name__ == '__main__':
    PipelineDecorator.run_locally()
    logic()
```
RoughTiger69 I think this could work, a pseudo example:
```
from time import sleep

from clearml import PipelineDecorator, Task


@PipelineDecorator.component(...)
def the_last_step_before_external_stuff():
    print("doing some stuff")


@PipelineDecorator.pipeline()
def logic():
    the_last_step_before_external_stuff()
    # placeholder: your own check for the external condition
    if not check_if_data_was_ingested_to_the_system():
        print("aborting ourselves")
        Task.current_task().mark_stopped()
        # we will not get here, the agent will make sure we are stopped
        sleep(60)
        # better safe than sorry
        exit(0)
```
wdyt? (The same logic can be implemented with a pipeline from Tasks and a callback function that essentially does the same.)
When this pipeline is executed again, it will not run "the_last_step_before_external_stuff()" a second time, and will go straight to the if statement
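To make the abort-and-resume idea concrete without a ClearML server, here is a toy simulation in plain Python. It is not how ClearML implements it: the JSON file only stands in for the state that ClearML stores on the pipeline Task, and every name in it (`run_pipeline`, `Aborted`, the step names) is made up for illustration.

```python
import json
import os
import tempfile


class Aborted(Exception):
    """Raised by the gate when the external condition is not met yet."""


def run_pipeline(state_path, labels_ready, calls):
    # Load whatever a previous (aborted) run left behind.
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)

    # Step 1 runs only once; later runs reuse the stored result.
    if "send_for_labeling" not in state:
        calls.append("send_for_labeling")
        state["send_for_labeling"] = "assignment-1"
        with open(state_path, "w") as f:
            json.dump(state, f)

    # Gate: stop here until the external labeling is done.
    if not labels_ready():
        raise Aborted("labels not ready, waiting to be re-enqueued")

    # Step 2 runs only after the gate passes.
    calls.append("clean_labels")
    return state["send_for_labeling"] + ":cleaned"


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "state.json")
    calls = []
    try:
        run_pipeline(path, lambda: False, calls)  # first run aborts at the gate
    except Aborted as err:
        print("aborted:", err)
    print(run_pipeline(path, lambda: True, calls))  # second run skips step 1
    print(calls)
```

The first call records the step result and stops at the gate; the second call finds the stored result, skips the first step, and carries on.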
AgitatedDove14 thanks, good idea.
My main issue with this approach is that it breaks the workflow into “a-sync” set of tasks:
- One task sends a list of images for labeling and terminates.
- An external webhook calls http://clear.ml and creates a dataset from the labels returned from the labeling task.
- A trigger wakes up the label post-processing/splitting logic.
It will be hard to understand where things are standing from looking at the UI.
I was wondering if the “waiting” operator can actually be a part of the pipeline.
This way it will look more clear what is the workflow we are executing.
Because the wait for the external labeling is very long, I am not sure that long polling / sleeping inside the task is a good idea.
I was wondering if I could:
- programmatically “abort” the pipeline.
- externally tell the pipeline to “resume” from the point it was aborted (e.g. specific task name).
WDYT?
AgitatedDove14 1.1.5.
Yes - first locally, then it aborts (while running locally presumably).
then I re-enqueue it via the UI and it seems to run on the agent
My main issue with this approach is that it breaks the workflow into “a-sync” set of tasks:
This is kind of the way you depicted it, meaning there is an initial dataset, an "offline process" (i.e. external labeling), then an ingest process.
I was wondering if the “waiting” operator can actually be a part of the pipeline.
This way it will look more clear what is the workflow we are executing.
Hmm, so pipeline is "aborted", then the trigger relaunches the pipeline, and the pipeline continues from where it stopped?
I think this should work out of the box (needs testing, but supported). A pipeline can be aborted and continued, and the trigger will just launch the existing pipeline Task (the one aborted).
Should work 🤞 🙂
AgitatedDove14 I see the continue_pipeline flag.
I want to resume the same instance of the pipeline.
When I want to resume the pipeline, I can only re-enqueue it - I cannot reset parameters (right?)
So it seems that for the pipeline to resume with the “continue pipeline” mode,
I need to pass “continue_pipeline” the first time I submit the pipeline.
Hopefully it will be ignored during the first run and just behave like a new run, and only really kick in when the pipeline is resumed.
Are these logical assumptions?
Thanks for all your help!