For sure! Excluding some parts related to preprocessing, this is the code I would like to parallelize with dask.distributed.Client
.
` from typing import Any, Dict, List, Tuple, Union
from pathlib import Path
import xarray as xr
from clearml import Task
from dask.distributed import Client, LocalCluster
def start_dask_client(
n_workers: int = None, threads_per_worker: int = None, memory_limit: str = "2Gb"
) -> Client:
cluster = LocalCluster(
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
)
client = Client(cluster)
print("Client info:", client)
print("Scheduler info:", cluster.scheduler)
print("Dashboard link:", cluster.dashboard_link, end="\n\n")
for wid, worker in cluster.workers.items():
print(f"{wid}: {worker}")
return client
class NetCDFDatasetReader:
def init(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
self.dask_client = start_dask_client() if use_dask_client else None
self.preprocesser = preprocesser
def read_dataset(self, filepaths: Union[str, Path, List]) -> xr.Dataset:
def read_and_process_file(filepath):
with xr.open_dataset(filepath) as ds:
if self.preprocesser is not None:
ds = self.preprocesser(ds)
print(f"{filepath} successfully loaded.")
return ds
filepaths = [filepaths] if isinstance(filepaths, (str, Path)) else filepaths
if self.dask_client is not None:
futures = self.dask_client.map(read_and_process_file, filepaths)
loaded_dataset = [future.result() for future in futures]
else:
loaded_dataset = [read_and_process_file(filepath) for filepath in filepaths]
combined_ds = xr.concat(
loaded_dataset,
dim="time",
data_vars="minimal",
coords="minimal",
compat="override",
)
return combined_ds
if name == "main":
# Connecting ClearML with the current process.
task = Task.init(
project_name="toy_examples_2021",
task_name="parallel_netcdfs_reading_with_dask",
task_type=Task.TaskTypes.custom,
)
# Specify the root path where netCDF files are stored.
NETCDFS_ROOT = "/home/user/clearML_datasets/source_files/"
# Create paths for all files found in the root directory.
netcdfs_paths = list(Path(NETCDFS_ROOT).glob("**/*.nc"))
# Configure the dataset reader tool.
netcdfs_reader = NetCDFDatasetReader(use_dask_client=True)
# Start files reading process and get an uniform dataset.
in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)
print(in_memory_dataset) `
Again, I'm getting the same error using this snippet:
Traceback (most recent call last): File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 76, in <module> in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths) File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 41, in read_dataset futures = self.dask_client.map(read_and_subset_file, filepaths) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 1819, in map futures = self._graph_to_futures( File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 2611, in _graph_to_futures dsk = dsk.__dask_distributed_pack__(self, keyset, annotations) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1046, in __dask_distributed_pack__ "state": layer.__dask_distributed_pack__( File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 425, in __dask_distributed_pack__ dsk = toolz.valmap(dumps_task, dsk) File "cytoolz/dicttoolz.pyx", line 181, in cytoolz.dicttoolz.valmap File "cytoolz/dicttoolz.pyx", line 206, in cytoolz.dicttoolz.valmap File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3811, in dumps_task return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])} File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3775, in dumps_function result = pickle.dumps(func, protocol=4) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps result = cloudpickle.dumps(x, **dump_kwargs) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump return Pickler.dump(self, obj) TypeError: cannot pickle '_asyncio.Task' object
Btw, apparently if you do not call the Dask client within the "if name ...", you end up in an infinite recursive process where the module tries to be loaded over and over again.