This example seems to suffice.
Perhaps I should mention that I use gs as my files server (files_server:
gs://clearml-evaluation/).
` from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes
@PipelineDecorator.component(
    return_values=["large_file_path"], cache=False, task_type=TaskTypes.data_processing
)
def step_write(i: int):
    """Create ``/tmp/out_path_{i}`` and fill ``large_file.txt`` with random bytes.

    Args:
        i: Index used to build a distinct output directory name.

    Returns:
        The path of the directory that contains ``large_file.txt``.
    """
    # Import kept inside the function, as in the original snippet, so the
    # step body does not rely on module-level names.
    import os

    total_bytes = 1000000000
    chunk_bytes = 10000000  # write in pieces instead of holding 1 GB in memory

    large_file_path = f"/tmp/out_path_{i}"
    # exist_ok=True: a second run with the same index must not fail with
    # FileExistsError just because the directory is already there.
    os.makedirs(large_file_path, exist_ok=True)

    with open(f"{large_file_path}/large_file.txt", "wb") as fp:
        remaining = total_bytes
        while remaining > 0:
            size = min(chunk_bytes, remaining)
            fp.write(os.urandom(size))
            remaining -= size

    return large_file_path
@PipelineDecorator.component(
    return_values=["first_byte"], cache=False, task_type=TaskTypes.data_processing
)
def step_read(file_path):
    """Return the first byte of ``{file_path}/large_file.txt``.

    Args:
        file_path: Directory that contains ``large_file.txt``.

    Returns:
        The first byte of the file as an ``int`` (0-255).

    Raises:
        IndexError: If the file is empty.
    """
    with open(f"{file_path}/large_file.txt", "rb") as fp:
        # Only one byte is needed; reading the whole file would load the
        # entire content into memory for no benefit.
        head = fp.read(1)
    first_byte = head[0]
    return first_byte
@PipelineDecorator.pipeline(
    name="paralel_pipeline", project="paralel_test", version="0.0.5"
)
def parallel_pipeline():
    """Run 20 write-then-read step pairs, up to 10 at a time, and print each result.

    Each index in ``range(20)`` is passed to ``step_write``; the returned path
    is then passed to ``step_read``. Results are printed in index order.
    """
    from concurrent.futures import ThreadPoolExecutor

    def inner(i):
        # Chain the two steps for a single index.
        ret_path = step_write(i)
        return step_read(ret_path)

    idxs = list(range(20))
    with ThreadPoolExecutor(max_workers=10) as executor:
        # submit() returns Future objects; executor.map() yields the results
        # themselves, so calling .result() on its items fails.
        futures = [executor.submit(inner, i) for i in idxs]
        for future in futures:
            print(future.result())
# Script entry point. The dunder names are required: a bare `name` is an
# undefined variable and raises NameError before anything runs.
if __name__ == "__main__":
    PipelineDecorator.set_default_execution_queue("default")
    # Left disabled as in the original; presumably uncommenting it runs the
    # pipeline on the local machine instead of through the queue — confirm
    # against the ClearML documentation.
    # PipelineDecorator.run_locally()
    parallel_pipeline()
print("process completed") `
The pipeline log shows multiple downloads (still running).