Where was it running?
This message appears in the pipeline task's log. It is preceded by lines showing the storage manager downloading a corresponding zip file.
I take it these files are also brought onto the pipeline task's local disk?
Unless you changed the object, no, they should not be downloaded (the "link" is passed).
The object is `run_model_path`.
I don't seem to be changing it. I just pass it along from the training component to the evaluation component.
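For reference, the hand-off being described looks roughly like the sketch below, assuming the standard `@PipelineDecorator.component` API (the component names, parameters, and URIs here are illustrative only, not taken from the actual pipeline):

```python
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["run_model_path"])
def train_component(run_model_uri: str):
    # Train and upload the model, then return the resulting path as a plain string.
    run_model_path = f"{run_model_uri}/model.pkl"
    return run_model_path


@PipelineDecorator.component(return_values=["run_eval_path"])
def eval_component(run_learner_path: str):
    # Receive the string returned by train_component and use it as-is;
    # nothing here modifies the object.
    print("evaluating model at", run_learner_path)
    return run_learner_path + ".eval"


@PipelineDecorator.pipeline(name="pass_through_demo", project="lavi-testing", version="0.1")
def pass_through_pipeline():
    run_model_path = train_component(run_model_uri="s3://some-bucket/models/run_0")
    # The training component's return value is forwarded untouched to the
    # evaluation component, just like run_model_path in the pipeline below.
    run_eval_path = eval_component(run_learner_path=run_model_path)
    print(run_eval_path)
```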
I can try to reproduce this with cleaner code, but in the meantime this is my pipeline definition:
```python
# Module-level imports assumed by this snippet
from typing import List, Tuple

from clearml.automation.controller import PipelineDecorator

# make_new_dataset_component, train_image_classifier_component and
# eval_model_component are pipeline components defined elsewhere in the module.


@PipelineDecorator.pipeline(
    name="fastai_image_classification_pipeline",
    project="lavi-testing",
    version="0.2",
    multi_instance_support=True,
)
def fastai_image_classification_pipeline(
    run_id: str,
    i_datasets: Tuple[int, ...],
    backbone_names: List[str],
    image_resizes: List[int],
    batch_sizes: List[int],
    num_train_epochs: int,
):
    from clearml import Task
    from concurrent.futures import ThreadPoolExecutor

    class TaskURIs:
        """Builds the output URIs (tensorboard, models, evaluations) for one sub-run."""

        def __init__(self, project, pipeline_name, run_id):
            path_pref = f"{project}/{pipeline_name}"
            self.tboard = f"{path_pref}/tboard/{run_id}"
            self.models = f"{path_pref}/models/{run_id}"
            self.evaluations = f"{path_pref}/evaluations/{run_id}"

    def train_and_eval(
        backbone_name,
        image_resize,
        batch_size,
        num_train_epochs,
        training_dataset,
        run_uris,
        sub_run_id,
    ):
        print("train model")
        run_model_path, run_tb_path = train_image_classifier_component(
            clearml_dataset=training_dataset,
            backbone_name=backbone_name,
            image_resize=image_resize,
            batch_size=batch_size,
            run_model_uri=run_uris.models,
            run_tb_uri=run_uris.tboard,
            local_data_path="/data",
            num_epochs=num_train_epochs,
        )
        print("evaluate model")
        # run_model_path is passed straight through to the evaluation component
        run_eval_path = eval_model_component(
            run_learner_path=run_model_path,
            run_id=sub_run_id,
            dataset_name="pets_evaluation",
            dataset_project="lavi-testing",
            run_eval_uri=run_uris.evaluations,
            image_resize=image_resize,
            batch_size=int(batch_size * 1.5),
            local_data_path="/data",
        )
        return run_eval_path

    project_name = "lavi-testing"
    pipeline_name = "fastai_image_classification"
    pipeline_task = Task.current_task()
    print("pipeline task=", pipeline_task)

    for i_dataset in i_datasets:
        sub_run_id = run_id + f"_{i_dataset}"
        print("sub_run_id:", sub_run_id)
        run_uris = TaskURIs(
            project=project_name, pipeline_name=pipeline_name, run_id=sub_run_id
        )
        print("make dataset")
        training_dataset = make_new_dataset_component(
            project=project_name, i_dataset=i_dataset, num_samples_per_chunk=500
        )
        # Run one train+eval per backbone/resize/batch-size combination in parallel;
        # executor.map yields the return values directly (not Future objects).
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = executor.map(
                train_and_eval,
                backbone_names,
                image_resizes,
                batch_sizes,
                [num_train_epochs] * len(batch_sizes),
                [training_dataset] * len(batch_sizes),
                [run_uris] * len(batch_sizes),
                [sub_run_id] * len(batch_sizes),
            )
            for run_eval_path in results:
                print(run_eval_path)

    print("pipeline complete")
```