Answered
Hi, I’m trying out pipeline controller from tasks. I was not able to understand why my code results in just one task (the first one) in the pipeline.

Hi

I’m trying out pipeline controller from tasks.
I was not able to understand why my code results in just one task(the first one) in the pipeline.
` from clearml import PipelineController

# `args` and `timer` are defined elsewhere in the script.
pipeline = PipelineController(
    pool_frequency=0.1,
    name='dataset-pipeline',
    project=args.pipeline_project,
    version='1',
    auto_version_bump=True,
    add_pipeline_tags=True,
    abort_on_failure=True,
    add_run_number=True,
)
pipeline.set_default_execution_queue('services')

# First layer: one "update" step per dataset project, with no parents.
step1_parents = []
for dataset_project in args.dataset_projects:
    dataset_project_split = dataset_project.split('-')
    campaign_num = dataset_project_split[2]

    is_ppg = False
    if len(dataset_project_split) == 4:
        is_ppg = True

    parameters = {
        'Args/is_ppg': str(is_ppg),
    }

    step_name = f'update-{dataset_project}'
    timer.print(f'Creating step {step_name} with {parameters}')
    step1_parents.append(step_name)
    pipeline.add_step(
        name=step_name,
        parents=None,
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
    )
    timer.print(f'Step {step_name} created')

timer.print(f'Step 1 parents: {step1_parents}')

# Second layer: one "download" step per queue, each depending on every update step.
for queue in args.download_queues:
    parameters = {
        'Args/dataset_project': args.dataset_projects + args.open_dataset_projects,
    }
    step_name = f'download-{queue}'
    timer.print(f'Creating step {step_name}')
    pipeline.add_step(
        name=step_name,
        parents=step1_parents,
        execution_queue=queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
    )
    timer.print(f'Step {step_name} created')

pipeline.start()
pipeline.wait()
timer.print('Pipeline finished') `

  
  
Posted 2 years ago

Answers 13


CostlyOstrich36 The commented section is the 3 layer part which I am not currently using due to the lag
` # def update_datasets(dataset_projects, parents, is_ppg=False):
#     current_steps = []
#     for dataset_project in dataset_projects:
#         dataset_project_split = dataset_project.split('-')
#         campaign_num = dataset_project_split[2]
#         parameters = {
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/XXX': args.XXX_template.replace('*', campaign_num),
#             'Args/dataset_project': dataset_project,
#             'Args/is_ppg': str(is_ppg),
#             'Args/XXX': args.XXX,
#             'Args/XXX': args.XXX,
#         }
#         step_name = f'update-{dataset_project}'
#         print(f'Creating step {step_name} with {parameters}')
#         current_steps.append(step_name)
#         pipeline.add_step(
#             name=step_name,
#             parents=parents,
#             execution_queue='services',
#             base_task_project=args.pipeline_templates_project,
#             base_task_name='update-dataset',
#             parameter_override=parameters,
#             task_overrides={'script.branch': args.git_branch},
#             continue_on_fail=False,
#             cache_executed_step=True,
#         )
#         print(f'Step {step_name} created')
#     return current_steps
#
# step1_parents = update_datasets(dataset_projects=args.ds_dataset_projects, parents=[])
# print(f'Step 1 parents: {step1_parents}')
# step2_parents = update_datasets(dataset_projects=args.ds_ppg_dataset_projects, parents=step1_parents, is_ppg=True)
# print(f'Step 2 parents: {step2_parents}')

step1_parents = []
for dataset_project in args.ds_dataset_projects:
    dataset_project_split = dataset_project.split('-')
    campaign_num = dataset_project_split[2]

    is_ppg = False
    if len(dataset_project_split) == 4:
        is_ppg = True

    parameters = {
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/XXX': args.XXX_template.replace('*', campaign_num),
        'Args/dataset_project': dataset_project,
        'Args/is_ppg': str(is_ppg),
        'Args/XXX': args.XXX,
        'Args/XXX': args.XXX,
    }

    step_name = f'update-{dataset_project}'
    print(f'Creating step {step_name} with {parameters}')
    step1_parents.append(step_name)
    pipeline.add_step(
        name=step_name,
        parents=None,
        execution_queue='services',
        base_task_project=args.pipeline_templates_project,
        base_task_name='update-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )
    print(f'Step {step_name} created')

print(f'Step 1 parents: {step1_parents}')

for queue in args.download_queues:
    parameters = {
        'Args/dataset_projects': args.ds_dataset_projects + args.open_dataset_projects,
        'Args/dataset_dir': args.dataset_download_dir,
    }
    step_name = f'download-{queue}'
    print(f'Creating step {step_name}')
    pipeline.add_step(
        name=step_name,
        parents=step1_parents,
        execution_queue=queue,
        base_task_project=args.pipeline_templates_project,
        base_task_name='download-dataset',
        parameter_override=parameters,
        task_overrides={'script.branch': args.git_branch},
        continue_on_fail=False,
        cache_executed_step=True,
    )

pipeline.start_locally()
pipeline.wait()
print('Pipeline finished') `

  
  
Posted 2 years ago

UpsetBlackbird87 , I couldn't reproduce the issue on my end. Can you please send me a self-contained code example so I can try to recreate it?

  
  
Posted 2 years ago

UpsetBlackbird87 , thanks! I'll play with it a bit to see if it reproduces on my side as well 🙂

  
  
Posted 2 years ago

Hi UpsetBlackbird87 ,
If you're in the pipelines UI, you can switch to the detailed view and you can see each step of the pipeline as a node 🙂
You can see an example here:
https://clear.ml/docs/latest/docs/pipelines/pipelines

  
  
Posted 2 years ago

For some reason my code results in one node, even though I think the logic above should result in a bipartite graph (every update step connected to every download step).
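
One way to check which graph the `add_step` calls describe, independently of the UI, is to record each step's name and parents and group the steps into layers. The sketch below is plain Python and does not call ClearML; the `layers` helper and the step and queue names are hypothetical, chosen only to mirror the two loops in the question.

```python
from typing import Dict, List


def layers(steps: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps into layers: a step sits one layer below its deepest parent."""
    depth: Dict[str, int] = {}

    def depth_of(name: str, seen: tuple = ()) -> int:
        if name in seen:
            raise ValueError(f"cycle through step {name!r}")
        if name not in depth:
            # A KeyError here means a parent was named but never added as a step.
            parents = steps[name]
            depth[name] = 1 + max(
                (depth_of(p, seen + (name,)) for p in parents), default=-1
            )
        return depth[name]

    result: List[List[str]] = []
    for name in steps:
        d = depth_of(name)
        while len(result) <= d:
            result.append([])
        result[d].append(name)
    return result


# Hypothetical names: two update steps, then one download step per queue.
updates = ["update-a", "update-b"]
downloads = {f"download-{q}": list(updates) for q in ["q1", "q2"]}
graph = {**{u: [] for u in updates}, **downloads}

for index, layer in enumerate(layers(graph)):
    print(f"layer {index}: {layer}")
```

If this reports two layers but the UI shows a single node, the step definitions are probably fine and the difference lies in how the pipeline was launched or displayed.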

  
  
Posted 2 years ago

CostlyOstrich36 This resulted in a bipartite graph that I expected but why? 😕

  
  
Posted 2 years ago

Can you add the log printout of the controller?

  
  
Posted 2 years ago

Can you try running the pipeline locally using pipeline.start_locally()?
https://clear.ml/docs/latest/docs/references/sdk/automation_controller_pipelinecontroller#start_locally-1
Also, try adding a "starter" node at the start and making it the parent of all the others.
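
To make the "starter" node idea concrete, here is a minimal sketch that only builds the list of (name, parents) pairs one would then pass to `pipeline.add_step(name=..., parents=...)`. The `plan_steps` helper and the step names are hypothetical, and the starter step would still need a base task of its own, which is not shown here.

```python
from typing import Iterable, List, Tuple

Step = Tuple[str, List[str]]  # (step name, names of its parent steps)


def plan_steps(
    update_names: Iterable[str],
    download_names: Iterable[str],
    starter: str = "starter",
) -> List[Step]:
    """Plan a starter node, update steps that depend on it, then download steps."""
    update_names = list(update_names)
    plan: List[Step] = [(starter, [])]
    # Every update step hangs off the single starter node.
    plan += [(name, [starter]) for name in update_names]
    # Every download step waits for all update steps.
    plan += [(name, list(update_names)) for name in download_names]
    return plan


plan = plan_steps(["update-a", "update-b"], ["download-q1"])
for name, parents in plan:
    print(name, "<-", parents)
```

Each pair maps onto one `add_step` call, with the remaining arguments (queue, base task, parameter overrides) filled in per step as in the question.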

  
  
Posted 2 years ago

AgitatedDove14 Thx for the clear explanation!

  
  
Posted 2 years ago

AgitatedDove14 CostlyOstrich36 [FYI] I’m not sure if this is an optimization issue in ClearML or an issue with my computer, but when I make the pipeline a tripartite graph (add another parent layer), the DAG becomes weird (I think because of the lag) and the web app lags badly until I delete this pipeline in the web app (I can’t interact with any nodes in the graph, and switching between tasks takes a long time).

  
  
Posted 2 years ago

UpsetBlackbird87
pipeline.start() will launch the pipeline itself on a remote machine (a machine running the services agent).
This is why your pipeline is "stuck": it is not actually running.
When you call start_locally(), the pipeline logic itself runs on your machine and the nodes run on the workers.
Makes sense?
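
As a rough illustration of the difference, the sketch below wraps the two launch modes in one hypothetical helper, `launch`. It assumes `start()` accepts a `queue` argument and `start_locally()` can be called without arguments, as in the SDK reference; `FakePipeline` is a stand-in that only records calls so the example runs without a ClearML server.

```python
def launch(pipeline, run_locally: bool, queue: str = "services") -> str:
    """Start the controller logic either on this machine or on an agent's queue."""
    if run_locally:
        # Controller logic runs in this process; steps are still sent to workers.
        pipeline.start_locally()
        return "local"
    # Controller itself is enqueued; nothing runs until an agent serves `queue`.
    pipeline.start(queue=queue)
    return f"remote:{queue}"


class FakePipeline:
    """Hypothetical stand-in for a pipeline controller; it just records calls."""

    def __init__(self):
        self.calls = []

    def start(self, queue="services"):
        self.calls.append(("start", queue))

    def start_locally(self):
        self.calls.append(("start_locally",))


fake = FakePipeline()
print(launch(fake, run_locally=True))
print(launch(fake, run_locally=False))
print(fake.calls)
```

With a real controller, the remote branch only makes progress if an agent is listening on the chosen queue, which matches the "stuck" behaviour described above.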

  
  
Posted 2 years ago

Here is my log after task execution
**Deleted**

  
  
Posted 2 years ago

UpsetBlackbird87 , can you give me a code snippet with 3 layers to try and play with?

  
  
Posted 2 years ago