Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Is There Any Reason Why Doing The Following Is Not Possible? Am I Doing It Right? I Want To Run A Pipeline With Different Parameters But I Get The Following Error?

Is there any reason why doing the following is not possible? Am I doing it right? I want to run a pipeline with different parameters but I get the following error?
ValueError: Node 'First operation', base_task_id is empty 2021-10-26 12:45:04 Process failed, exit code 1Can someone reproduce this scenario? I used the code below:
` from clearml import Task
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(name="First operation")
def step_1(a: int):
return a ** 2 + 5

@PipelineDecorator.component(name="Second operation")
def step_2(b: int):
return 2 * b + 2

@PipelineDecorator.pipeline(
name="Math operations pipeline",
project="Mocks",
version="0.1.1",
pipeline_execution_queue="services",
)
def mypipeline(a: int, b: int):
task = Task.current_task()
print(
f"<Pipeline task: ID = {task.id}, Parent ID = {task.parent}, Status = {task.status}>"
)
result_1 = step_1(a)
result_2 = step_2(b)
total = result_1 + result_2
print("Total result:", total)

if name == "main":

# PipelineDecorator.set_default_execution_queue("default")
# PipelineDecorator.debug_pipeline()

mypipeline(1, 2)
mypipeline(2, 3) `My clearml version is 1.1.3 and the clearml-agent version is 1.1.0.
  
  
Posted 2 years ago
Votes Newest

Answers 30


After testing the code again, I see the task parameter dictionary has been removed properly

Great!

However, I still have the same problem with duplicate tasks, as you can see in the image.

Any chance the pipeline script Itself is running from the agent (as opposed to running the pipeline code locally, then the pipelines are executed on the agent)?

  
  
Posted 2 years ago

Great! Thank you 👍

  
  
Posted 2 years ago

Hey GiganticTurtle0 ,
So basically the issue is the the pipeline function ( prediction_service ) is getting a dict as input, and it is expecting to get basic types... if you were to do the following, it would have worked as expected.
prediction_service(**default_config)I will make sure we flatten any dictionary so that we end up with config/start , instead of a serialized version of the dict.
wdyt?

  
  
Posted 2 years ago

Hi AgitatedDove14 , it's nice to know you've already pinpointed the problem! I think the solution you propose is a good one, but does that mean I have to unpack all the dictionary values as parameters of the pipeline function? Wouldn't that make the function too "dirty"? Or do you mean you will soon push a commit that will allow me to keep passing a dictionary and ClearML automatically flatten it?

  
  
Posted 2 years ago

Well, this is just a mock example 🙂 . In the real application I'm working on there will be more than one configuration file (in principle one for the data and one for the DL model). Regarding the fix, I am not in a hurry at the moment. I'll happily wait for tomorrow (or the day after) when the commit is pushed!

  
  
Posted 2 years ago

But maybe another solution would be to pass the configuration files paths as function arguments, then read and parse them inside the pipeline

  
  
Posted 2 years ago

when you say "configuration files" are you referencing the dict in the mock example ?

  
  
Posted 2 years ago

AgitatedDove14 BTW, I got the notification from GitHub telling me you had committed the fix and I went ahead. After testing the code again, I see the task parameter dictionary has been removed properly (now it has been broken down into flat parameters). However, I still have the same problem with duplicate tasks, as you can see in the image.

  
  
Posted 2 years ago

Yes, although I use both terms interchangeably. The information will actually be contained in JSON files.

  
  
Posted 2 years ago

