For sure! Excluding some parts related to preprocessing, this is the code I would like to parallelize with dask.distributed.Client
.
` from typing import Any, Dict, List, Tuple, Union
from pathlib import Path
import xarray as xr
from clearml import Task
from dask.distributed import Client, LocalCluster
def start_dask_client(
n_workers: int = None, threads_per_worker: int = None, memory_limit: str = "2Gb"
) -> Client:
cluster = LocalCluster(
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
)
client = Client(cluster)
print("Client info:", client)
print("Scheduler info:", cluster.scheduler)
print("Dashboard link:", cluster.dashboard_link, end="\n\n")
for wid, worker in cluster.workers.items():
print(f"{wid}: {worker}")
return client
class NetCDFDatasetReader:
def init(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
self.dask_client = start_dask_client() if use_dask_client else None
self.preprocesser = preprocesser
def read_dataset(self, filepaths: Union[str, Path, List]) -> xr.Dataset:
def read_and_process_file(filepath):
with xr.open_dataset(filepath) as ds:
if self.preprocesser is not None:
ds = self.preprocesser(ds)
print(f"{filepath} successfully loaded.")
return ds
filepaths = [filepaths] if isinstance(filepaths, (str, Path)) else filepaths
if self.dask_client is not None:
futures = self.dask_client.map(read_and_process_file, filepaths)
loaded_dataset = [future.result() for future in futures]
else:
loaded_dataset = [read_and_process_file(filepath) for filepath in filepaths]
combined_ds = xr.concat(
loaded_dataset,
dim="time",
data_vars="minimal",
coords="minimal",
compat="override",
)
return combined_ds
if name == "main":
# Connecting ClearML with the current process.
task = Task.init(
project_name="toy_examples_2021",
task_name="parallel_netcdfs_reading_with_dask",
task_type=Task.TaskTypes.custom,
)
# Specify the root path where netCDF files are stored.
NETCDFS_ROOT = "/home/user/clearML_datasets/source_files/"
# Create paths for all files found in the root directory.
netcdfs_paths = list(Path(NETCDFS_ROOT).glob("**/*.nc"))
# Configure the dataset reader tool.
netcdfs_reader = NetCDFDatasetReader(use_dask_client=True)
# Start files reading process and get an uniform dataset.
in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)
print(in_memory_dataset) `
Again, I'm getting the same error using this snippet:
Traceback (most recent call last): File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 76, in <module> in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths) File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 41, in read_dataset futures = self.dask_client.map(read_and_subset_file, filepaths) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 1819, in map futures = self._graph_to_futures( File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 2611, in _graph_to_futures dsk = dsk.__dask_distributed_pack__(self, keyset, annotations) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1046, in __dask_distributed_pack__ "state": layer.__dask_distributed_pack__( File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 425, in __dask_distributed_pack__ dsk = toolz.valmap(dumps_task, dsk) File "cytoolz/dicttoolz.pyx", line 181, in cytoolz.dicttoolz.valmap File "cytoolz/dicttoolz.pyx", line 206, in cytoolz.dicttoolz.valmap File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3811, in dumps_task return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])} File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3775, in dumps_function result = pickle.dumps(func, protocol=4) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps result = cloudpickle.dumps(x, **dump_kwargs) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump return Pickler.dump(self, obj) TypeError: cannot pickle '_asyncio.Task' object
Btw, apparently if you do not call the Dask client within the "if name ...", you end up in an infinite recursive process where the module tries to be loaded over and over again.
Yep, you were absolutely right. What Dask did not like was the object self.preprocesser
inside read_and_process_file
, not Task.init
. Since the dask.distributed.Client
is initialized in that same class, maybe it's something that Dask doesn't allow.
Sorry for blaming ClearML
without solid evidence x)
Sorry if it's something trivial. I recently started working with ClearML.
No worries, this has actually more to do with how you work with Dask
The Task ID is the unique id of the any Task in the system (task.id will return the UID str)
Can you post a toy Dash code here, I'll explain how to make it compatible with clearml 🙂
GiganticTurtle0
I'm assuming here that self.dask_client.map(read_and_process_file, filepaths)
actually does the multi process/node processing. The way it needs to work, it has to store the current state of the process and then restore it on any remote node/process. In practice this means pickling the local variables (Task included).
First I would try to use a standalone static function for the map, DASK might be able to deduce it does not need to pickle anything, as it is standalone.
Another option is not to create the Task in the main script but on the class itself.
Third, and that is on us, maybe we could add _ getstate _/setstate to Task, so we can pickle t, where the state is just the Task ID, that wouild actually relatively easy to setup, what do you think?
Are you suggesting just taking the
read_and_process_file
function out of the
read_dataset
method,
Yes 🙂
As for the second option, you mean create the task in the
init
method of the NetCDFReader class?
correct
It would be a great idea to make the Task picklelizable,
Adding that to the next version to do list 😉
I see, but I don't understand the part where you talk about passing the task ID to the child processes. Sorry if it's something trivial. I recently started working with ClearML.
GiganticTurtle0 BTW, this mock example worked out of the box (python 3.6 on Ubuntu):
` from typing import Any, Dict, List, Tuple, Union
from clearml import Task
from dask.distributed import Client, LocalCluster
def start_dask_client(
n_workers: int = None, threads_per_worker: int = None, memory_limit: str = "2Gb"
) -> Client:
cluster = LocalCluster(
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
)
client = Client(cluster)
print("Client info:", client)
print("Scheduler info:", cluster.scheduler)
print("Dashboard link:", cluster.dashboard_link, end="\n\n")
for wid, worker in cluster.workers.items():
print(f"{wid}: {worker}")
return client
class DatasetReader:
def init(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
self.dask_client = start_dask_client() if use_dask_client else None
self.preprocesser = preprocesser
def read_dataset(self, filepaths: Union[str, List]) -> list:
def read_and_process_file(filepath):
print(f"{filepath} successfully loaded.")
return 1
futures = self.dask_client.map(read_and_process_file, filepaths)
loaded_dataset = [future.result() for future in futures]
return loaded_dataset
if name == "main":
# Connecting ClearML with the current process.
task = Task.init(
project_name="examples",
task_name="parallel with dask",
)
# Create input
paths = [f"mock_file.{i:03}.dat" for i in range(0, 200)]
# Configure the dataset reader tool.
reader = DatasetReader(use_dask_client=True)
# Start files reading process and get an uniform dataset.
in_memory_dataset = reader.read_dataset(paths)
print(in_memory_dataset) `
Are you suggesting just taking the read_and_process_file
function out of the read_dataset
method, or maybe decoupling the read_dataset
method from the NetCDFReader
class so it is not pickle along with the class instance itself?
As for the second option, you mean create the task in the __init__
method of the NetCDFReader class?
It would be a great idea to make the Task picklelizable, since at the moment what are the most frequently used options for integrating multiprocessing tasks in scripts that include Task.init
?
Hi GiganticTurtle0
Is there a simple way to make
Task.init
compatible with
Dask.distributed
client?
Please tell me more 🙂
I think Dask is trying to pickle you Task object (which is not pickable).
You can however create the Task once with Task.init
and pass the Task ID to the child processes and then use Task.init(..., continue_last_task=task_id_here)
wdyt?
No worries 🙂
GiganticTurtle0 I'm glad it was solved 👍