For sure! Excluding some parts related to preprocessing, this is the code I would like to parallelize with `dask.distributed.Client`.

```python
from typing import Any, List, Union
from pathlib import Path

import xarray as xr
from clearml import Task
from dask.distributed import Client, LocalCluster


def start_dask_client(
    n_workers: int = None, threads_per_worker: int = None, memory_limit: str = "2Gb"
) -> Client:
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
    )
    client = Client(cluster)
    print("Client info:", client)
    print("Scheduler info:", cluster.scheduler)
    print("Dashboard link:", cluster.dashboard_link, end="\n\n")
    for wid, worker in cluster.workers.items():
        print(f"{wid}: {worker}")
    return client


class NetCDFDatasetReader:
    def __init__(self, use_dask_client: bool = False, preprocesser: Any = None) -> None:
        self.dask_client = start_dask_client() if use_dask_client else None
        self.preprocesser = preprocesser

    def read_dataset(self, filepaths: Union[str, Path, List]) -> xr.Dataset:
        def read_and_process_file(filepath):
            with xr.open_dataset(filepath) as ds:
                if self.preprocesser is not None:
                    ds = self.preprocesser(ds)
                print(f"{filepath} successfully loaded.")
                return ds

        filepaths = [filepaths] if isinstance(filepaths, (str, Path)) else filepaths
        if self.dask_client is not None:
            futures = self.dask_client.map(read_and_process_file, filepaths)
            loaded_dataset = [future.result() for future in futures]
        else:
            loaded_dataset = [read_and_process_file(filepath) for filepath in filepaths]
        combined_ds = xr.concat(
            loaded_dataset,
            dim="time",
            data_vars="minimal",
            coords="minimal",
            compat="override",
        )
        return combined_ds


if __name__ == "__main__":
    # Connecting ClearML with the current process.
    task = Task.init(
        project_name="toy_examples_2021",
        task_name="parallel_netcdfs_reading_with_dask",
        task_type=Task.TaskTypes.custom,
    )

    # Specify the root path where netCDF files are stored.
    NETCDFS_ROOT = "/home/user/clearML_datasets/source_files/"

    # Create paths for all files found in the root directory.
    netcdfs_paths = list(Path(NETCDFS_ROOT).glob("**/*.nc"))

    # Configure the dataset reader tool.
    netcdfs_reader = NetCDFDatasetReader(use_dask_client=True)

    # Start the file-reading process and get a uniform dataset.
    in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)

    print(in_memory_dataset)
```

Again, I'm getting the same error using this snippet:

    Traceback (most recent call last):
      File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 76, in <module>
        in_memory_dataset = netcdfs_reader.read_dataset(netcdfs_paths)
      File "/home/user/clearML/toy_example_read_netcdf_dataset.py", line 41, in read_dataset
        futures = self.dask_client.map(read_and_subset_file, filepaths)
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 1819, in map
        futures = self._graph_to_futures(
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/client.py", line 2611, in _graph_to_futures
        dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1046, in __dask_distributed_pack__
        "state": layer.__dask_distributed_pack__(
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/dask/highlevelgraph.py", line 425, in __dask_distributed_pack__
        dsk = toolz.valmap(dumps_task, dsk)
      File "cytoolz/dicttoolz.pyx", line 181, in cytoolz.dicttoolz.valmap
      File "cytoolz/dicttoolz.pyx", line 206, in cytoolz.dicttoolz.valmap
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3811, in dumps_task
        return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/worker.py", line 3775, in dumps_function
        result = pickle.dumps(func, protocol=4)
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
        result = cloudpickle.dumps(x, **dump_kwargs)
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/user/anaconda3/envs/toy_examples_env/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_asyncio.Task' object
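
As far as I can tell, the `TypeError` is raised while cloudpickle serializes the function being shipped to the workers: since `read_and_process_file` is defined inside `read_dataset`, it closes over `self`, and `self.dask_client` is the running `Client`, which owns the `_asyncio.Task` objects named in the message. A minimal sketch of the kind of refactor I have in mind, a module-level worker function that captures nothing; the names mirror the snippet above but are otherwise hypothetical:

```python
# Sketch only: a module-level worker function that does not capture `self`,
# so cloudpickle only has to serialize the file path and the (picklable)
# preprocessing callable, not the reader instance or the Client.
from pathlib import Path
from typing import Any, Union

import xarray as xr


def read_and_process_file(filepath: Union[str, Path], preprocesser: Any = None) -> xr.Dataset:
    with xr.open_dataset(filepath) as ds:
        if preprocesser is not None:
            ds = preprocesser(ds)
        # Materialize the data before the file handle is closed on exit.
        return ds.load()


# Inside NetCDFDatasetReader.read_dataset, the mapping would then become:
#   futures = self.dask_client.map(
#       read_and_process_file, filepaths, preprocesser=self.preprocesser
#   )
#   loaded_dataset = self.dask_client.gather(futures)
```

Extra keyword arguments to `Client.map` are forwarded to the mapped function, so the preprocessor can be passed explicitly instead of being pulled in through `self`.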
Btw, apparently if you do not create the Dask client inside the `if __name__ == "__main__":` block, you end up in an infinite recursion where the module is loaded over and over again.
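
As far as I understand, that happens because `LocalCluster` starts its workers in separate processes, and each newly spawned process re-imports the script; anything executed at import time, including the cluster creation itself, then runs again in every worker. A bare-bones illustration of the guard (hypothetical, independent of the snippet above):

```python
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # Without this guard, every spawned worker process re-imports the module,
    # reaches this point again, and tries to start yet another cluster.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    print("Dashboard:", client.dashboard_link)
    client.close()
    cluster.close()
```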

  
  