What exactly do you mean by that? From VS Code I execute the following script, and then the agents take care of executing the code remotely:
```python
import pandas as pd
from clearml import Task, TaskTypes
from clearml.automation.controller import PipelineDecorator
# Single switch forwarded as ``cache=`` to every pipeline component in this script.
CACHE = False
@PipelineDecorator.component(
    name="Wind data creator",
    return_values=["wind_series"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:
    """Create a random wind series sampled every 10 minutes.

    Args:
        start_date: Left bound of the generated date range.
        end_date: Right bound of the generated date range.

    Returns:
        A series named ``"wind_series-(m/s)"`` whose index is the
        10-minute date range and whose values are Weibull draws (shape 2).
        The generator is unseeded, so each call yields different values.
    """
    # NOTE(review): the imports are kept function-local as in the original,
    # presumably so the step is self-contained when run remotely -- confirm.
    import numpy as np
    import pandas as pd

    # "10min" is the non-deprecated spelling of the old "10T" alias
    # (deprecated in pandas 2.2) and is accepted by older versions too.
    samples_dates = pd.date_range(start=start_date, end=end_date, freq="10min")
    rng = np.random.default_rng()
    wind_data = rng.weibull(2, len(samples_dates))
    return pd.Series(
        data=wind_data,
        index=samples_dates,
        name="wind_series-(m/s)",
    )
@PipelineDecorator.component(
    name="Wind data parser",
    return_values=["wind_series_parsed"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
    packages=["pandas"],
)
def parse_wind(wind_series: pd.Series) -> pd.Series:
    """Clean the raw wind series and average it into hourly bins.

    Args:
        wind_series: Series to clean; it is resampled, so it needs a
            datetime-like index.

    Returns:
        The series with missing values dropped, resampled to one-hour bins
        (each bin labelled with its right edge) and averaged per bin.
    """
    # Cleaning process
    wind_series = wind_series.dropna()
    # Resampling process. "1h" replaces the upper-case "1H" alias, which is
    # deprecated since pandas 2.2; the lower-case form means the same thing.
    wind_series = wind_series.resample("1h", label="right").mean()
    return wind_series
@PipelineDecorator.component(
    name="Dummy forecaster",
    return_values=["forecast_series"],
    cache=CACHE,
    execution_queue="inference",
    task_type=TaskTypes.inference,
)
def forecast_with_persistence(
    wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:
    """Build a persistence forecast: repeat the value observed at ``base_time``.

    Args:
        wind_series: Observed series with a datetime index.
        base_time: Forecast origin. When it is not an exact index entry, the
            closest *previous* timestamp of ``wind_series`` is used instead.
        horizons: Number of forecast points to emit.

    Returns:
        A series named ``"forecast"`` holding ``horizons`` copies of the value
        at the (possibly adjusted) base time, indexed from that base time in
        10-minute steps.

    Raises:
        KeyError: If ``base_time`` is not in the index and no earlier
            timestamp exists to fall back on.
    """
    import numpy as np
    import pandas as pd

    base_time = pd.to_datetime(base_time)
    # Normalise once so the repeated data and the index always agree in length.
    horizons = int(horizons)

    if base_time not in wind_series.index:
        # Find the closest previous date of the wind series to 'base_time'.
        # Index.get_loc(method=...) was removed in pandas 2.0; get_indexer
        # performs the same forward-fill lookup on old and new versions.
        nearest_floor_index = wind_series.index.get_indexer(
            [base_time], method="ffill"
        )[0]
        if nearest_floor_index == -1:
            # get_indexer signals "no match" with -1, which would silently
            # select the LAST sample if used as a position; raise instead,
            # as the original get_loc call did.
            raise KeyError(base_time)
        base_time = wind_series.index[nearest_floor_index]

    print("Persistence forecast has been made from", base_time)
    return pd.Series(
        data=np.repeat(wind_series.loc[base_time], horizons),
        # "10min" is the non-deprecated spelling of the old "10T" alias.
        index=pd.date_range(start=base_time, periods=horizons, freq="10min"),
        name="forecast",
    )
@PipelineDecorator.pipeline(
    name="Prediction Service (deployed)",
    project="Mocks",
    version="1.0.0",
    pipeline_execution_queue="controllers",
    multi_instance_support=True,
    add_pipeline_tags=True,
)
def prediction_service(config: dict):
    """Run the wind pipeline: generate, parse, then forecast.

    A text line is reported to the current task's logger before each step,
    and the final forecast is printed.

    Args:
        config: Mapping read for the keys ``"start"``, ``"end"``,
            ``"forecast_base_time"`` and ``"horizons"``.
    """
    report = Task.current_task().logger.report_text

    # Step 1: synthetic raw data.
    report(f"Running step {generate_wind.__name__!r}")
    generated = generate_wind(
        start_date=config["start"],
        end_date=config["end"],
    )

    # Step 2: cleaning and resampling.
    report(f"Running step {parse_wind.__name__!r}")
    cleaned = parse_wind(generated)

    # Step 3: persistence forecast from the requested base time.
    report(f"Running step {forecast_with_persistence.__name__!r}")
    prediction = forecast_with_persistence(
        cleaned,
        base_time=config["forecast_base_time"],
        horizons=config["horizons"],
    )

    report("The predictions are already cooked!")
    print(prediction)
if __name__ == "__main__":
    # Debugging aid left by the author; uncomment to use it.
    # PipelineDecorator.run_locally()

    # Values shared by every run; the date fields are filled in per run below.
    default_config = {"start": "", "end": "", "forecast_base_time": "", "horizons": 72}

    # (start, end, forecast base time) for each pipeline launch.
    date_configs = [
        ("2021-02-01 00:00", "2021-02-25 23:00", "2021-02-17 17:16"),
        ("2021-11-01 00:00", "2021-11-30 23:00", "2021-11-09 19:09"),
    ]

    for start_date, end_date, base_date in date_configs:
        # Build a fresh dict per launch so that no two pipeline runs ever
        # share (and mutate) the same configuration object.
        run_config = {
            **default_config,
            "start": start_date,
            "end": end_date,
            "forecast_base_time": base_date,
        }
        # Run wind prediction service
        prediction_service(config=run_config)