CostlyOstrich36 The commented-out section is the 3-layer part, which I am not currently using because of the lag.
` # def update_datasets(dataset_projects, parents, is_ppg=False):
current_steps = []
for dataset_project in dataset_projects:
dataset_project_split = dataset_project.split('-')
campaign_num = dataset_project_split[2]
parameters = {
'Args/XXX': args.XXX_template.replace('*', campaign_num),
'Args/XXX': args.XXX_template.replace('*', campaign_num),
'Args/dataset_project': dataset_project,
'Args/is_ppg': str(is_ppg),
'Args/XXX': args.XXX,
'Args/XXX': args.XXX,
}
step_name = f'update-{dataset_project}'
print(f'Creating step {step_name} with {parameters}')
current_steps.append(step_name)
pipeline.add_step(
name=step_name,
parents=parents,
execution_queue='services',
base_task_project=args.pipeline_templates_project,
base_task_name='update-dataset',
parameter_override=parameters,
task_overrides={'script.branch': args.git_branch},
continue_on_fail=False,
cache_executed_step=True,
)
print(f'Step {step_name} created')
return current_steps
step1_parents = update_datasets(dataset_projects=args.ds_dataset_projects, parents=[])
print(f'Step 1 parents: {step1_parents}')
step2_parents = update_datasets(dataset_projects=args.ds_ppg_dataset_projects, parents=step1_parents, is_ppg=True)
print(f'Step 2 parents: {step2_parents}')
# Register one 'update-dataset' step per dataset project. The step names are
# collected in step1_parents so later steps can depend on them.
step1_parents = []
for project in args.ds_dataset_projects:
    name_parts = project.split('-')
    campaign_num = name_parts[2]
    # A project name with exactly four '-'-separated parts is flagged as PPG.
    is_ppg = len(name_parts) == 4
    # NOTE(review): the four 'Args/XXX' keys look like redacted placeholders.
    # As written they are the same key, so only the last value survives in the
    # dict -- confirm the real parameter names.
    overrides = {
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/dataset_project': project,
        'Args/is_ppg': str(is_ppg),
        'Args/XXX': args.XXX,
        'Args/XXX': args.XXX,
    }
    update_step = f'update-{project}'
    print(f'Creating step {update_step} with {overrides}')
    step1_parents.append(update_step)
    pipeline.add_step(
        name=update_step,
        parents=None,  # first layer: no upstream steps
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=overrides,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
    print(f'Step {update_step} created')
print(f'Step 1 parents: {step1_parents}')
# Register one 'download-dataset' step per download queue. Each step lists
# every name in step1_parents as a parent and executes on its own queue.
for download_queue in args.download_queues:
    download_step = f'download-{download_queue}'
    # Built fresh per iteration so no dict is shared between add_step calls.
    download_overrides = {
        'Args/dataset_projects': args.ds_dataset_projects + args.open_dataset_projects,
        'Args/dataset_dir': args.dataset_download_dir,
    }
    print(f'Creating step {download_step}')
    pipeline.add_step(
        name=download_step,
        parents=step1_parents,
        execution_queue=download_queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=download_overrides,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
# Launch the pipeline from this process, then block until it completes.
# NOTE(review): presumably `pipeline` is a ClearML PipelineController, in which
# case start_locally() runs only the controller logic here while the steps are
# sent to their execution queues -- confirm against the object's construction.
pipeline.start_locally()
# Wait for the pipeline run to end before reporting completion.
pipeline.wait()
print('Pipeline finished') `