Hi PanickyMoth78
So the current implementation of the pipeline parallelization is exactly like Python async function calls:
`
for dataset_conf in dataset_configs:
    dataset = make_dataset_component(dataset_conf)
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_path = eval_model_component(model_path)
`
Specifically here, since you are passing the output of one function to another, imagine that what happens is a wait operation: the second call waits for the first call's result (i.e. not parallel).
But if you do the following, you get exactly what you are looking for (basically matching debugging with remote execution):
`
from concurrent.futures import ThreadPoolExecutor

def inner(a_dataset_conf):
    dataset = make_dataset_component(a_dataset_conf)
    for training_conf in training_configs:
        model_path = train_image_classifier_component(training_conf)
        eval_result_path = eval_model_component(model_path)

with ThreadPoolExecutor() as executor:
    # executor.map returns an iterator over results, not a Future
    results = executor.map(inner, dataset_configs)
    print(list(results))
`
Come to think about it, maybe we should have "parallel_for" as a utility for the pipeline since this is so useful
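To make the "parallel_for" idea concrete, here is a minimal sketch using only the standard library. Everything in it is an assumption for illustration: `parallel_for` is a hypothetical helper (not an existing ClearML function), and `fake_component` is a stand-in that just sleeps instead of launching a remote step. It shows that each chain stays sequential internally (the second call needs the first call's output) while independent chains overlap in a thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def parallel_for(fn, items, max_workers=4):
    """Hypothetical helper: run fn on every item in a thread pool.

    Results come back in the same order as the inputs.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fn, items))


def fake_component(x, delay=0.1):
    """Stand-in for a pipeline component: blocks for `delay` seconds."""
    time.sleep(delay)
    return x + 1


def chain(x):
    # The outer call needs the inner call's output, so it has to wait for it.
    return fake_component(fake_component(x))


start = time.perf_counter()
sequential = [chain(i) for i in range(4)]
t_seq = time.perf_counter() - start

start = time.perf_counter()
parallel = parallel_for(chain, range(4))
t_par = time.perf_counter() - start

print(sequential, parallel)
print(f"sequential: {t_seq:.2f}s, parallel: {t_par:.2f}s")
```

Both runs produce the same values; only the wall-clock time differs, because the four chains wait on their own steps at the same time instead of one after another.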
thanks for explaining it. Makes sense 👍 I'll give it a try
Erm, this parallelization has led to the pipeline task issuing a bunch of:
model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device
and quitting on me.
My train_image_classifier_component is programmed to save model files to a local path which is returned (and, thanks to clearml, the path's contents are zipped and uploaded to the files service).
I take it that these files are also brought into the pipeline task's local disk?
Why is that? If that is indeed what's happening, it's creating a bottleneck.
I'd have expected the tasks that use these files (the evaluation components) to pull the data directly from the files service.
model_path/run_2022_07_20T22_11_15.209_0.zip , err: [Errno 28] No space left on device
Where was it running?
I take it that these files are also brought into the pipeline task's local disk?
Unless you changed the object, then no, they should not be downloaded (the "link" is passed)
Where was it running?
this message appears in the pipeline task's log. It is preceded by lines that reflect the storage manager downloading a corresponding zip file
I take it that these files are also brought into the pipeline task's local disk?
Unless you changed the object, then no, they should not be downloaded (the "link" is passed)
The object is run_model_path
I don't seem to be changing it. I just pass it along from the training component to the evaluation component.
I can try to reproduce this with cleaner code but this is my pipeline definition:
`
@PipelineDecorator.pipeline(
    name="fastai_image_classification_pipeline",
    project="lavi-testing",
    version="0.2",
    multi_instance_support=True,
)
def fastai_image_classification_pipeline(
    run_id: str,
    i_datasets: Tuple[int],
    backbone_names: List[str],
    image_resizes: List[int],
    batch_sizes: List[int],
    num_train_epochs: int,
):
    from clearml import Task
    from concurrent.futures import ThreadPoolExecutor

    class TaskURIs:
        def __init__(self, project, pipeline_name, run_id):
            path_pref = f"{project}/{pipeline_name}"
            self.tboard = f"{path_pref}/tboard/{run_id}"
            self.models = f"{path_pref}/models/{run_id}"
            self.evaluations = f"{path_pref}/evaluations/{run_id}"

    def train_and_eval(
        backbone_name,
        image_resize,
        batch_size,
        num_train_epochs,
        training_dataset,
        run_uris,
        sub_run_id,
    ):
        print("train model")
        run_model_path, run_tb_path = train_image_classifier_component(
            clearml_dataset=training_dataset,
            backbone_name=backbone_name,
            image_resize=image_resize,
            batch_size=batch_size,
            run_model_uri=run_uris.models,
            run_tb_uri=run_uris.tboard,
            local_data_path="/data",
            num_epochs=num_train_epochs,
        )
        print("evaluate model")
        run_eval_path = eval_model_component(
            run_learner_path=run_model_path,
            run_id=sub_run_id,
            dataset_name="pets_evaluation",
            dataset_project="lavi-testing",
            run_eval_uri=run_uris.evaluations,
            image_resize=image_resize,
            batch_size=int(batch_size * 1.5),
            local_data_path="/data",
        )
        return run_eval_path

    project_name = "lavi-testing"
    pipeline_name = "fastai_image_classification"
    pipeline_task = Task.current_task()
    print("pipeline task=", pipeline_task)
    for i_dataset in i_datasets:
        sub_run_id = run_id + f"_{i_dataset}"
        print("sub_run_id:", sub_run_id)
        run_uris = TaskURIs(
            project=project_name, pipeline_name=pipeline_name, run_id=sub_run_id
        )
        print("make dataset")
        training_dataset = make_new_dataset_component(
            project=project_name, i_dataset=i_dataset, num_samples_per_chunk=500
        )
        with ThreadPoolExecutor(max_workers=10) as executor:
            # executor.map yields each call's return value, not a Future
            results = executor.map(
                train_and_eval,
                backbone_names,
                image_resizes,
                batch_sizes,
                [num_train_epochs] * len(batch_sizes),
                [training_dataset] * len(batch_sizes),
                [run_uris] * len(batch_sizes),
                [sub_run_id] * len(batch_sizes),
            )
            for result in results:
                print(result)
    print("pipeline complete")
`
Note that the same model files were previously also generated by a non-parallelized version of the same pipeline without the out-of-space error, but a storage manager was downloading zip files in that version as well (maybe these files were downloaded and then removed as the object reference counts went to 0?)
I assume it now downloads "more" data as this is running in parallel (and yes, I assume that before, it deleted the files it did not need).
But actually, at least from a first glance, I do not think it should download it at all...
Could it be that "run_model_path" is a "complex" object of some sort, and it needs to test the values inside?
How did you define the decorator of "train_image_classifier_component"?
Did you define: `@PipelineDecorator.component(return_values=['run_model_path', 'run_tb_path'], ...`
Notice the two return values.
Two values:
`
@PipelineDecorator.component(
    return_values=["run_model_path", "run_tb_path"],
    cache=False,
    task_type=TaskTypes.training,
    packages=[
        "clearml",
        "tensorboard_logger",
        "timm",
        "fastai",
        "torch==1.11.0",
        "torchvision==0.12.0",
        "protobuf==3.19.*",
        "tensorboard",
        "google-cloud-storage>=1.13.2",
    ],
    repo="git@github.com:shpigi/clearml_evaluation.git",
    repo_branch="main",
)
def train_image_classifier_component(
    clearml_dataset,
    backbone_name,
    image_resize: int,
    batch_size: int,
    run_model_uri,
    run_tb_uri,
    local_data_path,
    num_epochs: int,
):
    import sys

    sys.path.insert(0, "/src/clearml_evaluation/")
    from image_classifier_training import pipeline_functions

    run_model_path, run_tb_path = pipeline_functions.train_image_classifier(
        clearml_dataset,
        backbone_name,
        image_resize,
        batch_size,
        run_model_uri,
        run_tb_uri,
        local_data_path,
        num_epochs,
    )
    return run_model_path, run_tb_path
`
This looks good to me...
I will have to look into it, because it should not download it...
These paths are pathlib.Path. Would that be a problem?
I'll try and reproduce this in simpler code
These paths are pathlib.Path. Would that be a problem?
No need to worry, it should work (I'm assuming "/src/clearml_evaluation/" actually exists on the remote machine, otherwise it is useless 🙂)
sys.path.insert(0, "/src/clearml_evaluation/") is actually left-over code from when I was making things run locally (perhaps prior to connecting to the github repo), but I think that adding a non-existent path to the system path would be benign.
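A quick way to check the "benign" claim locally is the sketch below. The directory name is made up for illustration and is assumed not to exist on the machine; `colorsys` is used only because it is a small standard-library module. Python's import machinery skips `sys.path` entries that are not present and keeps searching the remaining ones.

```python
import importlib
import sys

# Hypothetical directory, assumed not to exist on this machine.
missing = "/this/path/does/not/exist"
sys.path.insert(0, missing)

# The import still resolves from the other sys.path entries;
# the missing directory is skipped without raising an error.
colorsys_module = importlib.import_module("colorsys")
print(colorsys_module.__name__)

# Clean up so later imports do not keep probing the missing directory.
sys.path.remove(missing)
```

The only effect of the stale entry is that the module the entry was meant to expose will not be importable from there, which is the "otherwise useless" case mentioned above.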
This example seems to suffice
Perhaps I should mention that I use gs as my files service (files_server: gs://clearml-evaluation/)
`
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes


@PipelineDecorator.component(
    return_values=["large_file_path"], cache=False, task_type=TaskTypes.data_processing
)
def step_write(i: int):
    import os

    large_file_path = f"/tmp/out_path_{i}"
    os.makedirs(large_file_path)
    with open(f"{large_file_path}/large_file.txt", "wb") as fp:
        fp.write(os.urandom(1000000000))
    return large_file_path


@PipelineDecorator.component(
    return_values=["first_byte"], cache=False, task_type=TaskTypes.data_processing
)
def step_read(file_path):
    with open(f"{file_path}/large_file.txt", "rb") as fp:
        v = fp.read()
    first_byte = v[0]
    return first_byte


@PipelineDecorator.pipeline(
    name="paralel_pipeline", project="paralel_test", version="0.0.5"
)
def parallel_pipeline():
    def inner(i):
        ret_path = step_write(i)
        v = step_read(ret_path)
        return v

    from concurrent.futures import ThreadPoolExecutor

    idxs = list(range(20))
    with ThreadPoolExecutor(max_workers=10) as executor:
        # executor.map yields each call's return value, not a Future
        results = executor.map(inner, idxs)
        for result in results:
            print(result)


if __name__ == "__main__":
    PipelineDecorator.set_default_execution_queue("default")
    # PipelineDecorator.run_locally()
    parallel_pipeline()
    print("process completed")
`
The pipeline log shows multiple downloads (still running)
I should also mention I use clearml==1.6.3rc0
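For a rough sense of why the pipeline task could run out of space with this repro, here is a back-of-the-envelope sketch. It assumes (this is not confirmed anywhere in the thread) that each running branch holds one downloaded copy of its roughly 1 GB file on the controller's disk at the same time; the helper names are made up for illustration.

```python
import shutil
import tempfile


def peak_download_bytes(file_bytes: int, max_workers: int) -> int:
    """Upper bound on bytes held at once if every running branch keeps one file."""
    return file_bytes * max_workers


def has_room(needed_bytes: int, path: str) -> bool:
    """True if the filesystem containing `path` has at least `needed_bytes` free."""
    return shutil.disk_usage(path).free >= needed_bytes


# Numbers taken from the repro: 1,000,000,000 random bytes per step, 10 workers.
peak = peak_download_bytes(1_000_000_000, 10)
fits = has_room(peak, tempfile.gettempdir())
print(f"peak of about {peak / 1e9:.0f} GB, fits in temp dir: {fits}")
```

Under that assumption, lowering `max_workers` would reduce the peak proportionally, at the cost of less parallelism.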
I located the issue, I'm assuming the fix will be in the next RC 🙂
(probably tomorrow or before the weekend)
Thanks 🙂
I wonder if it'll also include the fix that went into the RC I was using there (1.6.3rc0)
PanickyMoth78 RC is out: `pip install clearml==1.6.3rc1`
🤞