Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
1St: Is It Possible To Make A Pipeline Component Call Another Pipeline Component (As A Substep)? Or Only The Controller Can Do It? 2Nd: I Am Trying To Call A Function Defined In The Same Script, But Unable To Import It. I Passing The Repo Parameter To The

1st: is it possible to make a pipeline component call another pipeline component (as a substep)? Or only the controller can do it?
2nd: I am trying to call a function defined in the same script, but unable to import it. I passing the repo parameter to the component decorator, but no change, it always comes back with "No module named <module>" after my from module import function call, as if working on the top repository directory.

  
  
Posted 2 years ago
Votes Newest

Answers 15


It is an instance of a custom class.

  
  
Posted 2 years ago

Apparently the error comes when I try to access from

get_model_and_features

the pipeline component

load_model

. If it is not set as pipeline component and only as helper function (provided it is declared before the components that calls it (I already understood that and fixed, different from the code I sent above).

ShallowGoldfish8 so now I'm a bit confused, are you saying that now it works as expected ?

  
  
Posted 2 years ago

` import importlib
import argparse

from datetime import datetime
import pandas as pd

from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes, Task

@PipelineDecorator.component(
return_values=['model', 'features_to_build']
)
def get_model_and_features(task_id, model_type):
from clearml import Task
import sys
sys.path.insert(0,'/home/zanini/repo/RecSys')
from src.dataset.backtest import load_model

task = Task.get_task(task_id=task_id)
if 'features' in task.artifacts:
    features_to_build = task.artifacts['features'].get()
else:
    features_key = [
        art for art in task.artifacts.keys()
        if art.startswith('feature importances')
    ][0]
    features_to_build = task.artifacts[features_key].get().index.to_list()

model = task.get_models()['output'][0]
model_path = model.get_local_copy()

model = load_model(model_path, model_type)

return model, features_to_build

@PipelineDecorator.component(return_values=['recbuy'])
def backtest_product(
model, dat_begin, dat_end, features_to_build=None
):
import pandas as pd
import sys
sys.path.insert(0,<hardcoded absolute path to my local repository>)
from src.dataset.data_load import (
DATE_COL, FeaturesOrders, DataConditioner, LEVELS_DICT
)

if features_to_build is not None:
    orders = DataConditioner().condition_orders(
        dat_begin=dat_begin-pd.DateOffset(months=6),
        dat_end=dat_end
    )
    feature_builder = FeaturesOrders(orders=orders)
    
dates = list(orders[DATE_COL].dt.date.unique())
dates = [d for d in dates if (d>=dat_begin) and (d<=dat_end)]
dates.sort()

recbuy = []
for d in dates:
    orders_today = orders.loc[
        orders[DATE_COL].dt.date == d    
    ]
    print(d)
    buy = (
        orders_today[
            [
                'store_name',
                'id_product_unifier',
                'order_date',
                'qty_items_at_order'
            ]
        ]
        .assign(buy=lambda df_: df_.qty_items_at_order > 0)
        .assign(order_date=lambda df_: df_.order_date.dt.date)
        .drop(columns='qty_items_at_order')
    )
    if features_to_build is None:
        buy_stores = buy['store_name'].unique()
        features = model.predict_stores(
            buy_stores,
            d,
            ordering_col_alias='pred'
        )
    else:
        features = feature_builder.make_features(
            ref=pd.to_datetime(d),
            max_level='product',
            features_to_build=features_to_build
        )
        features = features.assign(
            order_date=d
        )
        preds = model.predict_proba(
            features[features_to_build]
        )
        
        features['pred'] = [i[0] for i in preds]
        features = features[
            [c for c in features.columns if c not in features_to_build]
        ]
        
    recbuy.append(
        buy.merge(
            features,
            on=['store_name', 'id_product_unifier', 'order_date'],
            how='outer'
        ).sort_values(['order_date', 'store_name', 'pred'], ascending=False).fillna(0)
    )

recbuy = pd.concat(recbuy)        
return recbuy

@PipelineDecorator.component(return_values=['model'])
def load_model(model_path, model_type):
from catboost import CatBoostClassifier
from lightgbm import Booster
print(model_type)

if model_type == 'catboost':
    model = CatBoostClassifier()
    model.load_model(model_path)
elif model_type == 'lightgbm':
    model = Booster(model_file=model_path)
else:
    print(f' model_type is set to {model_type}')        

return model

@PipelineDecorator.component(return_values=['model'])
def load_baseline_model(model_path):
import importlib
import sys
spec = importlib.util.spec_from_file_location("module.name", model_path)
module = importlib.util.module_from_spec(spec)
sys.modules["module.name"] = module
spec.loader.exec_module(module)
model = module.BaselineModel()
return model

@PipelineDecorator.pipeline(name='Backtest', project='RecSys', version='0.0.1')
def run_backtest(dat_begin, dat_end, task_id=None, model_type='catboost', model_path=None):

if task_id:
    model, features_to_build = get_model_and_features(task_id, model_type)
    if model_path:
        model = load_model(model_path, model_type)
else:        
    model = load_baseline_model(model_path)
    features_to_build = None

bt_recbuy = backtest_product(
    model,
    dat_begin,
    dat_end,
    features_to_build=features_to_build
)

return bt_recbuy

if name == 'main':
parser = argparse.ArgumentParser()
parser.add_argument("-b --begin", dest='dat_begin', required=True)
parser.add_argument("-e --end", dest='dat_end', required=True)
parser.add_argument("-t --task", dest='task_id', required=False)
parser.add_argument("--model-type", dest='model_type', required=False)
parser.add_argument("-p --path", dest='model_path', required=False)
parser.add_argument("--output", dest='output_file', required=False)
args = parser.parse_args()

PipelineDecorator.run_locally()

backtest = run_backtest(
    dat_begin=pd.to_datetime(args.dat_begin),
    dat_end=pd.to_datetime(args.dat_end),
    task_id=args.task_id,
    model_type=args.model_type,
    model_path=args.model_path
)

backtest.to_parquet(args.output_file) if args.output_file else print(backtest) `
  
  
Posted 2 years ago

Ohh that cannot be pickled... how would you suggest to store it into a file?

  
  
Posted 2 years ago

The error comes out after the execution of the component backtest_prod

  
  
Posted 2 years ago

1st: is it possible to make a pipeline component call another pipeline component (as a substep)

Should work as long as they are in the same file, you can however launch and wait any Task (see pipelines from tasks)

2nd: I am trying to call a function defined in the same script, but unable to import it. I passing the repo parameter to the component decorator, but no change, it always comes back with "No module named <module>" after my

from module import function

call, as if working on the top repository directory

try adding the function to ' helper_functions '
@PipelineDecorator.component(..., helper_functions=[my_func_here])

  
  
Posted 2 years ago

That's the script that produces the error. You can also observe the struggle with importing the load_model function. (Any tips on best practices to structure the pipeline are also gladly accepted)

  
  
Posted 2 years ago

did manage to get it working, but only by hardcoding the path of the repository using sys.path.append() with absolute repository path on my machine

  
  
Posted 2 years ago

Additionally, I have the following error now:
2022-08-10 19:53:25,366 - clearml.Task - INFO - Waiting to finish uploads 2022-08-10 19:53:36,726 - clearml.Task - INFO - Finished uploading Traceback (most recent call last): File "/home/zanini/repo/RecSys/src/dataset/backtest.py", line 186, in <module> backtest = run_backtest( File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 3329, in internal_decorator a_pipeline.stop() File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 944, in stop self.update_execution_plot() File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2688, in update_execution_plot self._update_eager_generated_steps() File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2734, in _update_eager_generated_steps self._nodes[new_step_node_name] = self.Node(**eager_node_def) TypeError: __init__() got an unexpected keyword argument 'job_id'It does not track back to my code, so not sure how to debug

  
  
Posted 2 years ago

I noticed that when a pipeline step returns an instance of a class, it tries to pickle. I am currently facing the issue with it not being able to pickle the output of the "load_baseline_model" function
Traceback (most recent call last): File "/tmp/tmpqr2zwiom.py", line 37, in <module> task.upload_artifact(name=name, artifact_object=artifact) File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/task.py", line 1877, in upload_artifact return self._artifacts_manager.upload_artifact( File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/binding/artifacts.py", line 696, in upload_artifact pickle.dump(artifact_object, f) _pickle.PicklingError: Can't pickle <class 'module.name.BaselineModel'>: import of module 'module.name' failed

  
  
Posted 2 years ago

but not as a component (using the decorator)

Hmm yes, I think that component calling component as an external component is not supported yet
(basically the difference is , is it actually running as a function, or running on a different machine as another pipeline component)

I noticed that when a pipeline step returns an instance of a class, it tries to pickle.

Yes this is how the serialization works, when we pass data from one node to another (by design it supports multiple machines, so we have to serialize the objects).
Yes we kind of assume you can pickle the object, because as mentioned above the second node / component is running on another machine
Make sense ?

What is the object you are trying to pass?

  
  
Posted 2 years ago

ShallowGoldfish8 how did you get this error?
self.Node(**eager_node_def) TypeError: __init__() got an unexpected keyword argument 'job_id'

  
  
Posted 2 years ago

Apparently the error comes when I try to access from get_model_and_features the pipeline component load_model . If it is not set as pipeline component and only as helper function (provided it is declared before the components that calls it (I already understood that and fixed, different from the code I sent above).

  
  
Posted 2 years ago

Should work as long as they are in the same file, you can however launch and wait any Task (see pipelines from tasks)

Do I call it as a function normally as in the other or do I need to import? (My initial problem was actually that is was not founding the other function as a pipeline component, so I thought it was not able to import as a secondary sub-component)

  
  
Posted 2 years ago

It works if I use as a helper function, but not as a component (using the decorator)

  
  
Posted 2 years ago