What exactly do you mean by that? From VS Code I execute the following script, and then the agents take care of executing the code remotely:
` import pandas as pd

from clearml import Task, TaskTypes
from clearml.automation.controller import PipelineDecorator

CACHE = False

@PipelineDecorator.component(
name="Wind data creator",
return_values=["wind_series"],
cache=CACHE,
execution_queue="data_cpu",
task_type=TaskTypes.data_processing,
)
def generate_wind(start_date: str, end_date: str) -> pd.Series:

import numpy as np
import pandas as pd

samples_dates = pd.date_range(start=start_date, end=end_date, freq="10T")

rng = np.random.default_rng()
wind_data = rng.weibull(2, len(samples_dates))

return pd.Series(
    data=wind_data,
    index=samples_dates,
    name="wind_series-(m/s)",
)

@PipelineDecorator.component(
name="Wind data parser",
return_values=["wind_series_parsed"],
cache=CACHE,
execution_queue="data_cpu",
task_type=TaskTypes.data_processing,
packages=["pandas"],
)
def parse_wind(wind_series: pd.Series) -> pd.Series:
# Cleaning process
wind_series = wind_series.dropna()
# Resampling process
wind_series = wind_series.resample("1H", label="right").mean()

return wind_series

@PipelineDecorator.component(
name="Dummy forecaster",
return_values=["forecast_series"],
cache=CACHE,
execution_queue="inference",
task_type=TaskTypes.inference,
)
def forecast_with_persistence(
wind_series: pd.Series, base_time: str, horizons: int
) -> pd.Series:

import numpy as np
import pandas as pd

base_time = pd.to_datetime(base_time)

if base_time not in wind_series.index:
    # Find the closest previous date of the wind series to 'base_time'
    nearest_floor_index = wind_series.index.get_loc(
        base_time, method="ffill", tolerance=None
    )
    base_time = wind_series.index[nearest_floor_index]

print("Persistence forecast has been made from", base_time)

return pd.Series(
    data=np.repeat(wind_series.loc[base_time], horizons),
    index=pd.date_range(start=base_time, periods=int(horizons), freq="10T"),
    name="forecast",
)

@PipelineDecorator.pipeline(
name="Prediction Service (deployed)",
project="Mocks",
version="1.0.0",
pipeline_execution_queue="controllers",
multi_instance_support=True,
add_pipeline_tags=True,
)
def prediction_service(config: dict):

logger = Task.current_task().logger

logger.report_text(f"Running step {generate_wind.__name__!r}")
raw_series = generate_wind(start_date=config["start"], end_date=config["end"])

logger.report_text(f"Running step {parse_wind.__name__!r}")
parsed_series = parse_wind(raw_series)

logger.report_text(f"Running step {forecast_with_persistence.__name__!r}")
forecast_values = forecast_with_persistence(
    parsed_series,
    base_time=config["forecast_base_time"],
    horizons=config["horizons"],
)

logger.report_text("The predictions are already cooked!")
print(forecast_values)

if name == "main":

# PipelineDecorator.run_locally()

default_config = {"start": "", "end": "", "forecast_base_time": "", "horizons": 72}

date_configs = [
    ("2021-02-01 00:00", "2021-02-25 23:00", "2021-02-17 17:16"),
    ("2021-11-01 00:00", "2021-11-30 23:00", "2021-11-09 19:09"),
]

for start_date, end_date, base_date in date_configs:

    default_config["start"] = start_date
    default_config["end"] = end_date
    default_config["forecast_base_time"] = base_date

    # Run wind prediction service
    prediction_service(config=default_config) `
  
  
Posted 2 years ago

AgitatedDove14 The pipelines are executed by the agents that are listening to the queue given by pipeline_execution_queue="controllers"

  
  
Posted 2 years ago

GiganticTurtle0 this is exactly what I did, and ended up with two pipelines, comparing them produced what I expected (different arguments as passed by the script).
What are you getting ?

  
  
Posted 2 years ago

AgitatedDove14 So did you get the same results without step duplication?

  
  
Posted 2 years ago

Hey AgitatedDove14 ! Any news on this? 🙂

  
  
Posted 2 years ago

Okay! I'll keep an eye out for updates.

  
  
Posted 2 years ago

Great AgitatedDove14 , I tested it on the mock example and it worked just as I expected 🎉

  
  
Posted 2 years ago

AgitatedDove14 Exactly, I've run into the same problem

  
  
Posted 2 years ago

GiganticTurtle0 fix was pushed 🙂
you can test with:
pip install git+🤞

  
  
Posted 2 years ago

AgitatedDove14 By adding PipelineDecorator.run_locally() everything seems to work perfectly. This is what I expect the experiment listing to look like when the agents are the ones running the code. With this, I'm pretty sure the error search can be narrowed down to the agents' code.

  
  
Posted 2 years ago

AgitatedDove14 After checking, I discovered that apparently it doesn't matter if each pipeline is executed by a different worker, the error persists. Honestly this has me puzzled. I'm really looking forward to getting this functionality right because it's an aspect that would make ClearML shine even more.

  
  
Posted 2 years ago

GiganticTurtle0 I think I located the issue:
it seems the change is in "config" (and for some reason it stores the entire dict) but the split values are not changed.
Is this it?

  
  
Posted 2 years ago

Okay I'll dig into it 🙂

  
  
Posted 2 years ago

Right! I just noticed that! this is odd... and yes defiantly has something to do with the multi pipeline executed on the agent, I think I know what to look for ...
(just making sure (again), running_locally produced exactly what we were expecting, is that correct?)

  
  
Posted 2 years ago

That's right! run_locally() does just what I was expecting

  
  
Posted 2 years ago

okay, let me see if I can nail down the issue

  
  
Posted 2 years ago

Quick update, I found the issue, working on a fix 🙂

  
  
Posted 2 years ago

Yey!

  
  
Posted 2 years ago

but does that mean I have to unpack all the dictionary values as parameters of the pipeline function?

I was just suggesting a hack 🙂 the fix itself is transparent (I'm expecting it to be pushed tomorrow), basically it will make sure the sample pipeline will work as expected.
regardless and out of curiosity, if you only have one dict passed to the pipeline function, why not use named arguments ?

  
  
Posted 2 years ago

AgitatedDove14 I have the strong feeling it must be an agent issue, because when I place PipelineDecorator.run_locally() before calling the pipeline, everything works perfectly. See:

  
  
Posted 2 years ago

AgitatedDove14 I ended up with two pipelines being executed until they completed the workflow but duplicating each of their steps. You can check it here:
https://clearml.slack.com/files/U02A5DGPMPU/F02SR3G9RDK/image.png

  
  
Posted 2 years ago
588 Views
30 Answers
2 years ago
one year ago
Tags