The dark theme you have
It's this chrome extension ! I forget it's even on sometimes. It gives you a keyboard shortcut to toggle dark mode on any website. I love it.
Success! Wow, so this means I can use ClearML training/inference pipelines as part of AWS StepFunctions!
My plan is to have a AWS Step Functions state machine (DAG) that treats running a ClearML job as one step (task) in the DAG.
So I'd:
- Create an SQS queue with a lambda worker
- Place JSON events onto the queue, whose contents are parameters that the training/serving pipeline should take, and the ID of the pipeline. Something like
{"pipeline_id": "...", "aws_sfn_task_token": "....", "s3_paths": [...]}
. Thataws_sfn_task_token
can be sent byboto3
to signal to AWS whether the task has succeeded or failed.boto3
has API calls calledSendTaskSuccess
andSendTaskFailure
. - Trigger the lambda to read off a message, and use the ClearML SDK to remotely start the job, passing the the JSON event as one or more parameters.
- (Failure case): If the pipeline task fails, have the trigger react to that and use
boto3
to emit aSendTaskFailure
API call with theaws_sfn_task_token
. The State Machine could react to that and place the inputs onto a Retry queue. - (Success case): If the pipeline task succeeds, have the trigger react to that and use
boto3
to emit aSendTaskSuccess
call, with the appropriate celebratory handling of that 🎉
Note: The event on the queue could include a UUID that acts as a "Trace ID", too. That way, if I can figure out how to use theawslogs
driver for the docker containers run by the ClearML workers, then we can correlate all logs in the greater DAG (assuming all logs contain the Trace ID)
Hi @<1541954607595393024:profile|BattyCrocodile47>
Can you trigger a pre-existing Pipeline via the ClearML REST API?
Yes
'd want to have a Lambda function trigger the Pipeline for a batch without needing to have all the Pipeline code in the lambda function.
Easiest is to use clearml SDK, which basically is clone / enqueue (notice that pipeline is also a kind of a Task). See here: None
next_task = Task.get_task(<pipeline_id_here>)
cloned_task = Task.clone(source_task=next_task)
Task.enqueue(cloned_task.id, queue_name="services")
can ClearML emit an event that I can react to? ...
Easiest to do is if you have logic that triggers it, it would be to wait for the pipeline to finish with wait_for_status
, another option is to check the custom triggers: None
If (2) is not possible: is there a way to add an "try/except" component to a Pipeline? ...
Not sure I fully understand what you mean here...
To do this, I think I need to know:
- Can you trigger a pre-existing Pipeline via the ClearML REST API? I'd want to have a Lambda function trigger the Pipeline for a batch without needing to have all the Pipeline code in the lambda function. Something like
curl -u '<clearml credetials>'
None,...
- [probably a big ask] If the pipeline succeeds/fails, can ClearML emit an event that I can react to? Like maybe with a webhook or something? That way an event-driven system could place items to inference on a "retry queue" or a "success queue" and notify accordingly.
- If (2) is not possible: is there a way to add an "try/except" component to a Pipeline? (Something that gets run if any of the previous steps fail). That would be less ideal, because our pipelines would have to be "aware" of our AWS-based queue-ing system, but at least we could react to failing steps and use Python code in the ClearML pipeline to publish the inputs of the failed pipeline to a retry queue. I worry this method would be more flaky.
I took a stab at writing an automated trigger to handle this. The goal is: anytime a pipeline succeeds or fails, let AWS know so that the input records can be placed onto a retry queue (or not)
I'm trying to get a trigger to work in general, and then I'll add the more complex AWS logic. But I seem to be missing a step somewhere:
I wrote a file called set_triggers.py
from clearml.automation.trigger import TriggerScheduler
TRIGGER_SCHEDULER = TriggerScheduler()
from pprint import pprint
def log_status(task_id: str):
print("REACTING TO EVENT!")
pprint(task_id)
# write this message to a file at /opt/clearml/logs/trigger.log
with open("/opt/clearml/trigger.log", "a") as f:
f.write("REACTING TO EVENT!")
f.write(pprint(task_id))
TRIGGER_SCHEDULER.add_task_trigger(
name="emit_sfn_success_signal",
# trigger_name="emit_sfn_success_signal",
trigger_on_status=["created", "in_progress", "stopped", "closed", "failed", "completed", "queued", "published",
"publishing", "unknown"],
schedule_function=log_status,
schedule_queue="default",
)
And another called basic_task.py
from clearml import Task
TASK = Task.init(project_name="Trigger Project", task_name="Trigger Test")
print("I ran!")
When I run python set_triggers.py; python basic_task.py
, they seem to execute, but I don't see any evidence of the trigger having been executed. Is there any documentation I could read about this process? I was going off of the docstrings in the TriggerScheduler
class.
@<1541954607595393024:profile|BattyCrocodile47> first let me say I ❤ the dark theme you have going on there, we should definitly add that 🙂
When I run
python set_triggers.py; python basic_task.py
, they seem to execute, b
Seems like you forgot to start the trigger, i.e.
None
(this will cause the entire script of the trigger including logic to be launched on the services agent)
BTW: based on the diagram I would actually check the ClearML pipelines from decorators, you can just do try/except on each function/component and have full logic there, instead of external trigger logic, that you currently have.
My plan is to have a AWS Step Functions state machine (DAG) that treats running a ClearML job as one step (task) in the DAG.
...
Yep, that should work
That said, after you have that working, I would actually check pipelines + clearml aws autoscaler, easier setup, and possibly cheaper on the cloud (Lambda vs EC2 instance)
If this works, we might be able to fully replace Metaflow with ClearML!
Can't wait for your blog post on it 😉
If this works, we might be able to fully replace Metaflow with ClearML!
(Refering to the feature where Metaflow creates Step Functions state machines for you, and then you can use those to trigger event-driven batch jobs in the same way described here)
possibly cheaper on the cloud (Lambda vs EC2 instance)
Whoa, are you saying there's an autoscaler that doesn't use EC2 instances? I may be misunderstanding, but that would be very cool.
Maybe I should have said: my plan is to use AWS StepFunctions where a single task in the DAG is an entire ClearML pipeline . The non-ClearML steps would orchestrate putting messages into a queue, doing retry logic, and triggering said pipeline.
I think at some point, there has to be some amount of AWS-native architecture/infrastructure in order to react to objects landing in an S3 bucket in an event-driven way.
single task in the DAG is an entire ClearML
pipeline
.
just making sure detials are not lost, "entire ClearML pipeline ." : the pipeline logic is process A running on machine AA.
Every step of that pipeline can be (1) subprocess, but that means the exact same environement is used for everything, (2) The DEFAULT behavior, each step B is running on a different machine BB.
The non-ClearML steps would orchestrate putting messages into a queue, doing retry logic, and triggering said pipeline.
👍
Whoa, are you saying there's an autoscaler that
doesn't
use EC2 instances?...
Just to be clear the ClearML Autoscaler (aws) will spin instances up/down based on jobs in the queue it is listening to (the type of EC2 instances and configuration is fully configurable)
Man, I owe you lunch sometime @<1523701205467926528:profile|AgitatedDove14> . Thanks for being so detailed in your answers.
Okay! So the pipeline ID is really just a task ID. So cool!
Not sure I fully understand what you mean here...
Sorry, I'll try again. Here's an illustrated example with AWS Step Functions (pretend this is a ClearML pipeline). If the pipeline fails, I'd want to have a chance to do some logic to react to that. Maybe in a step called "on_pipeline_failed" or something.
That way, if the pipeline fails during any of the steps, I can still have the opportunity to execute some code to handle that error, for example, I could put any of the inputs to the Pipeline into a "retry queue".