Answered
Hi, I'm trying out PipelineController from tasks. I was not able to understand why my code results in just one task (the first one) in the pipeline.

Hi

I'm trying out PipelineController from existing tasks.
I was not able to understand why my code results in just one task (the first one) in the pipeline.
` pipeline = PipelineController(
    pool_frequency=0.1,
    name='dataset-pipeline',
    project=args.pipeline_project,
    version='1',
    auto_version_bump=True,
    add_pipeline_tags=True,
    abort_on_failure=True,
    add_run_number=True,
)
pipeline.set_default_execution_queue('services')

step1_parents = []
for dataset_project in args.dataset_projects:
    dataset_project_split = dataset_project.split('-')
    campaign_num = dataset_project_split[2]

    is_ppg = False
    if len(dataset_project_split) == 4:
        is_ppg = True

    parameters = {
        'Args/is_ppg': str(is_ppg),
    }

    step_name = f'update-{dataset_project}'
    timer.print(f'Creating step {step_name} with {parameters}')
    step1_parents.append(step_name)
    pipeline.add_step(
        name=step_name,
        parents=None,
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
    )
    timer.print(f'Step {step_name} created')

timer.print(f'Step 1 parents: {step1_parents}')

for queue in args.download_queues:
    parameters = {
        'Args/dataset_project': args.dataset_projects + args.open_dataset_projects,
    }
    step_name = f'download-{queue}'
    timer.print(f'Creating step {step_name}')
    pipeline.add_step(
        name=step_name,
        parents=step1_parents,
        execution_queue=queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
    )
    timer.print(f'Step {step_name} created')

pipeline.start()
pipeline.wait()
timer.print('Pipeline finished') `

  
  
Posted one year ago

13 Answers


For some reason my code results in one node even though I think the logic above should result in a bipartite graph

  
  
Posted one year ago

AgitatedDove14 CostlyOstrich36 [FYI] I'm not sure if this is an optimization issue in ClearML or an issue on my machine, but when I make the pipeline a tri-partite graph (add another parent layer) the DAG becomes weird (I think because of the lag) and the web app lags so badly that I end up deleting the pipeline in the web app (I can't interact with any nodes in the graph, and switching between tasks takes a long time).

  
  
Posted one year ago

CostlyOstrich36 This resulted in the bipartite graph I expected, but why? 😕

  
  
Posted one year ago

UpsetBlackbird87, I couldn't reproduce the issue on my end. Can you please send me a self-contained code example so I can try to recreate it?

  
  
Posted one year ago

UpsetBlackbird87, can you give me a code snippet with 3 layers to try and play with?

  
  
Posted one year ago

AgitatedDove14 Thx for the clear explanation!

  
  
Posted one year ago

Can you add the log printout of the controller?

  
  
Posted one year ago

Can you try running the pipeline locally using pipeline.start_locally()?
https://clear.ml/docs/latest/docs/references/sdk/automation_controller_pipelinecontroller#start_locally-1
Also, try connecting a "starter" node and then making it the parent of all the others at the start.
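A rough sketch of that "starter" idea, building on the pipeline and args from your snippet above (the 'starter' template task name is just a placeholder, assuming some trivial template task like that exists in your templates project):

` # Hypothetical "starter" step; 'starter' is assumed to exist as a template task.
pipeline.add_step(
    name='starter',
    base_task_project=args.pipeline_templates_project,
    base_task_name='starter',
    execution_queue='services',
)

# First-layer steps then list the starter node as their parent instead of None;
# the rest of your add_step arguments stay the same.
for dataset_project in args.dataset_projects:
    pipeline.add_step(
        name=f'update-{dataset_project}',
        parents=['starter'],
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        execution_queue='services',
    ) `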

  
  
Posted one year ago

Here is my log after task execution
**Deleted**

  
  
Posted one year ago

Hi UpsetBlackbird87 ,
If you're in the pipelines UI, you can switch to the detailed view and see each step of the pipeline as a node 🙂
You can see an example here:
https://clear.ml/docs/latest/docs/pipelines/pipelines

  
  
Posted one year ago

CostlyOstrich36 The commented section is the 3-layer part, which I am not currently using due to the lag:
` # def update_datasets(dataset_projects, parents, is_ppg=False):
#     current_steps = []
#     for dataset_project in dataset_projects:
#         dataset_project_split = dataset_project.split('-')
#         campaign_num = dataset_project_split[2]
#
#         parameters = {
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/dataset_project': dataset_project,
#             'Args/is_ppg': str(is_ppg),
#             'Args/XXX': args.XXX,
#             'Args/XXX': args.XXX,
#         }
#
#         step_name = f'update-{dataset_project}'
#         print(f'Creating step {step_name} with {parameters}')
#         current_steps.append(step_name)
#         pipeline.add_step(
#             name=step_name,
#             parents=parents,
#             execution_queue='services',
#             base_task_project=args.pipeline_templates_project,
#             base_task_name='update-dataset',
#             parameter_override=parameters,
#             task_overrides={'script.branch': args.git_branch},
#             continue_on_fail=False,
#             cache_executed_step=True,
#         )
#         print(f'Step {step_name} created')
#     return current_steps
#
# step1_parents = update_datasets(dataset_projects=args.ds_dataset_projects, parents=[])
# print(f'Step 1 parents: {step1_parents}')
# step2_parents = update_datasets(dataset_projects=args.ds_ppg_dataset_projects, parents=step1_parents, is_ppg=True)
# print(f'Step 2 parents: {step2_parents}')

step1_parents = []
for dataset_project in args.ds_dataset_projects:
    dataset_project_split = dataset_project.split('-')
    campaign_num = dataset_project_split[2]

    is_ppg = False
    if len(dataset_project_split) == 4:
        is_ppg = True

    parameters = {
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/dataset_project': dataset_project,
        'Args/is_ppg': str(is_ppg),
        'Args/XXX': args.XXX,
        'Args/XXX': args.XXX,
    }

    step_name = f'update-{dataset_project}'
    print(f'Creating step {step_name} with {parameters}')
    step1_parents.append(step_name)
    pipeline.add_step(
        name=step_name,
        parents=None,
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
    print(f'Step {step_name} created')

print(f'Step 1 parents: {step1_parents}')

for queue in args.download_queues:
    parameters = {
        'Args/dataset_projects': args.ds_dataset_projects + args.open_dataset_projects,
        'Args/dataset_dir': args.dataset_download_dir,
    }
    step_name = f'download-{queue}'
    print(f'Creating step {step_name}')
    pipeline.add_step(
        name=step_name,
        parents=step1_parents,
        execution_queue=queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )

pipeline.start_locally()
pipeline.wait()
print('Pipeline finished') `

  
  
Posted one year ago

UpsetBlackbird87
pipeline.start() will launch the pipeline itself on a remote machine (a machine running the services agent).
This is why your pipeline is "stuck": it is not actually running.
When you call start_locally() the pipeline logic itself is running on your machine and the nodes are running on the workers.
Makes sense?
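A minimal sketch of the difference (the project, queue, and template names here are placeholders, not taken from your setup):

` from clearml.automation import PipelineController

pipeline = PipelineController(name='dataset-pipeline', project='examples', version='1')
pipeline.add_step(
    name='update-step',
    base_task_project='pipeline-templates',  # placeholder template project
    base_task_name='update-dataset',
    execution_queue='services',
)

# Option 1: enqueue the controller task itself; an agent listening on that queue
# must pick it up, otherwise the pipeline just sits in the queue and looks "stuck".
# pipeline.start(queue='services')

# Option 2: run the controller logic in this process; the step tasks are still
# enqueued and executed by the agents/workers.
pipeline.start_locally()
pipeline.wait() `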

  
  
Posted one year ago

UpsetBlackbird87 , thanks! I'll play with it a bit to see if it reproduces on my side as well 🙂

  
  
Posted one year ago