AgitatedDove14 The pipelines are executed by the agents that are listening to the queue given by pipeline_execution_queue="controllers"
AgitatedDove14 After checking, I discovered that apparently it doesn't matter if each pipeline is executed by a different worker, the error persists. Honestly this has me puzzled. I'm really looking forward to getting this functionality right because it's an aspect that would make ClearML shine even more.
Great AgitatedDove14 , I tested it on the mock example and it worked just as I expected 🎉
Quick update, I found the issue, working on a fix 🙂
AgitatedDove14 I have the strong feeling it must be an agent issue, because when I place PipelineDecorator.run_locally() before calling the pipeline, everything works perfectly. See:
AgitatedDove14 By adding PipelineDecorator.run_locally() everything seems to work perfectly. This is what I expect the experiment listing to look like when the agents are the ones running the code. With this, I'm pretty sure the error search can be narrowed down to the agents' code.
GiganticTurtle0 this is exactly what I did, and I ended up with two pipelines. Comparing them produced what I expected (different arguments, as passed by the script).
What are you getting ?
What exactly do you mean by that? From VS Code I execute the following script, and then the agents take care of executing the code remotely:
` import pandas as pd
from clearml import Task, TaskTypes
from clearml.automation.controller import PipelineDecorator

CACHE = False


@PipelineDecorator.component(
    name="Wind data creator",
    return_values=["wind_series"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:
    import numpy as np
    import pandas as pd

    samples_dates = pd.date_range(start=start_date, end=end_date, freq="10T")
    rng = np.random.default_rng()
    wind_data = rng.weibull(2, len(samples_dates))
    return pd.Series(
        data=wind_data,
        index=samples_dates,
        name="wind_series-(m/s)",
    )


@PipelineDecorator.component(
    name="Wind data parser",
    return_values=["wind_series_parsed"],
    cache=CACHE,
    execution_queue="data_cpu",
    task_type=TaskTypes.data_processing,
    packages=["pandas"],
)
def parse_wind(wind_series: pd.Series) -> pd.Series:
    # Cleaning process
    wind_series = wind_series.dropna()
    # Resampling process
    wind_series = wind_series.resample("1H", label="right").mean()
    return wind_series


@PipelineDecorator.component(
    name="Dummy forecaster",
    return_values=["forecast_series"],
    cache=CACHE,
    execution_queue="inference",
    task_type=TaskTypes.inference,
)
def forecast_with_persistence(
    wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:
    import numpy as np
    import pandas as pd

    base_time = pd.to_datetime(base_time)
    if base_time not in wind_series.index:
        # Find the closest previous date of the wind series to 'base_time'
        nearest_floor_index = wind_series.index.get_loc(
            base_time, method="ffill", tolerance=None
        )
        base_time = wind_series.index[nearest_floor_index]
    print("Persistence forecast has been made from", base_time)
    return pd.Series(
        data=np.repeat(wind_series.loc[base_time], horizons),
        index=pd.date_range(start=base_time, periods=int(horizons), freq="10T"),
        name="forecast",
    )


@PipelineDecorator.pipeline(
    name="Prediction Service (deployed)",
    project="Mocks",
    version="1.0.0",
    pipeline_execution_queue="controllers",
    multi_instance_support=True,
    add_pipeline_tags=True,
)
def prediction_service(config: dict):
    logger = Task.current_task().logger
    logger.report_text(f"Running step {generate_wind.__name__!r}")
    raw_series = generate_wind(start_date=config["start"], end_date=config["end"])
    logger.report_text(f"Running step {parse_wind.__name__!r}")
    parsed_series = parse_wind(raw_series)
    logger.report_text(f"Running step {forecast_with_persistence.__name__!r}")
    forecast_values = forecast_with_persistence(
        parsed_series,
        base_time=config["forecast_base_time"],
        horizons=config["horizons"],
    )
    logger.report_text("The predictions are already cooked!")
    print(forecast_values)


if __name__ == "__main__":
    # PipelineDecorator.run_locally()
    default_config = {"start": "", "end": "", "forecast_base_time": "", "horizons": 72}
    date_configs = [
        ("2021-02-01 00:00", "2021-02-25 23:00", "2021-02-17 17:16"),
        ("2021-11-01 00:00", "2021-11-30 23:00", "2021-11-09 19:09"),
    ]
    # Launch one pipeline instance per date configuration
    for start_date, end_date, base_date in date_configs:
        default_config["start"] = start_date
        default_config["end"] = end_date
        default_config["forecast_base_time"] = base_date
        # Run wind prediction service
        prediction_service(config=default_config) `
Well, this is just a mock example 🙂 . In the real application I'm working on there will be more than one configuration file (in principle one for the data and one for the DL model). Regarding the fix, I am not in a hurry at the moment. I'll happily wait for tomorrow (or the day after) when the commit is pushed!
Hey GiganticTurtle0 ,
So basically the issue is that the pipeline function ( prediction_service ) is getting a dict as input, and it is expecting to get basic types... if you were to do the following, it would have worked as expected: prediction_service(**default_config)
I will make sure we flatten any dictionary so that we end up with config/start , instead of a serialized version of the dict.
wdyt?
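To make the flattening idea concrete, here is a minimal sketch. `flatten_dict` is a hypothetical helper written for this thread, not ClearML's actual implementation; only the `config/start` key style is taken from the message above.

```python
def flatten_dict(nested: dict, parent_key: str = "", sep: str = "/") -> dict:
    """Flatten nested dicts into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse so that deeper levels also end up as "a/b/c" keys
            flat.update(flatten_dict(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


# The pipeline function receives a single argument named "config"
pipeline_kwargs = {
    "config": {
        "start": "2021-02-01 00:00",
        "end": "2021-02-25 23:00",
        "forecast_base_time": "2021-02-17 17:16",
        "horizons": 72,
    }
}
flat_params = flatten_dict(pipeline_kwargs)
print(flat_params)
```

With flat keys, each value is stored as its own basic-type parameter, so two pipeline instances with different dates would differ in individual entries rather than in one serialized blob.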
AgitatedDove14 Exactly, I've run into the same problem
AgitatedDove14 I ended up with two pipelines being executed until they completed the workflow but duplicating each of their steps. You can check it here:
https://clearml.slack.com/files/U02A5DGPMPU/F02SR3G9RDK/image.png
GiganticTurtle0 I think I located the issue:
it seems the change is in "config" (and for some reason it stores the entire dict) but the split values are not changed.
Is this it?
Hi AgitatedDove14 , it's nice to know you've already pinpointed the problem! I think the solution you propose is a good one, but does that mean I have to unpack all the dictionary values as parameters of the pipeline function? Wouldn't that make the function too "dirty"? Or do you mean you will soon push a commit that will allow me to keep passing a dictionary and have ClearML flatten it automatically?
AgitatedDove14 So did you get the same results without step duplication?
Hey AgitatedDove14 ! Any news on this? 🙂
Right! I just noticed that! This is odd... and yes, it definitely has something to do with multiple pipelines being executed on the agent. I think I know what to look for ...
(just making sure (again), run_locally() produced exactly what we were expecting, is that correct?)
Yes, although I use both terms interchangeably. The information will actually be contained in JSON files.
AgitatedDove14 BTW, I got the notification from GitHub telling me you had committed the fix and I went ahead. After testing the code again, I see the task parameter dictionary has been removed properly (now it has been broken down into flat parameters). However, I still have the same problem with duplicate tasks, as you can see in the image.
After testing the code again, I see the task parameter dictionary has been removed properly
Great!
However, I still have the same problem with duplicate tasks, as you can see in the image.
Any chance the pipeline script itself is running from the agent (as opposed to running the pipeline script locally and having only the pipelines executed on the agent)?
But maybe another solution would be to pass the configuration files paths as function arguments, then read and parse them inside the pipeline
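A rough sketch of that alternative, assuming the configuration lives in JSON files. The function below is a plain Python stand-in for the decorated pipeline function, and the parameter names `data_config_path` and `model_config_path` are made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def load_config(config_path: str) -> dict:
    """Read and parse one JSON configuration file."""
    with open(config_path, "r", encoding="utf-8") as handle:
        return json.load(handle)


def prediction_service(data_config_path: str, model_config_path: str) -> dict:
    # Only plain strings cross the function boundary; parsing happens inside
    data_config = load_config(data_config_path)
    model_config = load_config(model_config_path)
    return {"data": data_config, "model": model_config}


# Write two throwaway config files so the sketch runs on its own
with tempfile.TemporaryDirectory() as tmp_dir:
    data_path = Path(tmp_dir) / "data.json"
    model_path = Path(tmp_dir) / "model.json"
    data_path.write_text(json.dumps({"start": "2021-02-01 00:00", "end": "2021-02-25 23:00"}))
    model_path.write_text(json.dumps({"horizons": 72}))
    merged = prediction_service(str(data_path), str(model_path))

print(merged["model"]["horizons"])
```

One caveat with this approach: the paths would have to be readable from wherever the pipeline function actually runs, which matters once an agent executes it on another machine.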
when you say "configuration files" are you referencing the dict in the mock example ?
GiganticTurtle0 fix was pushed 🙂
you can test with: pip install git+
🤞
That's right! run_locally() does just what I was expecting
okay, let me see if I can nail down the issue
but does that mean I have to unpack all the dictionary values as parameters of the pipeline function?
I was just suggesting a hack 🙂 the fix itself is transparent (I'm expecting it to be pushed tomorrow), basically it will make sure the sample pipeline will work as expected.
regardless and out of curiosity, if you only have one dict passed to the pipeline function, why not use named arguments ?
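For reference, the named-arguments variant could look like the sketch below. It uses a plain function in place of the decorated pipeline so it runs without ClearML; the parameter names mirror the keys of default_config from the mock example.

```python
def prediction_service(
    start: str, end: str, forecast_base_time: str, horizons: int
) -> dict:
    # Every argument is a basic type (str or int) rather than one dict
    return {
        "start": start,
        "end": end,
        "forecast_base_time": forecast_base_time,
        "horizons": horizons,
    }


default_config = {
    "start": "2021-11-01 00:00",
    "end": "2021-11-30 23:00",
    "forecast_base_time": "2021-11-09 19:09",
    "horizons": 72,
}

# ** unpacks the dict into keyword arguments, one per key
received = prediction_service(**default_config)
print(received)
```

The caller can keep building a single dict, while the function signature documents exactly which values the pipeline expects.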