In my use case, I have a pipeline that runs inference tasks with several models simultaneously. Each inference task is actually a component that acts as a pipeline, since it executes the steps required to generate the predictions (dataset creation, preprocessing and prediction). For this, I'm using the new pipeline functionality (PipelineDecorator).
Sure! That definitely makes sense. Where can I specify callbacks in the PipelineDecorator API?
I like that. We could also have it as a per-step argument (e.g. the decorator could take abort_pipeline_on_fail or continue_pipeline_processing).
I totally agree with the PipelineController/decorator part. Regarding the proposal for the component parameter, I also think it would be a good feature, although it might be misleading: there will be times when the pipeline fails because a step is intrinsically crucial, so it doesn't matter whether 'continue_pipeline_on_failure' is set to True or False. Anyway, I can't think of a better way to deal with that right now.
Can you see a good use case? (I mean, the infrastructure supports it, but sometimes too many arguments are just confusing, no?!)
GiganticTurtle0 I'm not sure it makes sense to halt the entire pipeline if one step fails.
That said, how about using the post_execution callback? Check whether the step failed and, if it did, stop the entire pipeline (and any running steps). What do you think?
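A minimal pure-Python sketch of that idea, assuming a toy sequential runner. ToyPipeline, its post_execute_callback argument, stop() and abort_on_failure are hypothetical stand-ins, not the ClearML API (ClearML's real callback signature may differ). The callback runs after every step and stops the pipeline when the step has failed:

```python
from typing import Callable, Dict, Optional


class ToyPipeline:
    """Hypothetical sequential runner, used only to illustrate a post-execute callback."""

    def __init__(self, post_execute_callback: Optional[Callable[["ToyPipeline", str, str], None]] = None):
        self.post_execute_callback = post_execute_callback
        self.stopped = False
        self.status: Dict[str, str] = {}

    def stop(self) -> None:
        # Mark the pipeline as stopped; remaining steps will be skipped.
        self.stopped = True

    def run(self, steps: Dict[str, Callable[[], None]]) -> Dict[str, str]:
        for name, fn in steps.items():
            if self.stopped:
                self.status[name] = "skipped"
                continue
            try:
                fn()
                self.status[name] = "completed"
            except Exception:
                self.status[name] = "failed"
            # Called after every executed step, whether it succeeded or failed.
            if self.post_execute_callback is not None:
                self.post_execute_callback(self, name, self.status[name])
        return self.status


def abort_on_failure(pipeline: ToyPipeline, step_name: str, status: str) -> None:
    # The check proposed above: if the step failed, stop the entire pipeline.
    if status == "failed":
        pipeline.stop()


def failing_step() -> None:
    raise ValueError("model could not be loaded")


pipeline = ToyPipeline(post_execute_callback=abort_on_failure)
result = pipeline.run({
    "data_preprocessing_1": lambda: None,
    "inference_1": failing_step,
    "output_predictions_1": lambda: None,
})
print(result)
```

Because the stop decision lives in a user-supplied function, the same runner could instead ignore failures of optional steps by passing a different callback.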
Well, I can see the difference here. With the new generation of pipelines, the user has the flexibility to play with the values returned by each step. We can process those values before passing them to the next step, so maybe it makes little sense to include those callbacks in this case.
Or perhaps the complementary scenario: a continue_on_failed_steps parameter, which could be a list containing only the steps that can be ignored in case of failure.
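To compare the two list-based proposals (abort_on_failed_steps as a list of crucial steps, continue_on_failed_steps as a list of ignorable ones), here is a sketch of how a runner might interpret them. Both parameters are only proposals in this thread; the step name upload_metrics is hypothetical, and the behaviour when neither list is given (any failure aborts) is an arbitrary assumption of this sketch:

```python
from typing import Callable, Dict, Iterable, Optional


def should_abort(failed_step: str,
                 abort_on_failed_steps: Optional[Iterable[str]] = None,
                 continue_on_failed_steps: Optional[Iterable[str]] = None) -> bool:
    """Decide whether a failed step halts the pipeline (hypothetical parameter semantics)."""
    if abort_on_failed_steps is not None:
        # List of crucial steps: only these abort the pipeline.
        return failed_step in set(abort_on_failed_steps)
    if continue_on_failed_steps is not None:
        # Complementary form: every step aborts unless it is listed as ignorable.
        return failed_step not in set(continue_on_failed_steps)
    # Assumed default when neither list is given: any failure aborts.
    return True


def run_serially(steps: Dict[str, Callable[[], None]], **policy) -> Dict[str, str]:
    status: Dict[str, str] = {}
    aborted = False
    for name, fn in steps.items():
        if aborted:
            status[name] = "skipped"
            continue
        try:
            fn()
            status[name] = "completed"
        except Exception:
            status[name] = "failed"
            aborted = should_abort(name, **policy)
    return status


def broken() -> None:
    raise RuntimeError("optional metrics upload failed")


steps = {"data_preprocessing_1": lambda: None, "upload_metrics": broken, "inference_1": lambda: None}
ignorable = run_serially(steps, continue_on_failed_steps=["upload_metrics"])
crucial_only = run_serially(steps, abort_on_failed_steps=["data_preprocessing_1", "inference_1"])
nothing_ignorable = run_serially(steps, continue_on_failed_steps=[])
print(ignorable)          # upload_metrics fails, inference_1 still completes
print(nothing_ignorable)  # upload_metrics fails, inference_1 is skipped
```

Both forms express the same policy; which one is more convenient depends on whether most steps are crucial or most are optional.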
Are you talking about consecutive pipeline steps? Or parallel?
Or maybe you could bundle some of the parameters that belong to PipelineDecorator.component into a high-level configuration variable (something like PipelineDecorator.global_config (?))
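A sketch of what such a bundle could look like, assuming a frozen dataclass of defaults that each component copies and selectively overrides. PipelineDecorator.global_config is only a suggestion in this thread; ComponentConfig, GLOBAL_CONFIG, component_config and the field names are hypothetical and illustrative:

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ComponentConfig:
    """Hypothetical bundle of settings that would otherwise be repeated on every component."""
    execution_queue: Optional[str] = None
    docker: Optional[str] = None
    cache: bool = False
    continue_pipeline_on_failure: bool = False


# One pipeline-wide default, in the spirit of the suggested PipelineDecorator.global_config.
GLOBAL_CONFIG = ComponentConfig(execution_queue="default", docker="python:3.9")


def component_config(**overrides) -> ComponentConfig:
    # Each component starts from the global defaults and overrides only what differs.
    # dataclasses.replace raises TypeError on an unknown field name, which catches typos.
    return replace(GLOBAL_CONFIG, **overrides)


preprocessing_cfg = component_config()
inference_cfg = component_config(execution_queue="gpu", continue_pipeline_on_failure=True)
print(preprocessing_cfg)
print(inference_cfg)
```

Keeping the object frozen means a per-component override can never silently mutate the shared defaults.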
The scheme is similar to the following:
main_pipeline (PipelineDecorator.pipeline)
├── inference_orchestrator_1 (PipelineDecorator.component, acting as a pipeline)
│     data_preprocessing_1 (PipelineDecorator.component)
│     → inference_1 (PipelineDecorator.component)
│     → output_predictions_1
└── inference_orchestrator_2 (PipelineDecorator.component, acting as a pipeline)
      data_preprocessing_2 (PipelineDecorator.component)
      → inference_2 (PipelineDecorator.component)
      → output_predictions_2
So if any step corresponding to 'inference_orchestrator_1' fails, then 'inference_orchestrator_2' keeps running. Is there any way to interrupt the main pipeline in case of failure of any of the inference pipelines?
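To make the question concrete, here is a local simulation of the two-branch layout, assuming threads stand in for the two orchestrator components and a shared threading.Event stands in for "stop the main pipeline". This cooperative flag is a swapped-in technique for illustration, not how ClearML aborts remote tasks; the timings are arbitrary:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

stop_event = threading.Event()  # shared "stop the main pipeline" flag


def orchestrator(steps: List[str], fail_at: str, step_seconds: float) -> str:
    """Toy stand-in for an inference_orchestrator component running its steps in order."""
    for step in steps:
        # Cooperative cancellation: check the shared flag before starting each step.
        if stop_event.is_set():
            return "aborted"
        if step == fail_at:
            stop_event.set()  # signal every sibling branch to stop
            return "failed"
        time.sleep(step_seconds)  # simulate the step's work
    return "completed"


def main_pipeline() -> Dict[str, str]:
    branches = {
        # name: (steps, step that fails, simulated seconds per step)
        "inference_orchestrator_1": (["data_preprocessing_1", "inference_1", "output_predictions_1"], "inference_1", 0.01),
        "inference_orchestrator_2": (["data_preprocessing_2", "inference_2", "output_predictions_2"], "", 0.2),
    }
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {
            name: pool.submit(orchestrator, steps, fail_at, secs)
            for name, (steps, fail_at, secs) in branches.items()
        }
        return {name: fut.result() for name, fut in futures.items()}


results = main_pipeline()
print(results)
```

Without the shared flag, inference_orchestrator_2 would run to completion, which is the behaviour described above. Note that a step already in progress is not interrupted here; it only stops before its next step.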
GiganticTurtle0
That definitely makes sense. Where can I specify callbacks in the PipelineDecorator API?
Hmm, there isn't one, actually... (the interface I was thinking about was PipelineController...)
Would it make sense to throw an exception in the pipeline execution code?
BTW: I just verified that if the pipeline step fails, an exception is raised (ValueError).
Okay, so I could propagate the exception raised in any of the pipeline components up to the main pipeline, and that should halt the whole pipeline. However, are you thinking of including these callback features in the new pipelines as well?
GiganticTurtle0 My apologies, I made a mistake, this will not work 😞
In the example above, "step_two" is executed "instantaneously", meaning it just launches the remote task; it does not actually wait for it.
This means the exception will not be raised in the "correct" context (it will actually be raised in a background thread).
So I think we need a callback function; otherwise there is no real way to catch the failed pipeline task.
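The background-thread problem can be reproduced with the standard library alone. In this sketch, a ThreadPoolExecutor stands in for launching a remote pipeline task (an assumption for illustration; it is not ClearML's mechanism): the try/except around the launch catches nothing, and the failure only becomes visible through a done-callback:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List

failures: List[str] = []


def remote_step(name: str) -> None:
    # Stand-in for a pipeline step whose task fails after it has been launched.
    raise ValueError(f"{name} failed")


def on_step_done(fut: Future) -> None:
    # Invoked once the step finishes: the only place the failure is visible to the caller.
    exc = fut.exception()
    if exc is not None:
        failures.append(str(exc))


caught_at_launch = False
with ThreadPoolExecutor(max_workers=1) as pool:
    try:
        fut = pool.submit(remote_step, "step_two")  # returns at once; nothing is raised here
        fut.add_done_callback(on_step_done)
    except ValueError:
        caught_at_launch = True
# Exiting the with-block waits for the worker, so the callback has run by now.

print(caught_at_launch)  # False
print(failures)  # ['step_two failed']
```

Calling fut.result() would re-raise the ValueError in the caller, but only at the point where the caller decides to wait, which is exactly what launching a step "instantaneously" avoids.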
Maybe all we really need is an "abort_on_failed_step" argument? WDYT?
Well, I see the same utility as it has in the first generation of pipelines. After all, isn't the new decorator about keeping the same functionality while saving the user some boilerplate code?
So in the PipelineController we have a per-step callback and generic callbacks (i.e. for all the steps). Is this what you are referring to?
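The difference between the two kinds of callback can be sketched with a hypothetical runner (run_with_callbacks and the (step name, status) callback signature are illustrative, not PipelineController's actual interface): the generic callback fires after every step, while a per-step callback fires only after its own step. Calling the per-step callback before the generic one is an arbitrary choice here:

```python
from typing import Callable, Dict, List, Optional

Callback = Callable[[str, str], None]  # (step name, status) -> None


def run_with_callbacks(steps: Dict[str, Callable[[], None]],
                       generic_callback: Optional[Callback] = None,
                       step_callbacks: Optional[Dict[str, Callback]] = None) -> None:
    """Hypothetical runner: generic callback after every step, per-step callback only after its own step."""
    step_callbacks = step_callbacks or {}
    for name, fn in steps.items():
        try:
            fn()
            status = "completed"
        except Exception:
            status = "failed"
        # Per-step callback first, then the generic one (an arbitrary ordering choice).
        if name in step_callbacks:
            step_callbacks[name](name, status)
        if generic_callback is not None:
            generic_callback(name, status)


events: List[str] = []


def fail() -> None:
    raise RuntimeError("inference crashed")


run_with_callbacks(
    {"preprocessing": lambda: None, "inference": fail},
    generic_callback=lambda name, status: events.append(f"log {name}: {status}"),
    step_callbacks={"inference": lambda name, status: events.append(f"alert {name}: {status}")},
)
print(events)
# ['log preprocessing: completed', 'alert inference: failed', 'log inference: failed']
```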
Well, I can see the difference here. With the new generation of pipelines, the user has the flexibility to play with the returned values of each step.
Yep 🙂
We can process those values before passing them to the next step
Also correct 🙂
so maybe it makes little sense to include those callbacks in this case
That was my thinking... why go with cumbersome callbacks when you can write it as part of the execution logic?
BTW: I hadn't thought about the exception-catching part; I'll make sure we add PipelineDecorator.get_current_pipeline()
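this way, you could do:
```python
from clearml import PipelineDecorator


# step_one / step_two are assumed to be @PipelineDecorator.component functions defined elsewhere
@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5')
def executing_pipeline(pickle_url, mock_parameter='mock'):
    data_frame = step_one(pickle_url)  # assumed earlier step producing data_frame
    print('launch step two')
    try:
        processed_data = step_two(data_frame)
    except Exception:
        # stop all running pipeline steps, then re-raise the original error
        PipelineDecorator.get_current_pipeline().stop()
        raise
```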
Okay, so my thinking is that on the PipelineController / decorator we will have:
abort_all_running_steps_on_failure=False
(if True, when a step fails it will abort all running steps and leave)
Then, per step / component decorator, we will have:
continue_pipeline_on_failure=False
(if True, when the step fails, the rest of the pipeline DAG will continue)
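A sketch of how the two proposed flags could combine when a step fails. The component decorator here only records the per-step flag, and action_on_failure is a hypothetical helper. The outcome when both flags are False (launch no new steps, but let running steps finish) is this sketch's reading of the proposal, not something stated in it:

```python
from enum import Enum
from typing import Callable


class Action(Enum):
    CONTINUE = "continue the rest of the DAG"
    STOP_LAUNCHING = "launch no new steps, let running steps finish"  # assumed meaning of the defaults
    ABORT_ALL = "abort all running steps and leave"


def component(continue_pipeline_on_failure: bool = False):
    """Hypothetical per-step decorator: it only records the flag on the function."""
    def wrap(fn: Callable) -> Callable:
        fn.continue_pipeline_on_failure = continue_pipeline_on_failure
        return fn
    return wrap


def action_on_failure(step: Callable, abort_all_running_steps_on_failure: bool = False) -> Action:
    # The per-step flag wins: an optional step never brings the pipeline down.
    if getattr(step, "continue_pipeline_on_failure", False):
        return Action.CONTINUE
    # Otherwise the pipeline-level flag decides how hard to stop.
    if abort_all_running_steps_on_failure:
        return Action.ABORT_ALL
    return Action.STOP_LAUNCHING


@component(continue_pipeline_on_failure=True)
def upload_metrics() -> None: ...


@component()
def inference_1() -> None: ...


for step in (upload_metrics, inference_1):
    for abort_all in (False, True):
        print(step.__name__, abort_all, action_on_failure(step, abort_all).value)
```

Letting the per-step flag take precedence matches the idea that some steps are optional; as noted earlier in the thread, a truly crucial step will still break its dependents whatever the flag says.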
GiganticTurtle0 wdyt?
Hmm, I'm not 100% sure I follow. You have multiple models doing predictions. Is there a single data source that feeds them, and they run in parallel? Or is one's output another's input, and they run serially?
I think it could be a convenient approach. The new parameter abort_on_failed_steps could be a list containing the names of the steps for which the pipeline will stop its execution if any of them fails (so that we can ignore other steps that are not crucial for continuing the pipeline execution).