Autoscaler Parallelization Issue: I Have An Aws Autoscaler Set Up With A Resource That Has A Max Of 3 Instances Assigned To The

Autoscaler parallelization issue:

I have an AWS Autoscaler set up with a resource that has a max of 3 instances assigned to the default queue

I've given it a pipeline that, simplified, is something like this:

    for dataset_conf in dataset_configs:
        dataset = make_dataset_component(dataset_conf)
        for training_conf in training_configs:
            model_path = train_image_classifier_component(training_conf)
            eval_result_path = eval_model_component(model_path)

The dependency tree is very parallelizable (see image of completed pipeline below).

Following the runs as they take place in app.clear.ml, I see that only 1-2 components are being run / enqueued concurrently and the rest do not get enqueued. (see other image)

Why don't I see more parallelization?

  
  
Posted one year ago

Answers 25


Where was it running?

This message appears in the pipeline task's log. It is preceded by lines that reflect the storage manager downloading a corresponding zip file.

I take it that these files are also brought into the pipeline task's local disk?

Unless you changed the object, then no, they should not be downloaded (the "link" is passed)

The object is run_model_path
I don't seem to be changing it. I just pass it along from the training component to the evaluation component.

I can try to reproduce this with cleaner code but this is my pipeline definition:
`
from typing import List, Tuple

from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.pipeline(
    name="fastai_image_classification_pipeline",
    project="lavi-testing",
    version="0.2",
    multi_instance_support=True,
)
def fastai_image_classification_pipeline(
    run_id: str,
    i_datasets: Tuple[int],
    backbone_names: List[str],
    image_resizes: List[int],
    batch_sizes: List[int],
    num_train_epochs: int,
):
    from clearml import Task
    from concurrent.futures import ThreadPoolExecutor

    class TaskURIs:
        def __init__(self, project, pipeline_name, run_id):
            path_pref = f"{project}/{pipeline_name}"
            self.tboard = f"{path_pref}/tboard/{run_id}"
            self.models = f"{path_pref}/models/{run_id}"
            self.evaluations = f"{path_pref}/evaluations/{run_id}"

    def train_and_eval(
        backbone_name,
        image_resize,
        batch_size,
        num_train_epochs,
        training_dataset,
        run_uris,
        sub_run_id,
    ):
        print("train model")
        run_model_path, run_tb_path = train_image_classifier_component(
            clearml_dataset=training_dataset,
            backbone_name=backbone_name,
            image_resize=image_resize,
            batch_size=batch_size,
            run_model_uri=run_uris.models,
            run_tb_uri=run_uris.tboard,
            local_data_path="/data",
            num_epochs=num_train_epochs,
        )
        print("evaluate model")
        run_eval_path = eval_model_component(
            run_learner_path=run_model_path,
            run_id=sub_run_id,
            dataset_name="pets_evaluation",
            dataset_project="lavi-testing",
            run_eval_uri=run_uris.evaluations,
            image_resize=image_resize,
            batch_size=int(batch_size * 1.5),
            local_data_path="/data",
        )
        return run_eval_path

    project_name = "lavi-testing"
    pipeline_name = "fastai_image_classification"

    pipeline_task = Task.current_task()
    print("pipeline task=", pipeline_task)
    for i_dataset in i_datasets:
        sub_run_id = run_id + f"_{i_dataset}"
        print("sub_run_id:", sub_run_id)

        run_uris = TaskURIs(
            project=project_name, pipeline_name=pipeline_name, run_id=sub_run_id
        )

        print("make dataset")
        training_dataset = make_new_dataset_component(
            project=project_name, i_dataset=i_dataset, num_samples_per_chunk=500
        )

        with ThreadPoolExecutor(max_workers=10) as executor:
            # executor.map yields each call's return value directly, in input order
            results = executor.map(
                train_and_eval,
                backbone_names,
                image_resizes,
                batch_sizes,
                [num_train_epochs] * len(batch_sizes),
                [training_dataset] * len(batch_sizes),
                [run_uris] * len(batch_sizes),
                [sub_run_id] * len(batch_sizes),
            )
            for result in results:
                print(result)

    print("pipeline complete") `
  
  
Posted one year ago

I assume it now downloads "more" data because this is running in parallel (and yes, I assume that before, it deleted the files it did not need).
But actually, at least at first glance, I do not think it should download it at all...
Could it be that "run_model_path" is a "complex" object of some sort, and it needs to test the values inside?

  
  
Posted one year ago

model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device

Where was it running?

I take it that these files are also brought into the pipeline task's local disk?

Unless you changed the object, then no, they should not be downloaded (the "link" is passed)

  
  
Posted one year ago

These paths are pathlib.Path . Would that be a problem?

No need to worry, it should work (I'm assuming "/src/clearml_evaluation/" actually exists on the remote machine; otherwise it is useless 🙂).

  
  
Posted one year ago

I located the issue, I'm assuming the fix will be in the next RC 🙂
(probably tomorrow or before the weekend)

  
  
Posted one year ago

sys.path.insert(0, "/src/clearml_evaluation/") is actually left-over code from when I was making things run locally (perhaps prior to connecting to the GitHub repo), but I think that adding a non-existent path to the system path would be benign.
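For anyone who wants to check the claim about non-existent entries in the system path, here is a minimal sketch using only the standard library. The directory name is hypothetical and is assumed not to exist on the machine running it; `colorsys` is just an arbitrary standard-library module picked for the import.

```python
import importlib
import os
import sys

# Hypothetical directory, assumed NOT to exist on this machine.
missing_dir = "/src/this_directory_does_not_exist/"
missing_dir_exists = os.path.isdir(missing_dir)

# Put the missing directory first on the module search path.
sys.path.insert(0, missing_dir)

# The import system skips path entries it cannot use and moves on to the next one.
module = importlib.import_module("colorsys")

print("missing dir exists:", missing_dir_exists)
print("imported:", module.__name__)
```

The only cost is one failed lookup per import, so the leftover line is harmless, although it also does nothing useful unless the directory exists on the remote machine.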

  
  
Posted one year ago

These paths are pathlib.Path . Would that be a problem?

  
  
Posted one year ago

This example seems to suffice
Perhaps I should mention that I use gs as my files service ( files_server: gs://clearml-evaluation/ )
`
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes


@PipelineDecorator.component(
    return_values=["large_file_path"], cache=False, task_type=TaskTypes.data_processing
)
def step_write(i: int):
    import os

    # Write roughly 1 GB of random bytes into a fresh directory and return its path
    large_file_path = f"/tmp/out_path_{i}"
    os.makedirs(large_file_path)
    with open(f"{large_file_path}/large_file.txt", "wb") as fp:
        fp.write(os.urandom(1000000000))
    return large_file_path


@PipelineDecorator.component(
    return_values=["first_byte"], cache=False, task_type=TaskTypes.data_processing
)
def step_read(file_path):
    with open(f"{file_path}/large_file.txt", "rb") as fp:
        v = fp.read()
    first_byte = v[0]
    return first_byte


@PipelineDecorator.pipeline(
    name="paralel_pipeline", project="paralel_test", version="0.0.5"
)
def parallel_pipeline():
    def inner(i):
        ret_path = step_write(i)
        v = step_read(ret_path)
        return v

    from concurrent.futures import ThreadPoolExecutor

    idxs = list(range(20))
    with ThreadPoolExecutor(max_workers=10) as executor:
        # executor.map yields each call's return value directly, in input order
        results = executor.map(inner, idxs)
        for result in results:
            print(result)


if __name__ == "__main__":
    PipelineDecorator.set_default_execution_queue("default")
    # PipelineDecorator.run_locally()
    parallel_pipeline()

    print("process completed") `

The pipeline log shows multiple downloads (still running).
  
  
Posted one year ago

Erm, this parallelization has led to the pipeline task issuing a bunch of:
model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device
and quitting on me.
My train_image_classifier_component is programmed to save model files to a local path which is returned (and, thanks to clearml, the path's contents are zipped and uploaded to the files service).

I take it that these files are also brought into the pipeline task's local disk?
Why is that? If that is indeed what's happening, it's creating a bottleneck.

I'd have expected the tasks that use these files (the evaluation components) to pull the data directly from the files service.
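While the cause is being tracked down, a quick way to confirm that the controller's disk really is the bottleneck is to log the free space next to the download messages. This is only a sketch using the standard library; the helper names `free_gib` and `has_room_for` are made up for illustration and are not part of clearml.

```python
import shutil


def free_gib(path: str = ".") -> float:
    """Return the free space, in GiB, on the filesystem that holds `path`."""
    usage = shutil.disk_usage(path)
    return usage.free / 2**30


def has_room_for(num_files: int, file_size_gib: float, path: str = ".") -> bool:
    """Check whether `num_files` files of `file_size_gib` GiB each would fit."""
    return free_gib(path) >= num_files * file_size_gib


print(f"free space: {free_gib('.'):.1f} GiB")
print("room for 10 model archives of 1 GiB each:", has_room_for(10, 1.0))
```

Printing this from the pipeline function before and after the parallel section would show whether the free space shrinks as the zip files arrive.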

  
  
Posted one year ago

Note that the same model files were previously also generated by a non-parallelized version of the same pipeline without the out-of-space error. A storage manager was downloading zip files in that version as well (maybe these files were downloaded and then removed as the object reference counts went to 0?).

  
  
Posted one year ago

thanks for explaining it. Makes sense 👍 I'll give it a try

  
  
Posted one year ago

(fixed small typo in code above just now)

  
  
Posted one year ago

I found something btw, let me check...

  
  
Posted one year ago

Hi PanickyMoth78
So the current implementation of the pipeline parallelization is exactly like Python async function calls:

    for dataset_conf in dataset_configs:
        dataset = make_dataset_component(dataset_conf)
        for training_conf in training_configs:
            model_path = train_image_classifier_component(training_conf)
            eval_result_path = eval_model_component(model_path)

Specifically here, since you are passing the output of one function to another, imagine that what happens is a wait operation; hence it waits for the results (i.e. not parallel).
But if you do the following, you get exactly what you are looking for (basically matching debugging with remote execution):
`
from concurrent.futures import ThreadPoolExecutor

def inner(a_dataset_conf):
    dataset = make_dataset_component(a_dataset_conf)
    eval_result_paths = []
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_paths.append(eval_model_component(model_path))
    return eval_result_paths

with ThreadPoolExecutor() as executor:
    # executor.map returns an iterator of results, not a single future
    for result in executor.map(inner, dataset_configs):
        print(result) `
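The threaded pattern above can be tried without a ClearML server. In the sketch below, plain Python functions that sleep stand in for the pipeline components; the function names mirror the thread, but the bodies, the configs, and the `STEP_SECONDS` duration are all made up for illustration. It only shows why wrapping each branch in a thread lets independent branches overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor

STEP_SECONDS = 0.05  # made-up duration of every stand-in step


def make_dataset_component(dataset_conf):
    time.sleep(STEP_SECONDS)
    return f"dataset_{dataset_conf}"


def train_image_classifier_component(training_conf):
    time.sleep(STEP_SECONDS)
    return f"model_{training_conf}"


def eval_model_component(model_path):
    time.sleep(STEP_SECONDS)
    return f"eval_of_{model_path}"


training_configs = ["a", "b"]
dataset_configs = [0, 1, 2]


def inner(dataset_conf):
    # One branch: build a dataset, then train and evaluate per training config.
    dataset = make_dataset_component(dataset_conf)
    results = []
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        results.append((dataset, eval_model_component(model_path)))
    return results


def run_sequential():
    # Each branch blocks until it finishes before the next one starts.
    return [inner(conf) for conf in dataset_configs]


def run_threaded():
    # Each branch blocks only its own thread, so the branches overlap.
    with ThreadPoolExecutor(max_workers=len(dataset_configs)) as executor:
        return list(executor.map(inner, dataset_configs))


start = time.perf_counter()
sequential_results = run_sequential()
sequential_seconds = time.perf_counter() - start

start = time.perf_counter()
threaded_results = run_threaded()
threaded_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
```

Both runs produce the same results in the same order; only the wall-clock time differs, because `executor.map` preserves input order while the branches run concurrently.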

  
  
Posted one year ago

Two values:
`
from clearml import TaskTypes
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(
    return_values=["run_model_path", "run_tb_path"],
    cache=False,
    task_type=TaskTypes.training,
    packages=[
        "clearml",
        "tensorboard_logger",
        "timm",
        "fastai",
        "torch==1.11.0",
        "torchvision==0.12.0",
        "protobuf==3.19.*",
        "tensorboard",
        "google-cloud-storage>=1.13.2",
    ],
    repo="git@github.com:shpigi/clearml_evaluation.git",
    repo_branch="main",
)
def train_image_classifier_component(
    clearml_dataset,
    backbone_name,
    image_resize: int,
    batch_size: int,
    run_model_uri,
    run_tb_uri,
    local_data_path,
    num_epochs: int,
):
    import sys

    sys.path.insert(0, "/src/clearml_evaluation/")
    from image_classifier_training import pipeline_functions

    run_model_path, run_tb_path = pipeline_functions.train_image_classifier(
        clearml_dataset,
        backbone_name,
        image_resize,
        batch_size,
        run_model_uri,
        run_tb_uri,
        local_data_path,
        num_epochs,
    )
    return run_model_path, run_tb_path `
  
  
Posted one year ago

How did you define the decorator of "train_image_classifier_component" ?
Did you define:
@PipelineDecorator.component(return_values=['run_model_path', 'run_tb_path'], ...
Notice two return values

  
  
Posted one year ago

I should also mention I use clearml==1.6.3rc0

  
  
Posted one year ago

I'll try and reproduce this in simpler code

  
  
Posted one year ago

That's amazing speed 🚀

  
  
Posted one year ago

👍

  
  
Posted one year ago

😊

  
  
Posted one year ago

Come to think about it, maybe we should have "parallel_for" as a utility for the pipeline since this is so useful
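Until something like that exists, a helper along these lines can live in user code. This is a hypothetical sketch built on the standard library; the name `parallel_for` and its signature are assumptions for illustration, not an existing clearml API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def parallel_for(
    func: Callable[[T], R],
    items: Iterable[T],
    max_workers: Optional[int] = None,
) -> List[R]:
    """Hypothetical helper: call `func` on every item using a pool of threads.

    Results come back in the same order as `items`. An exception raised
    inside `func` is re-raised here when its result is collected.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, items))


def square(x: int) -> int:
    return x * x


squares = parallel_for(square, range(5), max_workers=3)
print(squares)  # [0, 1, 4, 9, 16]
```

Inside a pipeline function, `func` would be the per-branch function (such as `inner` in the answer above) and `items` the list of configs.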

  
  
Posted one year ago

This looks good to me...
I will have to look into it, because it should not download it...

  
  
Posted one year ago

Thanks 🙂
I wonder if it'll also include the fix that went into the RC I was using there ( 1.6.3rc0 )

  
  
Posted one year ago

PanickyMoth78 RC is out
pip install clearml==1.6.3rc1
🤞

  
  
Posted one year ago