AgitatedDove14 After checking, I found that it apparently doesn't matter whether each pipeline is executed by a different worker; the error persists. Honestly, this has me puzzled. I'm really looking forward to getting this functionality right, because it's an aspect that would make ClearML shine even more.
AgitatedDove14 By adding PipelineDecorator.run_locally()
everything seems to work perfectly. This is what I expect the experiment listing to look like when the agents are the ones running the code. With this, I'm pretty sure the error search can be narrowed down to the agents' code.
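For reference, the only change I make to switch between the two modes is this (sketch of the relevant part of the main block, the rest of the script is unchanged):
` if __name__ == "__main__":
    PipelineDecorator.run_locally()  # uncommented -> everything executes locally; commented out -> dispatched to the agents
    ...
    prediction_service(config=default_config) `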
GiganticTurtle0 I think I located the issue:
it seems the change is in "config" (and for some reason it stores the entire dict) but the split values are not changed.
Is this it?
AgitatedDove14 Exactly, I've run into the same problem
Hey GiganticTurtle0 ,
So basically the issue is that the pipeline function ( prediction_service ) is getting a dict as input, while it is expecting basic types... if you were to do the following, it would work as expected: prediction_service(**default_config)
I will make sure we flatten any dictionary so that we end up with config/start, instead of a serialized version of the dict.
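Something like this (just a sketch to illustrate what I mean, not the final behavior):
` # today: "config" is stored as one serialized string parameter on the task
# after the fix: the dict is flattened into basic-type parameters, e.g.
#   config/start, config/end, config/forecast_base_time, config/horizons
prediction_service(config=default_config) `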
wdyt?
Hi AgitatedDove14, it's nice to know you've already pinpointed the problem! I think the solution you propose is a good one, but does that mean I have to unpack all the dictionary values as parameters of the pipeline function? Wouldn't that make the function too "dirty"? Or do you mean you will soon push a commit that will let me keep passing a dictionary and have ClearML flatten it automatically?
but does that mean I have to unpack all the dictionary values as parameters of the pipeline function?
I was just suggesting a hack 🙂 The fix itself is transparent (I'm expecting it to be pushed tomorrow); basically it will make sure the sample pipeline works as expected.
Regardless, and out of curiosity: if you only have one dict passed to the pipeline function, why not use named arguments?
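e.g. something along these lines (just a sketch, the other decorator arguments are omitted for brevity):
` @PipelineDecorator.pipeline(name="Prediction Service (deployed)", project="Mocks", version="1.0.0")
def prediction_service(start: str, end: str, forecast_base_time: str, horizons: int = 72):
    ...

# the dict keys then map directly onto basic-type arguments
prediction_service(**default_config) `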
Well, this is just a mock example 🙂 . In the real application I'm working on there will be more than one configuration file (in principle one for the data and one for the DL model). Regarding the fix, I am not in a hurry at the moment. I'll happily wait for tomorrow (or the day after) when the commit is pushed!
But maybe another solution would be to pass the configuration file paths as function arguments, then read and parse them inside the pipeline.
When you say "configuration files", are you referring to the dict in the mock example?
Yes, although I use both terms interchangeably. The information will actually be contained in JSON files.
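Something like this is what I had in mind (just a sketch; the JSON path is hypothetical):
` import json

@PipelineDecorator.pipeline(name="Prediction Service (deployed)", project="Mocks", version="1.0.0")
def prediction_service(config_path: str):
    # read and parse the JSON configuration inside the pipeline itself
    with open(config_path) as f:
        config = json.load(f)
    ...

prediction_service(config_path="configs/prediction_service.json")  # hypothetical path `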
AgitatedDove14 BTW, I got the notification from GitHub telling me you had committed the fix, so I went ahead and tried it. After testing the code again, I see the task parameter dictionary has been removed properly (it is now broken down into flat parameters). However, I still have the same problem with duplicate tasks, as you can see in the image.
AgitatedDove14 I have the strong feeling it must be an agent issue, because when I place PipelineDecorator.run_locally()
before calling the pipeline, everything works perfectly. See:
After testing the code again, I see the task parameter dictionary has been removed properly
Great!
However, I still have the same problem with duplicate tasks, as you can see in the image.
Any chance the pipeline script itself is running from the agent (as opposed to running the pipeline code locally, and then the pipelines being executed on the agent)?
What exactly do you mean by that? From VS Code I execute the following script, and then the agents take care of executing the code remotely:
` import pandas as pd
from clearml import Task, TaskTypes
from clearml.automation.controller import PipelineDecorator

CACHE = False


@PipelineDecorator.component(
    name="Wind data creator",
    return_values=["wind_series"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:
    import numpy as np
    import pandas as pd

    samples_dates = pd.date_range(start=start_date, end=end_date, freq="10T")
    rng = np.random.default_rng()
    wind_data = rng.weibull(2, len(samples_dates))
    return pd.Series(
        data=wind_data,
        index=samples_dates,
        name="wind_series-(m/s)",
    )


@PipelineDecorator.component(
    name="Wind data parser",
    return_values=["wind_series_parsed"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
    packages=["pandas"],
)
def parse_wind(wind_series: pd.Series) -> pd.Series:
    # Cleaning process
    wind_series = wind_series.dropna()
    # Resampling process
    wind_series = wind_series.resample("1H", label="right").mean()
    return wind_series


@PipelineDecorator.component(
    name="Dummy forecaster",
    return_values=["forecast_series"],
    cache=CACHE,
    execution_queue="inference",
    task_type=TaskTypes.inference,
)
def forecast_with_persistence(
    wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:
    import numpy as np
    import pandas as pd

    base_time = pd.to_datetime(base_time)
    if base_time not in wind_series.index:
        # Find the closest previous date of the wind series to 'base_time'
        nearest_floor_index = wind_series.index.get_loc(
            base_time, method="ffill", tolerance=None
        )
        base_time = wind_series.index[nearest_floor_index]
    print("Persistence forecast has been made from", base_time)
    return pd.Series(
        data=np.repeat(wind_series.loc[base_time], horizons),
        index=pd.date_range(start=base_time, periods=int(horizons), freq="10T"),
        name="forecast",
    )


@PipelineDecorator.pipeline(
    name="Prediction Service (deployed)",
    project="Mocks",
    version="1.0.0",
    pipeline_execution_queue="controllers",
    multi_instance_support=True,
    add_pipeline_tags=True,
)
def prediction_service(config: dict):
    logger = Task.current_task().logger
    logger.report_text(f"Running step {generate_wind.__name__!r}")
    raw_series = generate_wind(start_date=config["start"], end_date=config["end"])
    logger.report_text(f"Running step {parse_wind.__name__!r}")
    parsed_series = parse_wind(raw_series)
    logger.report_text(f"Running step {forecast_with_persistence.__name__!r}")
    forecast_values = forecast_with_persistence(
        parsed_series,
        base_time=config["forecast_base_time"],
        horizons=config["horizons"],
    )
    logger.report_text("The predictions are already cooked!")
    print(forecast_values)


if __name__ == "__main__":
    # PipelineDecorator.run_locally()
    default_config = {"start": "", "end": "", "forecast_base_time": "", "horizons": 72}
    date_configs = [
        ("2021-02-01 00:00", "2021-02-25 23:00", "2021-02-17 17:16"),
        ("2021-11-01 00:00", "2021-11-30 23:00", "2021-11-09 19:09"),
    ]
    for start_date, end_date, base_date in date_configs:
        default_config["start"] = start_date
        default_config["end"] = end_date
        default_config["forecast_base_time"] = base_date
        # Run wind prediction service
        prediction_service(config=default_config) `
AgitatedDove14 The pipelines are executed by the agents that are listening to the queue given by pipeline_execution_queue="controllers"
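(i.e. I start each of them with something like clearml-agent daemon --queue controllers, and the step queues data_cpu and inference have their own agents listening)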
GiganticTurtle0 this is exactly what I did, and I ended up with two pipelines; comparing them produced what I expected (different arguments, as passed by the script).
What are you getting ?
AgitatedDove14 I ended up with two pipelines that ran until the workflow completed, but each of their steps was duplicated. You can check it here:
https://clearml.slack.com/files/U02A5DGPMPU/F02SR3G9RDK/image.png
AgitatedDove14 So did you get the same results without step duplication?
Right! I just noticed that! This is odd... and yes, it definitely has something to do with the multi-pipeline being executed on the agent. I think I know what to look for ...
(just making sure (again): run_locally produced exactly what we were expecting, is that correct?)
That's right! run_locally() does just what I was expecting
okay, let me see if I can nail down the issue
Quick update, I found the issue, working on a fix 🙂
Hey AgitatedDove14 ! Any news on this? 🙂
GiganticTurtle0 fix was pushed 🙂
you can test with: pip install git+
🤞
Great AgitatedDove14 , I tested it on the mock example and it worked just as I expected 🎉