Thanks, yes I am familiar with all of the above.
We want to validate the entire pipeline. I am not talking about using a ClearML Pipeline as the validator (which is the case in your examples).
Here is some further detail that will hopefully make things more obvious:
- The pipeline is a series of steps which creates a feature store – in fact, you might even call it a feature pipeline!
- Each pipeline step takes responsibility for a different bit of feature engineering.
- We want to validate each of the feature engineering steps individually, and validate the pipeline as a whole.
  - At its simplest, this could just mean checking that all of the steps and the pipeline itself have completed successfully (by checking their “Task status”).
  - If everything is as it should be, we add a tag to this feature pipeline Task to say that it is suitable for use in production and can be cloned safely in the future.
- (When used in production, we would of course change the values of the input args of the pipeline before running, such that the feature pipeline takes in different data)
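The simplest check described above (every step and the pipeline itself finished successfully, then tag accordingly) can be sketched as a plain function over status strings. This is a minimal sketch, not ClearML code: it assumes you have already collected the status of the pipeline Task and of each step Task (for example via `Task.get_status()`), and the names `validate_statuses`, `tag_for`, and the tag strings are hypothetical.

```python
from typing import Dict, List, Tuple

# Status string treated as success (assumption: ClearML reports "completed"
# for a Task that finished normally; adjust if you accept other values).
OK_STATUS = "completed"


def validate_statuses(pipeline_status: str, step_statuses: Dict[str, str]) -> Tuple[bool, List[str]]:
    """Return (passed, problems) for one pipeline run.

    pipeline_status: status of the pipeline Task itself.
    step_statuses: maps each step name to the status of that step's Task.
    """
    problems: List[str] = []
    if pipeline_status != OK_STATUS:
        problems.append(f"pipeline: {pipeline_status}")
    # Sort so the problem list is deterministic regardless of dict order.
    for name, status in sorted(step_statuses.items()):
        if status != OK_STATUS:
            problems.append(f"{name}: {status}")
    return (not problems, problems)


def tag_for(passed: bool) -> str:
    # Hypothetical tag names; use whatever your tooling searches for later.
    return "passed" if passed else "failed"
```

Richer checks (inspecting artifacts) would add more entries to `problems` in the same way.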
The validation itself need not even occur in a ClearML setting at all – that is yet to be decided, and (for the purposes of this discussion) does not really matter! We may decide to set up the validation as a Task or a Pipeline, or we may decide to handle the validation without ClearML logging of any kind, but that is an entirely separate issue.
And validation must happen outside of the pipeline, because the ClearML pipeline as a whole is the product: the feature pipeline itself is what is being validated.
########################################################################
If we were doing this manually, we would:
- run pipeline_script.py, which contains the pipeline code as decorators.
  - This would serialise the pipeline, upload it to the ClearML server, and run it on whatever test input we have defined – we already have everything set up to do this remotely, so this is not an issue.
- manually look on the ClearML Pipelines page and copy the Task ID of the pipeline run.
- paste this Task ID into our validation script and run it.
  - The validation script would get the task (of the pipeline run) from the ClearML Server based on its Task ID, check the “Task status” of each of the pipeline steps and of the pipeline itself, and investigate each of the artifacts that were produced throughout the pipeline and its steps.
- Dependent upon some criteria, we may add a tag which enables us to identify which pipeline(s) on the ClearML Server has (/have) passed the tests – and again, the pipeline that is being tagged is the product, not the validator.
However, of course, looking at the ClearML Pipelines page and deciding which pipeline ID corresponds to the pipeline you have just created with pipeline_script.py is not a CI/CD solution. We therefore need a way of getting that pipeline (Task) ID programmatically.
########################################################################
Hopefully, this is enough context and explanation to show why your earlier pseudo-code is what we really need to do this.
I apologise for having said something after you posted that comment which confused the situation; that was not my intention!
pipeline_a_id = os.system("python3 create_pipeline_a.py")
This would create and run the pipeline remotely and return the Task ID.
We would then wait for the task with that ID to finish running.
We would then pass the Task ID into the validate function, and add a tag if it passed the tests.
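# This code is just illustrative
from clearml import Task

pipeline_task = Task.get_task(task_id=pipeline_a_id)
# wait for the pipeline Task to finish running
pipeline_task.wait_for_status(raise_on_status=())
# validate_pipeline is our own validation function, defined elsewhere
passed = validate_pipeline(pipeline_a_id)
if passed:
    pipeline_task.add_tags("passed")
else:
    pipeline_task.add_tags("failed")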
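Waiting for the run can also be done with a plain polling loop that has an explicit timeout and treats failure as a terminal state, so a failed pipeline ends the wait instead of leaving the CI job hanging. A minimal sketch, assuming `get_status` is any zero-argument callable returning the current status string (with ClearML this might be `lambda: Task.get_task(task_id=pipeline_a_id).get_status()`); `wait_until_terminal` is a hypothetical helper and the set of terminal statuses is an assumption to adjust.

```python
import time
from typing import Callable, Iterable

# Statuses after which a run will not change any more (assumption based on
# ClearML status names; extend or trim for your setup).
TERMINAL = {"completed", "failed", "stopped", "closed"}


def wait_until_terminal(
    get_status: Callable[[], str],
    timeout_sec: float = 3600.0,
    poll_sec: float = 30.0,
    terminal: Iterable[str] = TERMINAL,
) -> str:
    """Poll get_status() until it returns a terminal status, then return it.

    Raises TimeoutError if the run is still going after timeout_sec.
    """
    terminal = set(terminal)
    deadline = time.monotonic() + timeout_sec
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_sec}s")
        time.sleep(poll_sec)
```

The returned status can then be fed into the validation step, so a "failed" run is reported as failed rather than waited on indefinitely.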
Do notice this will only work for pipelines from Tasks, is this a good fit for you?
The issue with this is that we would then presumably have to run/“build” each of the Tasks (pipeline steps) separately to put them on the ClearML server, and then get their Task IDs in order to even write the code for the Pipeline, which increases the complexity of any automated CI/CD flow. Correct me if I’m wrong.
Essentially, I think the key thing here is that we want to be able to build the entire Pipeline, including any updates to existing pipeline steps and the addition of new steps, without having to hard-code any Task IDs, and to be able to get the pipeline’s Task ID back at the end.
Pipelines from functions and from decorators are great because they handle the Task IDs of the individual steps implicitly.
pipe = Pipeline(...)
pipe.add_step(...)
task_id = pipe.build()
print(f"pipeline task ID = {task_id}")
Having a pipe.build() that creates a “draft” version of the pipeline on ClearML and returns the Task ID would be perfect, and it would be even more perfect if we could do this from functions or decorators, as individually “building” the steps of the Pipeline will make this much more complex. If it has to be done, it has to be done, but we would also need some way of doing the exact same thing (programmatically “build” and get the Task ID) with each of the Tasks in the Pipeline.
Thanks, I’ll check out those GitHub Actions examples but as you say, it’s the “template” step that is the key bit for this particular application.
the pipeline from tasks serializes itself to a configuration object that you can edit/create externally
I think if it has to come down to fiddling with lower-level objects, I’ll hold off for now and wait until something a bit more user-friendly comes along. Depends on how easy this is to work with.
This is something that we do need if we are going to keep using ClearML Pipelines, and we need it to be reliable and maintainable, so I don’t know whether it would be wise to cobble together a lower-level solution that has to be updated each time ClearML changes its serialisation code
- At its simplest, this could just mean checking that all of the steps and the pipeline itself have completed successfully (by checking their “Task status”).
If a pipeline step ends with "failed" status, an exception will be raised in the pipeline execution function; if the exception is not caught, the pipeline itself will also fail.
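This behaviour has a consequence for validation: if the pipeline logic catches a failing step's exception, the pipeline Task can still finish as completed. The plain-Python model below (no ClearML involved; `run_pipeline_logic` is a hypothetical stand-in for a pipeline controller, and its control flow is a simplification) shows both cases, which is why an external validator may want to check each step's status and not only the pipeline's.

```python
from typing import Callable, Dict, List, Tuple


def run_pipeline_logic(
    steps: List[Tuple[str, Callable[[], None]]], catch_failures: bool
) -> Tuple[str, Dict[str, str]]:
    """Simplified model of a pipeline controller running steps in order.

    Returns (pipeline_status, step_statuses).
    """
    step_statuses: Dict[str, str] = {}
    for name, fn in steps:
        try:
            fn()
            step_statuses[name] = "completed"
        except Exception:
            step_statuses[name] = "failed"
            if not catch_failures:
                # Uncaught: the failure propagates and the controller fails too.
                return "failed", step_statuses
            # Caught: the controller carries on with the remaining steps.
    return "completed", step_statuses
```

With `catch_failures=True`, the pipeline reports "completed" even though one step failed, so a status check on the pipeline Task alone would miss it.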
run pipeline_script.py, which contains the pipeline code as decorators.
So in theory the following should actually work.
Let's assume you have pipeline_script.py with:
from clearml import PipelineDecorator, Task
...
@PipelineDecorator.pipeline(pipeline_execution_queue=None, name='custom pipeline logic', project='examples', version='0.0.5')
def executing_pipeline(model_ids, mock_parameter='mock'):
    accs = [test_model(i) for i in model_ids]
    # casting the results waits for each model comparison to complete; until here the components were only launched
    accs = [float(a) for a in accs]
    print("best model is", max(accs))
    # return the ID of the pipeline Task so the caller can look it up
    return Task.current_task().id

def run_pipeline(model_ids):
    PipelineDecorator.set_default_execution_queue('default')
    return executing_pipeline(model_ids=model_ids)
Then from the git action you could do:
from clearml import Task
from pipeline_script import run_pipeline

task_id = run_pipeline(model_ids=['aa', 'bb', 'cc'])
pipeline_task = Task.get_task(task_id=task_id)
# do some stuff here
Without getting into too much detail, this seems totally doable.
I suggest you play around with it, and if you feel something is missing, I would love to help solve / add new features to the pipeline. wdyt?
The Pipeline is defined using PipelineDecorators, so currently to “build and run” it would just involve running the script it is defined in (which enqueues it and runs it etc).
This is not ideal, as I need to access the Task ID, and the only methods I can see are for use within the Task/Pipeline (Task.current_task and PipelineDecorator.get_current_pipeline).
The reason I want to check completion etc. outside the Pipeline Task is that I want to run data validation etc. once, when the pipeline is first built and run, but not every time the pipeline is cloned and run thereafter.
The pseudo-code you wrote previously is what would be required, I believe
be able to get the pipeline’s Task ID back at the end
This is the missing piece. We can’t perform validation without this, afaik
Basically, for a bit more context, this is part of an effort to incorporate ClearML Pipelines in a CI/CD framework.
@ReassuredOwl55 did you check these examples?
And I’d rather the testing/validation etc lived outside of the ClearML Pipeline itself,
Makes total sense to me.
The main point here is creating the pipeline "template", i.e. running it the first time from code. Actually, you can do that manually: a pipeline from tasks serializes itself to a configuration object that you can edit/create externally. We can explore that (or maybe even create a dedicated interface).
I "think" I have a clue about what is getting lost in translation here.
Specifically, to me it all comes down to the definition of "pipeline".
From the ClearML perspective:
- Manual Task: code that is executed by the user (or any other mechanism outside of the agent)
- Remote Task: code that is executed by the agent
- A pipeline is a Task
- A pipeline can be a "manual task" but also a "remote task"
- A pipeline generates "remote tasks"
- Task status (and therefore pipeline status, as a pipeline is also a Task) can be, for example: draft, aborted, completed, failed
- A Task can have multiple Tags, which means a pipeline can also have multiple Tags.
Assume you have the following github action:
Inside the compare_models.py file you have the following code:
from clearml import PipelineDecorator, Task, TaskTypes

@PipelineDecorator.component(execution_queue="1xgpu", return_values=['accuracy'], cache=True, task_type=TaskTypes.data_processing)
def test_model(model_id: str):
    print('compare model')
    return 0.42

@PipelineDecorator.pipeline(pipeline_execution_queue=None, name='custom pipeline logic', project='examples', version='0.0.5')
def executing_pipeline(model_ids, mock_parameter='mock'):
    accs = [test_model(i) for i in model_ids]
    # casting the results waits for each model comparison to complete; until here the components were only launched
    accs = [float(a) for a in accs]
    print("best model is", max(accs))
    # tag the pipeline Task itself if the best accuracy clears the threshold
    if max(accs) > 0.5:
        Task.current_task().add_tags("passed")

PipelineDecorator.set_default_execution_queue('default')
executing_pipeline(model_ids=['aa', 'bb', 'cc'])
This means every time the git action is triggered, a new pipeline is created (i.e. if nothing changes, it will be the same pipeline version with a different instance). This new pipeline (including the code itself) is logged in ClearML, which means that later you can also execute it manually. If the accuracy is above a threshold, we mark the pipeline (i.e. tag it) as passed.
Notice the test_model function will be executed on the "1xgpu" agents, not on the git action machine.
With that in mind, what would you change to fit your scenario?
So would this pseudo code solve the issue?
import os

def pipeline_creator():
    # pseudo code: os.system() returns the command's exit status, not a Task ID
    pipeline_a_id = os.system("python3 create_pipeline_a.py")
    print(f"pipeline_a_id={pipeline_a_id}")
something like that?
(obviously the question is how you would get the new pipeline ID back as a return value, but I'm getting ahead of myself)
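One way to answer that question while still going through a shell call is to have the pipeline script print the ID on a recognisable line and parse its stdout, since `os.system()` only returns the exit status. This sketch assumes a hypothetical `PIPELINE_TASK_ID=<id>` line that the pipeline script would have to print itself (for example using `Task.current_task().id` inside the pipeline function); `parse_task_id` and `run_and_get_task_id` are illustrative names, not ClearML API.

```python
import re
import subprocess
import sys
from typing import List, Optional

# Hypothetical convention: the pipeline script prints one line of the form
# "PIPELINE_TASK_ID=<id>" once the pipeline Task exists.
ID_PATTERN = re.compile(r"^PIPELINE_TASK_ID=(\w+)\s*$", re.MULTILINE)


def parse_task_id(stdout: str) -> Optional[str]:
    """Return the last task ID printed in stdout, or None if there is none."""
    matches = ID_PATTERN.findall(stdout)
    return matches[-1] if matches else None


def run_and_get_task_id(cmd: List[str]) -> str:
    # Capture stdout instead of relying on the exit status; check=True raises
    # CalledProcessError if the script itself fails.
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    task_id = parse_task_id(result.stdout)
    if task_id is None:
        raise RuntimeError("script did not print a PIPELINE_TASK_ID line")
    return task_id


if __name__ == "__main__":
    # Stand-in for ["python3", "create_pipeline_a.py"]: a child process that
    # prints a log line and then the ID line.
    demo = [sys.executable, "-c", "print('building...'); print('PIPELINE_TASK_ID=abc123')"]
    print(run_and_get_task_id(demo))
```

Taking the last match means log output printed before the ID line does not interfere.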
Does this require you run the pipeline locally (I see you have set default execution queue) or do any other specific set-up?
Yes, this means the pipeline logic runs manually/locally (logic means launching components, not actually compute).
Please have a go at it. I'm sure the pseudo code is missing some quirks, but it should work, and I'll gladly help set it up.
Hmm I see, if this is the case, would it make sense to run the pipeline logic locally? (notice the pipeline compute, i.e. the components will be running on remote machines with the agents)
Ah okay, that is a very easy solution to the problem. I wasn’t aware that you could build and run pipelines like that, and I especially wasn’t aware that you could return values from a pipeline and have them accessible to a script in the way that you have given.
Does this require you run the pipeline locally (I see you have set default execution queue) or do any other specific set-up?
I will give it a go tomorrow and report back – the only issue I foresee will be if doing this somehow includes all of the validation code that happens in “# do some stuff here” in the pipeline task as part of the pipeline controller’s code, as we wouldn’t want that to run validation every time the pipeline is cloned thereafter.
Thank you
The issue here is that I don’t have the pipeline ID, as it is a new version of the pipeline, i.e. the code has been updated. I want to run the updated pipeline (for the first time), get its ID, and then analyse the run/perform updates to tags (for example).
Essentially, I think the key thing here is that we want to be able to build the entire Pipeline, including any updates to existing pipeline steps and the addition of new steps, without having to hard-code any Task IDs, and to be able to get the pipeline’s Task ID back at the end.
Oh, if this is the case, then basically your CI/CD code will be something like:
from clearml import PipelineDecorator, TaskTypes

@PipelineDecorator.component(return_values=['data_frame'], cache=True, task_type=TaskTypes.data_processing)
def step_one(pickle_data_url: str, extra: int = 43):
    print('step_one')

@PipelineDecorator.pipeline(pipeline_execution_queue=None, name='custom pipeline logic', project='examples', version='0.0.5')
def executing_pipeline(pickle_url, mock_parameter='mock'):
    step_one(pickle_url)

PipelineDecorator.set_default_execution_queue('default')
executing_pipeline(pickle_url='
')
That's it. The components are executed remotely but the pipeline logic itself is executed on the github actions node (i.e. this is just logic, no actual compute / data)
Notice the key is the pipeline_execution_queue=None argument, which basically tells the pipeline logic it should run locally (components are still running remotely).
wdyt?
Sorry, I don’t understand how this helps with validating the pipeline run.
Where would the validation code sit?
And the ClearML Pipeline run needs to be available on the ClearML Server (at least as a draft) so that it can be marked as in-production and cloned in the future
Hi @ReassuredOwl55
I want to kick off the pipeline and then check completion outside of the pipeline task.
Basically the pipeline is a Task (of a certain type).
You do the "standard" thing: you clone the pipeline Task, you enqueue it, and you wait for its status.
from clearml import Task

task = Task.clone(source_task="<pipeline ID here>")
Task.enqueue(task, queue_name="services")
task.wait_for_status(...)
wdyt?
Sorry, I think something’s got lost in translation here, but thanks for the explanation.
Hopefully this is clearer:
- Say we have a new ClearML pipeline as code on a new commit in our repo.
- We want to build and run this new pipeline and have it available on the ClearML Server.
- We want to run a suite of tests that validate/verify/etc the performance of this entire ClearML Pipeline, e.g. by having it run on a set of predefined inputs and checking the various artifacts that were created – these checks will be made from outside the ClearML Pipeline itself, i.e. we require the Pipeline Run to have been stored on ClearML and we require a Task ID to get the info.
- If validation is passed, we want to mark the ClearML Pipeline run, e.g. via a tag, as having passed the tests, such that our automated systems know how to find the correct ClearML Pipeline to clone for future runs.
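For the last point (automated systems finding the correct pipeline to clone), one option is to select the most recent completed run carrying the pass tag. This sketch works on a hypothetical `RunRecord` stand-in; with ClearML the records could be built from a query such as `Task.get_tasks(project_name=..., tags=["passed"])`, and the chosen `task_id` would then go to `Task.clone(source_task=...)`. The field names and the "newest wins" rule are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional


@dataclass
class RunRecord:
    """Minimal stand-in for the fields we need from a pipeline Task."""
    task_id: str
    status: str
    tags: List[str]
    created: datetime


def latest_passed(runs: Iterable[RunRecord], tag: str = "passed") -> Optional[RunRecord]:
    """Return the most recently created completed run carrying `tag`, or None."""
    candidates = [r for r in runs if r.status == "completed" and tag in r.tags]
    return max(candidates, key=lambda r: r.created, default=None)
```

Requiring both the tag and a completed status guards against a run that was tagged but later failed or was aborted.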
When you mention model verification and training, you are talking about those running within the pipeline. This is not the desired behaviour, as the Pipeline itself is the product and therefore the Pipeline itself needs, as a whole, to be validated/verified!
Yep, that’s it. Obviously would be nice to not have to go via the shell but that’s by the by (edit: I don’t know of a way to build or run a new version of a pipeline without going via the shell, so this isn’t a big deal).
Yep, I would be happy to run locally, but we want to automate this, so does running locally help with getting the pipeline run ID programmatically?
This is something that we do need if we are going to keep using ClearML Pipelines, and we need it to be reliable and maintainable, so I don’t know whether it would be wise to cobble together a lower-level solution that has to be updated each time ClearML changes its serialisation code
Sorry if I was not clear, I do not mean for you to do unstable low-level access. I meant that pipelines are designed to be editable externally; they always deserialize themselves.
The only part that is missing is "export".
Do notice this will only work for pipelines from Tasks, is this a good fit for you?
BTW, what I'm thinking is:
pipe = Pipeline(...)
pipe.add_step(...)
task_id = pipe.build()
print(f"pipeline task ID = {task_id}")
wdyt?
Hi @ReassuredOwl55, let me try to add some color here:
Basically we have two parts: (1) pipeline logic, i.e. the code that drives the DAG, and (2) pipeline components, e.g. model verification.
The pipeline logic (1), i.e. the code that creates the DAG and the tasks and enqueues them, will be running in the git actions context, i.e. this is the automation code. The pipeline components themselves (2), e.g. model verification, training, etc., are running using the clearml agents (via enqueuing the tasks) on other remote machines. The results of these components (i.e. querying them manually or automatically) are part of the pipeline logic (1).
Does that make sense? If not, could you add pseudo code of what you would like the git action to do? I will take this pseudo code and translate it 🙂
Basically, for a bit more context, this is part of an effort to incorporate ClearML Pipelines in a CI/CD framework. Changes to the pipeline script create_pipeline_a.py that are pushed to a GitHub master branch would trigger the build and testing of the pipeline.
And I’d rather the testing/validation etc lived outside of the ClearML Pipeline itself, as stated earlier – and that’s what your pseudo code allows, so if it’s possible that would be great. 🙂
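Putting the CI flow together (build and run, wait, validate from outside, tag), the orchestration can be kept independent of ClearML by passing the ClearML-specific parts in as callables. A sketch under that assumption: `ci_validate` and all four callables are hypothetical placeholders for your own glue code. Because none of this runs inside the pipeline, cloning the tagged pipeline later does not repeat the validation.

```python
from typing import Callable


def ci_validate(
    build_and_run: Callable[[], str],
    wait_for_final_status: Callable[[str], str],
    validate: Callable[[str], bool],
    add_tag: Callable[[str, str], None],
) -> bool:
    """Build/run the pipeline, wait for it, validate it externally, then tag it."""
    task_id = build_and_run()  # e.g. run create_pipeline_a.py and capture the ID it prints
    final_status = wait_for_final_status(task_id)
    # Only run the (possibly expensive) artifact checks if the run completed.
    passed = final_status == "completed" and validate(task_id)
    add_tag(task_id, "passed" if passed else "failed")
    return passed
```

In a GitHub Actions step, the calling script could finish with `sys.exit(0 if passed else 1)` so that a failed validation also fails the workflow.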