I want to run only that sub-dag on all the historical data, in an ad-hoc manner
But wouldn't that be covered by the caching mechanism?
AgitatedDove14
Sort of.
I would go with something more like:
`
execution_plan = {'step_b': 'b_result', 'step_c': None, ...}

@PipelineDecorator.pipeline(...)
def pipeline(execution_plan):
    step_results = {}
    for step in pipeline.get_dag():
        if step.name in execution_plan:
            # a user-provided value wins; otherwise execute the step
            step_results[step.name] = execution_plan[step.name] or step(**step_results)
`
The ‘execution plan’ specifies the list of steps to run (keys) and, for each, whether we should use a user-specified value.
Then we iterate over the steps in the DAG:
If a step is not in the execution plan at all, skip it entirely.
If the step is in the execution plan and has a user-provided value, use that value.
If it doesn’t have a user-provided value, execute the step with all the **kwargs it expects.
If we decide to execute, fall back on the standard ClearML behaviour (including the cache configuration).
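To make the idea concrete, here is a minimal, self-contained sketch of that interpreter loop, with plain functions standing in for pipeline steps and a hand-rolled get_dag() (both hypothetical; a real pipeline would get the DAG from ClearML):
`
# Sketch of the proposed execution-plan interpreter, outside ClearML.
# Step and get_dag() are hypothetical stand-ins for the real DAG.
from dataclasses import dataclass
from typing import Callable

@dataclass
class Step:
    name: str
    func: Callable

def step_b(**results):
    return "b_computed"

def step_c(**results):
    return "c_computed_from_{}".format(results.get("step_b"))

def get_dag():
    # topologically ordered steps; a real pipeline would provide this
    return [Step("step_b", step_b), Step("step_c", step_c)]

def run(execution_plan):
    step_results = {}
    for step in get_dag():
        if step.name not in execution_plan:
            continue  # not in the plan: skip the step entirely
        # a user-provided value wins; otherwise execute with upstream results
        step_results[step.name] = execution_plan[step.name] or step.func(**step_results)
    return step_results

# reuse a stored result for step_b, actually execute step_c
print(run({"step_b": "b_result", "step_c": None}))
`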
I think the part that is missing for me is the context; in other words, how would one configure the execution_plan,
and why would they configure it in a specific way?
My intuition, without fully understanding it, is that for some reason the internal DAG/decision is exposed to the user, and it feels like too much information. Basically, I have a hunch that users should not need such a deep understanding to control the flow, and that they should end up with an abstraction on top of it. Does that make sense?
Decorators are good 🙂
Something along the lines of
`
@PipelineDecorator.pipeline(...)
def pipeline(skip_a=False):
    if not skip_a:
        a = step_a()
    else:
        # somehow get a previous A?
        # let's call it cached A
        a = "replace with real"
    step_b(a)
    ...
`
Is this the gist?
If it is, this sounds like "how can I control whether A is cached or not?" Is that correct?
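If that is the question, ClearML already exposes a per-step cache flag on decorator components. A minimal sketch, assuming made-up step and project names:
`
from clearml import PipelineDecorator

# cache=True: if step_a's code and inputs are unchanged, ClearML reuses
# the previous run's output instead of re-executing the step
@PipelineDecorator.component(return_values=["a"], cache=True)
def step_a():
    return "data produced by A"

@PipelineDecorator.component(return_values=["b"])
def step_b(a):
    return "B({})".format(a)

@PipelineDecorator.pipeline(name="example pipeline", project="examples", version="0.1")
def pipeline():
    a = step_a()
    step_b(a)

if __name__ == "__main__":
    PipelineDecorator.run_locally()  # debug locally; drop for remote execution
    pipeline()
`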
AgitatedDove14 decorators, but I would consider converting it to whatever is needed in order to achieve the above
Hi RoughTiger69
Is the pipeline in question based on decorators or is it based on existing Tasks?
It’s more like this:
I have a pipeline that ran on all the data.
Now I change/add a sub-dag to the pipeline.
I want to run only that sub-dag on all the historical data, in an ad-hoc manner.
And then subsequent runs will run the full DAG (e.g. only on new data).
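One way that flow could be parameterized, combining the skip-style flag from above with a pipeline argument for the historical input (names such as historical_a and new_sub_dag are purely illustrative):
`
from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=["a"], cache=True)
def step_a(new_data):
    return "A({})".format(new_data)

@PipelineDecorator.component(return_values=["b"])
def new_sub_dag(a):
    # the newly added/changed sub-dag
    return "B({})".format(a)

@PipelineDecorator.pipeline(name="example pipeline", project="examples", version="0.1")
def pipeline(historical_a=None, new_data=None):
    # ad-hoc backfill: pass historical_a to run only the new sub-dag on a
    # previous A result; regular runs leave it None and execute the full DAG
    a = historical_a if historical_a is not None else step_a(new_data)
    new_sub_dag(a)
`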