Hi UpsetBlackbird87,
If you're in the pipelines UI, you can switch to the detailed view to see each step of the pipeline as a node 🙂
You can see an example here:
https://clear.ml/docs/latest/docs/pipelines/pipelines
For some reason my code results in one node even though I think the logic above should result in a bipartite graph
Can you add the log printout of the controller?
Here is my log after task execution: **Deleted**
Can you try running the pipeline locally using pipeline.start_locally()?
https://clear.ml/docs/latest/docs/references/sdk/automation_controller_pipelinecontroller#start_locally-1
Also, try adding a "starter" node at the start and making it the parent of all the others.
CostlyOstrich36 This resulted in the bipartite graph I expected, but why? 😕
UpsetBlackbird87 pipeline.start() will launch the pipeline itself on a remote machine (a machine running the services agent).
This is why your pipeline is "stuck": it is not actually running.
When you call start_locally(), the pipeline logic itself runs on your machine and the nodes run on the workers.
Makes sense?
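The difference between the two launch modes can be sketched as follows. This is a minimal illustration, not the thread author's code: `launch` and `FakePipeline` are hypothetical names introduced here, and `FakePipeline` is only a stand-in that records calls so the snippet runs without a ClearML server. With a real `PipelineController`, `start()` enqueues the controller on a queue (by default `services`) and `start_locally()` runs the controller logic in the current process.

```python
class FakePipeline:
    """Stand-in that only records calls; it is not part of ClearML."""

    def __init__(self):
        self.calls = []

    def start(self, queue="services"):
        self.calls.append(("start", queue))

    def start_locally(self, run_pipeline_steps_locally=False):
        self.calls.append(("start_locally", run_pipeline_steps_locally))


def launch(pipeline, controller_on_this_machine):
    """Start a pipeline, choosing where the controller logic runs."""
    if controller_on_this_machine:
        # Controller logic runs in this process; the steps are still
        # sent to their own execution queues and run on the workers.
        pipeline.start_locally(run_pipeline_steps_locally=False)
    else:
        # The controller itself is enqueued. If no agent is serving this
        # queue, the pipeline appears "stuck" because nothing runs it.
        pipeline.start(queue="services")


demo = FakePipeline()
launch(demo, controller_on_this_machine=True)
launch(demo, controller_on_this_machine=False)
print(demo.calls)
```

Passing a real controller object instead of the stand-in should behave the same way, assuming an agent is listening on the `services` queue for the remote case.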
AgitatedDove14 Thx for the clear explanation!
AgitatedDove14 CostlyOstrich36 [FYI] I'm not sure whether this is an optimization issue in ClearML or a problem with my computer, but when I make the pipeline a tripartite graph (adding another parent layer), the DAG renders oddly (I think because of the lag) and the web app lags heavily until I delete the pipeline in the web app. While it lags, I can't interact with any nodes in the graph, and switching between tasks takes a long time.
UpsetBlackbird87, can you give me a code snippet with 3 layers to try and play with?
CostlyOstrich36 The commented section is the 3-layer part, which I am not currently using due to the lag:
` # def update_datasets(dataset_projects, parents, is_ppg=False):
#     current_steps = []
#     for dataset_project in dataset_projects:
#         dataset_project_split = dataset_project.split('-')
#         campaign_num = dataset_project_split[2]
#         parameters = {
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/dataset_project': dataset_project,
#             'Args/is_ppg': str(is_ppg),
#             'Args/XXX': args.XXX,
#             'Args/XXX': args.XXX,
#         }
#         step_name = f'update-{dataset_project}'
#         print(f'Creating step {step_name} with {parameters}')
#         current_steps.append(step_name)
#         pipeline.add_step(
#             name=step_name,
#             parents=parents,
#             execution_queue='services',
#             base_task_project=args.pipeline_templates_project,
#             base_task_name='update-dataset',
#             parameter_override=parameters,
#             task_overrides={'script.branch': args.git_branch},
#             continue_on_fail=False,
#             cache_executed_step=True,
#         )
#         print(f'Step {step_name} created')
#     return current_steps
# step1_parents = update_datasets(dataset_projects=args.ds_dataset_projects, parents=[])
# print(f'Step 1 parents: {step1_parents}')
# step2_parents = update_datasets(dataset_projects=args.ds_ppg_dataset_projects, parents=step1_parents, is_ppg=True)
# print(f'Step 2 parents: {step2_parents}')
step1_parents = []
for dataset_project in args.ds_dataset_projects:
    dataset_project_split = dataset_project.split('-')
    campaign_num = dataset_project_split[2]
    is_ppg = False
    if len(dataset_project_split) == 4:
        is_ppg = True
    parameters = {
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/dataset_project': dataset_project,
        'Args/is_ppg': str(is_ppg),
        'Args/XXX': args.XXX,
        'Args/XXX': args.XXX,
    }
    step_name = f'update-{dataset_project}'
    print(f'Creating step {step_name} with {parameters}')
    step1_parents.append(step_name)
    pipeline.add_step(
        name=step_name,
        parents=None,
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
    print(f'Step {step_name} created')
print(f'Step 1 parents: {step1_parents}')
for queue in args.download_queues:
    parameters = {
        'Args/dataset_projects': args.ds_dataset_projects + args.open_dataset_projects,
        'Args/dataset_dir': args.dataset_download_dir,
    }
    step_name = f'download-{queue}'
    print(f'Creating step {step_name}')
    pipeline.add_step(
        name=step_name,
        parents=step1_parents,
        execution_queue=queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
pipeline.start_locally()
pipeline.wait()
print('Pipeline finished') `
UpsetBlackbird87, thanks! I'll play with it a bit to see if it reproduces on my side as well 🙂
UpsetBlackbird87, I couldn't reproduce the issue on my end. Can you please send me a self-contained code example so I can try to recreate it?
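One possible starting point for a self-contained repro is to describe the layered graph without any ClearML dependency and check its shape first. The sketch below is an illustration only: `build_layers`, `depth_of`, and the step names are hypothetical, and the layer sizes are made up. It wires every step to all steps of the previous layer, which is the same fully connected pattern the snippet above produces by passing the whole previous layer as `parents`.

```python
from typing import Dict, List


def build_layers(layer_sizes: List[int]) -> Dict[str, List[str]]:
    """Return {step_name: parent_names}; each step depends on the whole previous layer."""
    steps: Dict[str, List[str]] = {}
    previous: List[str] = []
    for layer_index, size in enumerate(layer_sizes):
        current = [f"layer{layer_index}-step{i}" for i in range(size)]
        for name in current:
            steps[name] = list(previous)
        previous = current
    return steps


def depth_of(steps: Dict[str, List[str]]) -> Dict[str, int]:
    """Depth is 0 for steps without parents, otherwise 1 + the deepest parent."""
    depths: Dict[str, int] = {}

    def visit(name: str) -> int:
        if name not in depths:
            parents = steps[name]
            depths[name] = 0 if not parents else 1 + max(visit(p) for p in parents)
        return depths[name]

    for name in steps:
        visit(name)
    return depths


# Three layers with 2, 2 and 3 steps (sizes chosen arbitrarily).
steps = build_layers([2, 2, 3])
depths = depth_of(steps)
edge_count = sum(len(parents) for parents in steps.values())
print(depths)
print(edge_count)
```

Each `name`/`parents` pair could then be passed to `pipeline.add_step(...)` with a trivial base task, which would keep the repro independent of the real datasets. Note that the edge count grows with the product of adjacent layer sizes, so a third fully connected layer adds many edges for the UI to draw.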