Hi, can anyone help me with this code? (It is just a mock example, but it faithfully reproduces the behavior of the real code.)
` import pandas as pd
from clearml import Task
from clearml import PipelineDecorator
# Module-wide switches consumed by the decorators below.
#   CACHE         -> forwarded as ``cache=`` to every pipeline component.
#   MULTIPIPELINE -> forwarded as ``multi_instance_support=`` to the pipeline.
CACHE, MULTIPIPELINE = False, True
@PipelineDecorator.component(
    name="Wind data creator",
    return_values=["wind_series"],
    cache=CACHE,
    execution_queue="datasets",
    task_type=Task.TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:
    """Generate a synthetic wind-speed series sampled every 10 minutes.

    numpy and pandas are imported inside the body because the component is
    executed as its own ClearML step, where module-level imports of this
    file are presumably not available.

    Args:
        start_date: First timestamp of the series (any value accepted by
            ``pd.date_range(start=...)``).
        end_date: Last timestamp bound (passed to ``pd.date_range(end=...)``).

    Returns:
        A ``pd.Series`` named ``"wind_series-(m/s)"`` holding draws from
        ``rng.weibull(2, ...)`` (shape 2, unit scale), indexed by the
        10-minute timestamps. The generator is unseeded, so every run
        yields different values.
    """
    import numpy as np
    import pandas as pd

    # "10min" is the canonical minute alias; the legacy "10T" spelling is
    # deprecated since pandas 2.2 and scheduled for removal.
    samples_dates = pd.date_range(start=start_date, end=end_date, freq="10min")
    rng = np.random.default_rng()
    wind_data = rng.weibull(2, len(samples_dates))
    return pd.Series(
        data=wind_data,
        index=samples_dates,
        name="wind_series-(m/s)",
    )
@PipelineDecorator.component(
    name="Wind data parser",
    return_values=["wind_series_parsed"],
    cache=CACHE,
    execution_queue="datasets",
    task_type=Task.TaskTypes.data_processing,
    packages=["pandas"],
)
def preprocess_wind(wind_series: pd.Series) -> pd.Series:
    """Drop missing samples and resample the series to hourly means.

    Args:
        wind_series: Raw wind speeds. Must carry a ``DatetimeIndex`` so that
            ``resample`` can be applied.

    Returns:
        Hourly mean series whose bins are labelled by their right edge
        (``label="right"``). Hours with no remaining samples come out as NaN.
    """
    # Cleaning process: discard missing samples before aggregation.
    wind_series = wind_series.dropna()
    # Resampling process: "1h" is the canonical hourly alias; the legacy
    # "1H" spelling is deprecated since pandas 2.2.
    wind_series = wind_series.resample("1h", label="right").mean()
    return wind_series
@PipelineDecorator.component(
    name="Dummy forecaster",
    return_values=["forecast_series"],
    cache=CACHE,
    execution_queue="inferences",
    task_type=Task.TaskTypes.inference,
)
def forecast_with_persistence(
    wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:
    """Produce a persistence forecast by repeating the last known value.

    All logic stays inside this function (no module-level helpers) because
    the component is executed as a standalone ClearML step.

    Args:
        wind_series: Observed series with a sorted ``DatetimeIndex``.
        base_time: Forecast issue time (anything ``pd.to_datetime`` accepts).
            If it is not an exact index entry, the latest entry at or before
            it is used instead.
        horizons: Number of forecast steps. Cast with ``int()`` so that a
            numeric string or whole float is accepted as well.

    Returns:
        Series named ``"forecast"`` with ``horizons`` copies of the value at
        the (possibly adjusted) base time, indexed every 10 minutes starting
        at that base time.

    Raises:
        KeyError: If ``base_time`` is earlier than the first index entry.
    """
    import numpy as np
    import pandas as pd

    # Cast once so np.repeat and pd.date_range agree on the step count.
    n_steps = int(horizons)
    base_time = pd.to_datetime(base_time)
    if base_time not in wind_series.index:
        # Find the closest previous date of the wind series to 'base_time'.
        # Index.get_loc(..., method="ffill") was removed in pandas 2.0; on a
        # sorted index, searchsorted(side="right") - 1 is the ffill position.
        nearest_floor_index = (
            wind_series.index.searchsorted(base_time, side="right") - 1
        )
        if nearest_floor_index < 0:
            # Nothing at or before base_time: fail like get_loc did instead of
            # silently wrapping around to the last element via index[-1].
            raise KeyError(base_time)
        base_time = wind_series.index[nearest_floor_index]
    print("Persistence forecast has been made from", base_time)
    # NOTE(review): the forecast uses 10-minute steps starting at the base time
    # itself, while the upstream parser resamples hourly — confirm intended.
    return pd.Series(
        data=np.repeat(wind_series.loc[base_time], n_steps),
        index=pd.date_range(start=base_time, periods=n_steps, freq="10min"),
        name="forecast",
    )
@PipelineDecorator.pipeline(
    name="Prediction Service",
    project="Mocks",
    version="1.0.0",
    pipeline_execution_queue="controllers",
    multi_instance_support=MULTIPIPELINE,
    add_pipeline_tags=True,
)
def prediction_service(config: dict):
    """Chain the wind steps: generate -> preprocess -> persistence forecast.

    Each step is announced on the current task's logger before it is called.

    Args:
        config: Mapping that must provide ``"start"``, ``"end"``,
            ``"forecast_base_time"`` and ``"horizons"``.

    Returns:
        The value produced by the ``forecast_with_persistence`` step.
    """
    report = Task.current_task().logger.report_text

    report(f"Running step {generate_wind.__name__!r}")
    wind_raw = generate_wind(start_date=config["start"], end_date=config["end"])

    report(f"Running step {preprocess_wind.__name__!r}")
    wind_hourly = preprocess_wind(wind_raw)

    report(f"Running step {forecast_with_persistence.__name__!r}")
    forecast = forecast_with_persistence(
        wind_hourly,
        base_time=config["forecast_base_time"],
        horizons=config["horizons"],
    )

    report("Predictions have been made!")
    return forecast
# Script entry point. The guard must compare the dunder names; the
# underscore-less "name == 'main'" form raises NameError (undefined ``name``).
if __name__ == "__main__":
    # Connecting ClearML with the current process.
    master_task = Task.init(
        project_name="Mocks",
        task_name="Master Mock",
        task_type=Task.TaskTypes.application,
    )
    # Enqueue this master task on the "masters" queue. The code below is
    # presumably executed by the agent that picks it up — confirm the
    # exit_process behaviour for the installed clearml version.
    master_task.execute_remotely(queue_name="masters")

    default_config = {
        "start": "2021-02-01 00:00",
        "end": "2021-02-25 23:00",
        "forecast_base_time": "2021-02-17 17:16",
        "horizons": 72,
    }

    # Run wind prediction service.
    # NOTE(review): with clearml==1.1.6 this call fails remotely inside
    # clearml.automation.controller (add_function_step -> json.loads(None)),
    # i.e. a step's stored configuration could not be read back. The
    # traceback points into the library itself; consider retrying with a
    # newer clearml / clearml-agent release. TODO confirm.
    forecast = prediction_service(config=default_config)

    # Attach the forecast to the master task after pipeline process is completed.
    master_task.upload_artifact(name="model_forecast", artifact_object=forecast)
I am trying to launch a pipeline from a master task so that, once the pipeline has completed, I can register some of its results in the master task. However, the run fails with the following error:
```
Process Process-2:
Traceback (most recent call last):
  File "/home/username/Miniconda3/envs/mlops/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/username/Miniconda3/envs/mlops/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 3114, in sanitized_env
    a_result = func(*a_args, **a_kwargs)
  File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 2940, in internal_decorator
    a_pipeline = PipelineDecorator(
  File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 2258, in __init__
    self.add_function_step(**n)
  File "/raid0/home/username/.clearml/venvs-builds.4/3.9/lib/python3.9/site-packages/clearml/automation/controller.py", line 603, in add_function_step
    task_definition = json.loads(self._task._get_configuration_text(name=name))
  File "/home/username/Miniconda3/envs/mlops/lib/python3.9/json/__init__.py", line 339, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not NoneType
2022-02-22 12:50:25,777 - clearml.Task - INFO - Waiting to finish uploads
2022-02-22 13:50:34 2022-02-22 12:50:32,445 - clearml.Task - INFO - Finished uploading
2022-02-22 13:50:34 Process completed successfully
```
BTW, I'm using `clearml==1.1.6`
and `clearml-agent==1.1.2`.