Answered
Autoscaler parallelization issue: I have an AWS Autoscaler set up with a resource that has a max of 3 instances assigned to the default queue

Autoscaler parallelization issue:

I have an AWS Autoscaler set up with a resource that has a max of 3 instances assigned to the default queue

I've given it a pipeline that, simplified, is something like this:

```
for dataset_conf in dataset_configs:
    dataset = make_dataset_component(dataset_conf)
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_path = eval_model_component(model_path)
```

The dependency tree is very parallelizable (see the image of the completed pipeline below).

Following the runs as they take place in app.clear.ml, I see that only 1-2 components are being run / enqueued concurrently and the rest do not get enqueued. (see other image)

Why don't I see more parallelization?

  
  
Posted 2 years ago

Answers 25


Hi PanickyMoth78
So the current implementation of the pipeline parallelization behaves exactly like Python async function calls:

```
for dataset_conf in dataset_configs:
    dataset = make_dataset_component(dataset_conf)
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_path = eval_model_component(model_path)
```

Specifically, since here you are passing the output of one function to another, what effectively happens is a wait operation, hence it waits for the results (i.e. not parallel).
But if you do the following, you get exactly what you are looking for (basically matching debugging with remote execution):
```
from concurrent.futures import ThreadPoolExecutor

def inner(a_dataset_conf):
    # build the dataset, then train and evaluate every configuration on it
    dataset = make_dataset_component(a_dataset_conf)
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_path = eval_model_component(model_path)
    return eval_result_path

with ThreadPoolExecutor() as executor:
    # executor.map returns an iterator over the results of each inner() call
    for result in executor.map(inner, dataset_configs):
        print(result)
```

  
  
Posted 2 years ago

Come to think of it, maybe we should have a "parallel_for" utility for the pipeline, since this is so useful.
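A rough sketch of what such a utility could look like, just a thin wrapper around ThreadPoolExecutor (the parallel_for name and signature here are hypothetical, not an existing ClearML API):

```
from concurrent.futures import ThreadPoolExecutor


def parallel_for(func, iterable, max_workers=None):
    """Hypothetical helper: run func over iterable concurrently, return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order and blocks until all calls finish
        return list(executor.map(func, iterable))


# usage sketch, reusing inner() and dataset_configs from the example above:
# results = parallel_for(inner, dataset_configs)
```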

  
  
Posted 2 years ago

Thanks for explaining it. Makes sense 👍 I'll give it a try

  
  
Posted 2 years ago

👍

  
  
Posted 2 years ago

erm,
this parallelization has led to the pipeline task issuing a bunch of:

model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device

errors and quitting on me.
My train_image_classifier_component is programmed to save model files to a local path, which is returned (and, thanks to clearml, the path's contents are zipped and uploaded to the files service).

I take it that these files are also brought into the pipeline task's local disk?
Why is that? If that is indeed what's happening, it's creating a bottleneck.

I'd have expected the tasks that use these files (the evaluation components) to pull the data directly from the files service...
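As a side note, a quick way to keep an eye on how much local disk is left while this is happening (standard library only; the "/" path is an assumption, point it at wherever the artifacts get cached):

```
import shutil

# free space on the filesystem that holds the controller's working/cache directory
total, used, free = shutil.disk_usage("/")  # "/" is an assumption
print(f"free: {free / 1e9:.1f} GB of {total / 1e9:.1f} GB")
```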

  
  
Posted 2 years ago

model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device

Where was it running?

I take it that these files are also brought into the pipeline task's local disk?

Unless you changed the object, then no, they should not be downloaded (the "link" is passed)

  
  
Posted 2 years ago

Where was it running?

This message appears in the pipeline task's log. It is preceded by lines that reflect the storage manager downloading a corresponding zip file.
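For context, that kind of download line corresponds to the StorageManager fetching a remote artifact into the local cache, something like this sketch (the URL is made up):

```
from clearml import StorageManager

# fetches the remote object into the local cache and returns the local file path
local_zip = StorageManager.get_local_copy(
    remote_url="gs://clearml-evaluation/example/model_path/run_example_0.zip"  # hypothetical URL
)
print(local_zip)
```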

I take it that these files are also brought into the pipeline task's local disk?

Unless you changed the object, then no, they should not be downloaded (the "link" is passed)

The object is run_model_path
I don't seem to be changing it. I just pass it along from the training component to the evaluation component.

I can try to reproduce this with cleaner code but this is my pipeline definition:
```
@PipelineDecorator.pipeline(
    name="fastai_image_classification_pipeline",
    project="lavi-testing",
    version="0.2",
    multi_instance_support=True,
)
def fastai_image_classification_pipeline(
    run_id: str,
    i_datasets: Tuple[int],
    backbone_names: List[str],
    image_resizes: List[int],
    batch_sizes: List[int],
    num_train_epochs: int,
):
    from clearml import Task
    from concurrent.futures import ThreadPoolExecutor

    class TaskURIs:
        def __init__(self, project, pipeline_name, run_id):
            path_pref = f"{project}/{pipeline_name}"
            self.tboard = f"{path_pref}/tboard/{run_id}"
            self.models = f"{path_pref}/models/{run_id}"
            self.evaluations = f"{path_pref}/evaluations/{run_id}"

    def train_and_eval(
        backbone_name,
        image_resize,
        batch_size,
        num_train_epochs,
        training_dataset,
        run_uris,
        sub_run_id,
    ):
        print("train model")
        run_model_path, run_tb_path = train_image_classifier_component(
            clearml_dataset=training_dataset,
            backbone_name=backbone_name,
            image_resize=image_resize,
            batch_size=batch_size,
            run_model_uri=run_uris.models,
            run_tb_uri=run_uris.tboard,
            local_data_path="/data",
            num_epochs=num_train_epochs,
        )
        print("evaluate model")
        run_eval_path = eval_model_component(
            run_learner_path=run_model_path,
            run_id=sub_run_id,
            dataset_name="pets_evaluation",
            dataset_project="lavi-testing",
            run_eval_uri=run_uris.evaluations,
            image_resize=image_resize,
            batch_size=int(batch_size * 1.5),
            local_data_path="/data",
        )
        return run_eval_path

    project_name = "lavi-testing"
    pipeline_name = "fastai_image_classification"

    pipeline_task = Task.current_task()
    print("pipeline task=", pipeline_task)
    for i_dataset in i_datasets:
        sub_run_id = run_id + f"_{i_dataset}"
        print("sub_run_id:", sub_run_id)

        run_uris = TaskURIs(
            project=project_name, pipeline_name=pipeline_name, run_id=sub_run_id
        )

        print("make dataset")
        training_dataset = make_new_dataset_component(
            project=project_name, i_dataset=i_dataset, num_samples_per_chunk=500
        )

        with ThreadPoolExecutor(max_workers=10) as executor:
            # executor.map yields the train_and_eval() results directly (not Future objects)
            results = executor.map(
                train_and_eval,
                backbone_names,
                image_resizes,
                batch_sizes,
                [num_train_epochs] * len(batch_sizes),
                [training_dataset] * len(batch_sizes),
                [run_uris] * len(batch_sizes),
                [sub_run_id] * len(batch_sizes),
            )
            for result in results:
                print(result)

    print("pipeline complete")
```
  
  
Posted 2 years ago

Note that the same model files were previously also generated by a non-parallelized version of the same pipeline without the out-of-space error, but a storage manager was downloading zip files in that version as well (maybe these files were downloaded and removed as the object reference counts went to 0?)

  
  
Posted 2 years ago

I assume it now downloads "more" data as this is running in parallel (and yes, I assume that before, it deleted the files it did not need).
But actually, at least from a first glance, I do not think it should download it at all...
Could it be that the "run_model_path" is a "complex" object of a sort, and it needs to test the values inside?
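A hypothetical way to check that would be to log exactly what the training component hands back before it is passed on, e.g. with a stand-in value:

```
from pathlib import Path

# stand-in for whatever train_image_classifier_component() returns (hypothetical value)
run_model_path = Path("/tmp/models/run_example")

# a pathlib.Path is not a plain string, so it is worth logging the exact type
# of the object that travels between components
print(type(run_model_path), isinstance(run_model_path, str))
```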

  
  
Posted 2 years ago

How did you define the decorator of "train_image_classifier_component" ?
Did you define:
```
@PipelineDecorator.component(return_values=['run_model_path', 'run_tb_path'], ...
```

Notice the two return values.

  
  
Posted 2 years ago

Two values:
```
@PipelineDecorator.component(
    return_values=["run_model_path", "run_tb_path"],
    cache=False,
    task_type=TaskTypes.training,
    packages=[
        "clearml",
        "tensorboard_logger",
        "timm",
        "fastai",
        "torch==1.11.0",
        "torchvision==0.12.0",
        "protobuf==3.19.*",
        "tensorboard",
        "google-cloud-storage>=1.13.2",
    ],
    repo="git@github.com:shpigi/clearml_evaluation.git",
    repo_branch="main",
)
def train_image_classifier_component(
    clearml_dataset,
    backbone_name,
    image_resize: int,
    batch_size: int,
    run_model_uri,
    run_tb_uri,
    local_data_path,
    num_epochs: int,
):
    import sys

    sys.path.insert(0, "/src/clearml_evaluation/")
    from image_classifier_training import pipeline_functions

    run_model_path, run_tb_path = pipeline_functions.train_image_classifier(
        clearml_dataset,
        backbone_name,
        image_resize,
        batch_size,
        run_model_uri,
        run_tb_uri,
        local_data_path,
        num_epochs,
    )
    return run_model_path, run_tb_path
```
  
  
Posted 2 years ago

This looks good to me...
I will have to look into it, because it should not download it...

  
  
Posted 2 years ago

These paths are pathlib.Path. Would that be a problem?
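If one wanted to rule out the Path objects as the trigger (a hypothetical experiment, not something prescribed here), the component could return plain strings instead:

```
from pathlib import Path

run_model_path = Path("/tmp/models/run_example")  # stand-in for the value built by the training code

# returning str(...) instead of the Path keeps the component's return value a plain string
run_model_path_str = str(run_model_path)
print(type(run_model_path).__name__, "->", type(run_model_path_str).__name__)
```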

  
  
Posted 2 years ago

I'll try and reproduce this in simpler code

  
  
Posted 2 years ago

These paths are pathlib.Path. Would that be a problem?

No need to worry, it should work (I'm assuming "/src/clearml_evaluation/" actually exists on the remote machine, otherwise it is useless 🙂)

  
  
Posted 2 years ago

I found something btw, let me check...

  
  
Posted 2 years ago

sys.path.insert(0, "/src/clearml_evaluation/") is actually left-over code from when I was making things run locally (perhaps prior to connecting to the GitHub repo), but I think that adding a non-existent path to the system path would be benign.
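That matches how Python's import machinery behaves; a tiny sketch with a made-up directory:

```
import sys

sys.path.insert(0, "/path/that/does/not/exist")  # hypothetical non-existent directory

# non-existent sys.path entries are simply skipped during import lookup,
# so unrelated imports are unaffected
import json
print(json.__name__)
```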

  
  
Posted 2 years ago

This example seems to suffice.
Perhaps I should mention that I use gs as my files service (files_server: gs://clearml-evaluation/)
```
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes


@PipelineDecorator.component(
    return_values=["large_file_path"], cache=False, task_type=TaskTypes.data_processing
)
def step_write(i: int):
    import os

    large_file_path = f"/tmp/out_path_{i}"
    os.makedirs(large_file_path)
    with open(f"{large_file_path}/large_file.txt", "wb") as fp:
        fp.write(os.urandom(1000000000))
    return large_file_path


@PipelineDecorator.component(
    return_values=["first_byte"], cache=False, task_type=TaskTypes.data_processing
)
def step_read(file_path):
    with open(f"{file_path}/large_file.txt", "rb") as fp:
        v = fp.read()
    first_byte = v[0]
    return first_byte


@PipelineDecorator.pipeline(
    name="paralel_pipeline", project="paralel_test", version="0.0.5"
)
def parallel_pipeline():
    def inner(i):
        ret_path = step_write(i)
        v = step_read(ret_path)
        return v

    from concurrent.futures import ThreadPoolExecutor

    idxs = list(range(20))
    with ThreadPoolExecutor(max_workers=10) as executor:
        # executor.map yields the inner() results directly (not Future objects)
        results = executor.map(inner, idxs)
    for result in results:
        print(result)


if __name__ == "__main__":
    PipelineDecorator.set_default_execution_queue("default")
    # PipelineDecorator.run_locally()
    parallel_pipeline()

    print("process completed")
```

The pipeline log shows multiple downloads (still running).
  
  
Posted 2 years ago

(fixed small typo in code above just now)

  
  
Posted 2 years ago

I should also mention I use clearml==1.6.3rc0
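A quick way to confirm which version is actually active in the environment, in case several are floating around:

```
from importlib.metadata import version

# prints the installed clearml package version, e.g. "1.6.3rc0"
print(version("clearml"))
```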

  
  
Posted 2 years ago

I located the issue, I'm assuming the fix will be in the next RC 🙂
(probably tomorrow or before the weekend)

  
  
Posted 2 years ago

Thanks 🙂
I wonder if it'll also include the fix that went into the RC I was using there (1.6.3rc0)

  
  
Posted 2 years ago

PanickyMoth78 RC is out
pip install clearml==1.6.3rc1 🤞

  
  
Posted 2 years ago

That's amazing speed 🚀

  
  
Posted 2 years ago

😊

  
  
Posted 2 years ago