Answered
Hi, Is There A Simple Way To Make

Hi,

Is there a simple way to make Task.init compatible with the dask.distributed Client?

When I run a script that reads a netCDF dataset split across several files concurrently through dask.distributed.Client, I get the following error:

TypeError: cannot pickle '_asyncio.Task' object

  
  
Posted 3 years ago

Answers 10


For sure! Excluding some parts related to preprocessing, this is the code I would like to parallelize with dask.distributed.Client.

```python
from pathlib import Path
from typing import Any, List, Optional, Union

import xarray as xr
from clearml import Task
from dask.distributed import Client, LocalCluster


def start_dask_client(
    n_workers: Optional[int] = None,
    threads_per_worker: Optional[int] = None,
    memory_limit: str = "2Gb",
) -> Client:
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
    )
    client = Client(cluster)
    print("Client info:", client)
    print("Scheduler info:", cluster.scheduler)
    print("Dashboard link:", cluster.dashboard_link, end="\n\n")
    for wid, worker in cluster.workers.items():
        print(f"{wid}: {worker}")
    return client


class NetCDFDatasetReader:
    def __init__(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
        self.dask_client = start_dask_client() if use_dask_client else None
        self.preprocesser = preprocesser

    def read_dataset(self, filepaths: Union[str, Path, List]) -> xr.Dataset:
        # This nested function refers to `self`, so cloudpickle has to
        # serialize the reader instance (and its Client) along with it.
        def read_and_process_file(filepath):
            with xr.open_dataset(filepath) as ds:
                if self.preprocesser is not None:
                    ds = self.preprocesser(ds)
                print(f"{filepath} successfully loaded.")
                return ds

        filepaths = [filepaths] if isinstance(filepaths, (str, Path)) else filepaths
        if self.dask_client is not None:
            futures = self.dask_client.map(read_and_process_file, filepaths)
            loaded_dataset = [future.result() for future in futures]
        else:
            loaded_dataset = [read_and_process_file(filepath) for filepath in filepaths]
        combined_ds = xr.concat(
            loaded_dataset,
            dim="time",
            data_vars="minimal",
            coords="minimal",
            compat="override",
        )

        return combined_ds


if __name__ == "__main__":
    # Connecting ClearML with the current process.
    task = Task.init(
        project_name="toy_examples_2021",
        task_name="parallel_netcdfs_reading_with_dask",
        task_type=Task.TaskTypes.custom,
    )

    # Specify the root path where netCDF files are stored.
    NETCDFS_ROOT = "/home/user/clearML_datasets/source_files/"

    # Create paths for all files found in the root directory.
    netcdfs_paths = list(Path(NETCDFS_ROOT).glob("**/*.nc"))

    # Configure the dataset reader tool.
    netcdfs_reader = NetCDFDatasetReader(use_dask_client=True)

    # Start the file-reading process and get a uniform dataset.
    in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)

    print(in_memory_dataset)
```

Again, I'm getting the same error using this snippet:

```
Traceback (most recent call last):
  File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 76, in <module>
    in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)
  File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 41, in read_dataset
    futures = self.dask_client.map(read_and_subset_file, filepaths)
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 1819, in map
    futures = self._graph_to_futures(
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 2611, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1046, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 425, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "cytoolz/dicttoolz.pyx", line 181, in cytoolz.dicttoolz.valmap
  File "cytoolz/dicttoolz.pyx", line 206, in cytoolz.dicttoolz.valmap
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3811, in dumps_task
    return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3775, in dumps_function
    result = pickle.dumps(func, protocol=4)
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_asyncio.Task' object
```

Btw, apparently if you do not start the Dask client inside the `if __name__ == "__main__":` block, you end up in an infinite recursion where the module is loaded over and over again.

  
  
Posted 3 years ago

Yep, you were absolutely right. What Dask did not like was the object `self.preprocesser` inside `read_and_process_file`, not `Task.init`. Since the dask.distributed.Client is initialized in that same class, maybe it's something that Dask doesn't allow.

Sorry for blaming ClearML without solid evidence x)
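A minimal, stdlib-only sketch of why `self.preprocesser` mattered: anything that refers to `self` forces the serializer to pickle the whole instance, including attributes that cannot be pickled. Plain `pickle` stands in here for the cloudpickle step Dask performs, and a `threading.Lock` stands in for the unpicklable internals of a `dask.distributed.Client` (presumably the `asyncio` tasks named in the error). `Reader`, `read_one`, `to_upper`, and `is_picklable` are hypothetical names for illustration.

```python
import pickle
import threading
from typing import Any, Callable, Optional


def to_upper(data: str) -> str:
    # Hypothetical preprocessor defined at module level, so it pickles by name.
    return data.upper()


def read_and_process_file(filepath: str, preprocesser: Optional[Callable[[str], str]] = None) -> str:
    # Module-level worker: pickle stores only its qualified name.
    data = f"contents of {filepath}"  # hypothetical stand-in for xr.open_dataset
    return preprocesser(data) if preprocesser is not None else data


class Reader:
    def __init__(self, preprocesser: Optional[Callable[[str], str]] = None) -> None:
        self.preprocesser = preprocesser
        # Stand-in for an unpicklable attribute such as a Dask Client.
        self.client = threading.Lock()

    def read_one(self, filepath: str) -> str:
        # Uses self.preprocesser, so serializing this method means serializing self.
        return read_and_process_file(filepath, self.preprocesser)


def is_picklable(obj: Any) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


reader = Reader(preprocesser=to_upper)
print(is_picklable(read_and_process_file))  # True
print(is_picklable(reader.read_one))        # False: drags reader.client along
print(is_picklable(reader.preprocesser))    # True: pass it explicitly instead
```

Passing the preprocessor as an explicit argument to a module-level function keeps the instance, and everything it owns, out of the payload.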

  
  
Posted 3 years ago

> Sorry if it's something trivial. I recently started working with ClearML.

No worries, this actually has more to do with how you work with Dask.
The Task ID is the unique ID of any Task in the system (`task.id` returns the UID string).
Can you post a toy Dask code snippet here? I'll explain how to make it compatible with ClearML 🙂

  
  
Posted 3 years ago

GiganticTurtle0
I'm assuming here that `self.dask_client.map(read_and_process_file, filepaths)` actually does the multi-process/multi-node processing. To do that, it has to store the current state of the process and then restore it on every remote node/process. In practice this means pickling the local variables (Task included).
First, I would try using a standalone static function for the map; Dask might be able to deduce that it does not need to pickle anything else, since the function is standalone.
Another option is not to create the Task in the main script but in the class itself.
Third, and that one is on us, maybe we could add `__getstate__`/`__setstate__` to Task so it can be pickled, where the state is just the Task ID. That would actually be relatively easy to set up. What do you think?
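The third option relies on Python's standard pickling hooks: `__getstate__` decides what gets serialized, and `__setstate__` rebuilds the object on the receiving side. A minimal sketch, assuming a hypothetical `TaskHandle` class (not ClearML's actual `Task`) whose only serialized state is its ID:

```python
import pickle
import threading
from typing import Any, Dict


class TaskHandle:
    """Hypothetical stand-in for an object that holds live, unpicklable
    resources but is fully identified by a plain string ID."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        # Stand-in for sockets, background threads, asyncio tasks, etc.
        self._connection = threading.Lock()

    def __getstate__(self) -> Dict[str, Any]:
        # Only the ID crosses the process boundary.
        return {"task_id": self.task_id}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # Rebuild live resources on the receiving side; a real implementation
        # would reattach to the existing task identified by task_id here.
        self.task_id = state["task_id"]
        self._connection = threading.Lock()


original = TaskHandle("abc123")
restored = pickle.loads(pickle.dumps(original))
print(restored.task_id)  # abc123
```

Without the two hooks, `pickle.dumps(original)` would fail on the lock, which mirrors the `cannot pickle '_asyncio.Task' object` error above.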

  
  
Posted 3 years ago

> Are you suggesting just taking the `read_and_process_file` function out of the `read_dataset` method,

Yes 🙂

> As for the second option, you mean create the task in the `__init__` method of the NetCDFReader class?

correct

> It would be a great idea to make the Task picklable,

Adding that to the next version to do list 😉

  
  
Posted 3 years ago

I see, but I don't understand the part where you talk about passing the task ID to the child processes. Sorry if it's something trivial. I recently started working with ClearML.

  
  
Posted 3 years ago

GiganticTurtle0 BTW, this mock example worked out of the box (Python 3.6 on Ubuntu):
```python
from typing import Any, List, Optional, Union

from clearml import Task
from dask.distributed import Client, LocalCluster


def start_dask_client(
    n_workers: Optional[int] = None,
    threads_per_worker: Optional[int] = None,
    memory_limit: str = "2Gb",
) -> Client:
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
    )
    client = Client(cluster)
    print("Client info:", client)
    print("Scheduler info:", cluster.scheduler)
    print("Dashboard link:", cluster.dashboard_link, end="\n\n")
    for wid, worker in cluster.workers.items():
        print(f"{wid}: {worker}")
    return client


class DatasetReader:
    def __init__(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
        self.dask_client = start_dask_client() if use_dask_client else None
        self.preprocesser = preprocesser

    def read_dataset(self, filepaths: Union[str, List]) -> list:
        # This nested function does not reference `self`, so only the
        # function itself has to be pickled and sent to the workers.
        def read_and_process_file(filepath):
            print(f"{filepath} successfully loaded.")
            return 1

        futures = self.dask_client.map(read_and_process_file, filepaths)
        loaded_dataset = [future.result() for future in futures]

        return loaded_dataset


if __name__ == "__main__":
    # Connecting ClearML with the current process.
    task = Task.init(
        project_name="examples",
        task_name="parallel with dask",
    )

    # Create input
    paths = [f"mock_file.{i:03}.dat" for i in range(0, 200)]

    # Configure the dataset reader tool.
    reader = DatasetReader(use_dask_client=True)

    # Start the file-reading process and get a uniform dataset.
    in_memory_dataset = reader.read_dataset(paths)

    print(in_memory_dataset)
```
  
  
Posted 3 years ago

Are you suggesting just taking the `read_and_process_file` function out of the `read_dataset` method, or maybe decoupling the `read_dataset` method from the NetCDFReader class so it is not pickled along with the class instance itself?

As for the second option, you mean create the task in the __init__ method of the NetCDFReader class?

It would be a great idea to make the Task picklable. In the meantime, what are the most frequently used options for integrating multiprocessing into scripts that call `Task.init`?

  
  
Posted 3 years ago

Hi GiganticTurtle0

> Is there a simple way to make `Task.init` compatible with `Dask.distributed` client?

Please tell me more 🙂
I think Dask is trying to pickle your Task object (which is not picklable).
You can, however, create the Task once with `Task.init`,
pass the Task ID to the child processes, and then use `Task.init(..., continue_last_task=task_id_here)` there.
wdyt?
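A minimal sketch of the "pass only the ID" idea, using the standard library's `ProcessPoolExecutor` as a stand-in for Dask workers. `process_file`, `run`, and the ID `"abc123"` are hypothetical; the ClearML call is left as a comment because it needs a configured server.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import List


def process_file(filepath: str, task_id: str) -> str:
    # Only plain strings cross the process boundary, so pickling is trivial.
    # In a real worker this is where you would reattach to the task, e.g.
    # Task.init(..., continue_last_task=task_id), as suggested above.
    return f"{filepath} processed for task {task_id}"


def run(filepaths: List[str], task_id: str, max_workers: int = 2) -> List[str]:
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # map() zips the iterables, so every call receives the same task_id.
        return list(pool.map(process_file, filepaths, [task_id] * len(filepaths)))


if __name__ == "__main__":
    # Hypothetical ID; in the original script this would be task.id.
    results = run([f"mock_file.{i:03}.dat" for i in range(4)], task_id="abc123")
    print(results[0])  # mock_file.000.dat processed for task abc123
```

Keeping the pool start-up under `if __name__ == "__main__":` also avoids the repeated module loading mentioned earlier in the thread, which occurs when worker processes re-import the main script.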

  
  
Posted 3 years ago

No worries 🙂
GiganticTurtle0 I'm glad it was solved 👍

  
  
Posted 3 years ago
903 Views