Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z} (excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Hi, Can Anyone Help Me With This Code? (Just A Mock Example, But It Nicely Captures The Behavior Of The Real Code)

Hi, can anyone help me with this code? (just a mock example, but it nicely captures the behavior of the real code)

# NOTE(review): the stray backtick before `import pandas` in the original
# paste is a Markdown code-fence artifact and would be a SyntaxError — removed.
import pandas as pd

from clearml import PipelineDecorator, Task

# Disable step caching so every pipeline run re-executes each component.
CACHE = False
# Allow several pipeline instances to be launched from one controller process.
MULTIPIPELINE = True

@PipelineDecorator.component(
    name="Wind data creator",
    return_values=["wind_series"],
    cache=CACHE,
    execution_queue="datasets",
    task_type=Task.TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:
    """Generate a synthetic wind-speed series sampled every 10 minutes.

    Samples are drawn from a Weibull(k=2) distribution, a common model
    for wind-speed magnitudes.

    Args:
        start_date: Inclusive start of the sampling window (any string
            pandas can parse as a timestamp).
        end_date: Inclusive end of the sampling window.

    Returns:
        pd.Series of wind speeds indexed at 10-minute intervals, named
        "wind_series-(m/s)".
    """
    # Local imports: the component body is shipped to a remote agent on its
    # own, so its dependencies must be imported inside the function.
    import numpy as np
    import pandas as pd

    # "10min" replaces the "10T" alias, deprecated in pandas 2.2 and
    # removed in pandas 3.0.
    sample_index = pd.date_range(start=start_date, end=end_date, freq="10min")

    rng = np.random.default_rng()
    wind_speeds = rng.weibull(2, len(sample_index))

    return pd.Series(
        data=wind_speeds,
        index=sample_index,
        name="wind_series-(m/s)",
    )

@PipelineDecorator.component(
    name="Wind data parser",
    return_values=["wind_series_parsed"],
    cache=CACHE,
    execution_queue="datasets",
    task_type=Task.TaskTypes.data_processing,
    packages=["pandas"],
)
def preprocess_wind(wind_series: pd.Series) -> pd.Series:
    """Clean and downsample a raw wind series to hourly resolution.

    Args:
        wind_series: Raw wind-speed series with a DatetimeIndex at
            sub-hourly frequency.

    Returns:
        The series with NaNs dropped, resampled to hourly means labelled
        on the right edge of each interval.
    """
    # Cleaning process: drop missing samples before aggregating.
    wind_series = wind_series.dropna()
    # Resampling process: hourly mean. "1h" replaces the uppercase "1H"
    # alias, which is deprecated since pandas 2.2.
    wind_series = wind_series.resample("1h", label="right").mean()

    return wind_series

@PipelineDecorator.component(
    name="Dummy forecaster",
    return_values=["forecast_series"],
    cache=CACHE,
    execution_queue="inferences",
    task_type=Task.TaskTypes.inference,
)
def forecast_with_persistence(
    wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:
    """Produce a persistence forecast: repeat the last observed value.

    Args:
        wind_series: Observed wind series with a sorted DatetimeIndex
            (assumption — TODO confirm upstream steps guarantee ordering).
        base_time: Timestamp the forecast is issued from; snapped back to
            the closest earlier observation when absent from the index.
        horizons: Number of 10-minute forecast steps to emit.

    Returns:
        pd.Series named "forecast" of length ``horizons`` at 10-minute
        frequency, starting at the (possibly snapped) base time.

    Raises:
        KeyError: If ``base_time`` precedes the first observation.
    """
    import numpy as np
    import pandas as pd

    base_time = pd.to_datetime(base_time)

    if base_time not in wind_series.index:
        # Find the closest previous date of the wind series to 'base_time'.
        # Index.asof replaces get_loc(..., method="ffill"): the `method`
        # keyword was deprecated in pandas 1.4 and removed in 2.0, where the
        # original call raises TypeError.
        nearest_floor = wind_series.index.asof(base_time)
        if pd.isna(nearest_floor):
            # asof returns NaT when base_time is before the first sample;
            # mirror get_loc's KeyError behaviour in that case.
            raise KeyError(f"{base_time} precedes the first observation")
        base_time = nearest_floor

    print("Persistence forecast has been made from", base_time)

    return pd.Series(
        data=np.repeat(wind_series.loc[base_time], horizons),
        index=pd.date_range(start=base_time, periods=int(horizons), freq="10min"),
        name="forecast",
    )

@PipelineDecorator.pipeline(
    name="Prediction Service",
    project="Mocks",
    version="1.0.0",
    pipeline_execution_queue="controllers",
    multi_instance_support=MULTIPIPELINE,
    add_pipeline_tags=True,
)
def prediction_service(config: dict):
    """Pipeline controller: generate, preprocess and forecast wind data.

    Chains the three component steps and returns the forecast produced by
    the final one. Progress is reported to the controller task's logger.
    """
    log = Task.current_task().logger

    # Step 1: synthesize the raw 10-minute wind observations.
    log.report_text(f"Running step {generate_wind.__name__!r}")
    wind_raw = generate_wind(start_date=config["start"], end_date=config["end"])

    # Step 2: clean the raw series and resample it.
    log.report_text(f"Running step {preprocess_wind.__name__!r}")
    wind_parsed = preprocess_wind(wind_raw)

    # Step 3: issue a persistence forecast from the configured base time.
    log.report_text(f"Running step {forecast_with_persistence.__name__!r}")
    prediction = forecast_with_persistence(
        wind_parsed,
        base_time=config["forecast_base_time"],
        horizons=config["horizons"],
    )

    log.report_text("Predictions have been made!")

    return prediction

# NOTE(review): the original paste read `if name == "main":` — Markdown
# stripped the dunder underscores, which would raise NameError at module
# scope. Restored to the standard entry-point guard; also removed the
# trailing stray backtick from the paste.
if __name__ == "__main__":

    # Connecting ClearML with the current process.
    master_task = Task.init(
        project_name="Mocks",
        task_name="Master Mock",
        task_type=Task.TaskTypes.application,
    )

    # Enqueue this controller task to run on a remote agent from the
    # "masters" queue.
    master_task.execute_remotely(queue_name="masters")

    default_config = {
        "start": "2021-02-01 00:00",
        "end": "2021-02-25 23:00",
        "forecast_base_time": "2021-02-17 17:16",
        "horizons": 72,
    }

    # Run wind prediction service
    forecast = prediction_service(config=default_config)

    # Attach the forecast to the master task after pipeline process is completed.
    master_task.upload_artifact(name="model_forecast", artifact_object=forecast)

What I am trying to do is to launch a pipeline from a master task, so that I can register some results from the pipeline in the master task after completion. However, I am unable to do so because of the following error:
Process Process-2: Traceback (most recent call last): File "/home/username/Miniconda3/envs/mlops/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/home/username/Miniconda3/envs/mlops/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 3114, in sanitized_env a_result = func(*a_args, **a_kwargs) File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 2940, in internal_decorator a_pipeline = PipelineDecorator( File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 2258, in __init__ self.add_function_step(**n) File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 603, in add_function_step task_definition = json.loads(self._task._get_configuration_text(name=name)) File "/home/username/Miniconda3/envs/mlops/lib/python3.9/json/__init__.py", line 339, in loads raise TypeError(f'the JSON object must be str, bytes or bytearray, ' TypeError: the JSON object must be str, bytes or bytearray, not NoneType 2022-02-22 12:50:25,777 - clearml.Task - INFO - Waiting to finish uploads 2022-02-22 13:50:34 2022-02-22 12:50:32,445 - clearml.Task - INFO - Finished uploading 2022-02-22 13:50:34 Process completed successfully
BTW, I'm working with clearml==1.1.6 and clearml-agent==1.1.2

  
  
Posted 2 years ago
Votes Newest

Answers 5


Hey CostlyOstrich36 AgitatedDove14 ! Any news on this? Should I open an issue?

  
  
Posted 2 years ago

Please let me know as soon as you have something :)

  
  
Posted 2 years ago

Looks like you're trying to fetch something and then do something with it; that's why there's a TypeError regarding a NoneType. Do you know where in the code the traceback occurs?

  
  
Posted 2 years ago

CostlyOstrich36 Yes, it happens on the following line, at the time of calling the pipeline.
forecast = prediction_service(config=default_config)

Were you able to reproduce the example?

  
  
Posted 2 years ago

I'll try soon 🙂

  
  
Posted 2 years ago
1K Views
5 Answers
2 years ago
one year ago
Tags