Hmm yes, I think a component calling another component as an external component is not supported yet
(basically the difference is whether it actually runs as a function, or runs on a different machine as another pipeline component)
I noticed that when a pipeline step returns an instance of a class, it tries to pickle it.
Yes, this is how the serialization works: when we pass data from one node to another, the objects have to be serialized (by design it supports multiple machines).
Yes, we kind of assume you can pickle the object because, as mentioned above, the second node / component is running on another machine.
Does that make sense?
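To make the pickling requirement concrete, here is a minimal sketch that uses only the standard-library `pickle` module. It assumes the step output is serialized with a plain `pickle.dump`, which is what the traceback in this thread shows; the helper name `can_pickle` and the two example classes are made up for illustration.

```python
import pickle


def can_pickle(obj):
    """Return True if obj survives a pickle round trip, False otherwise."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:  # pickle raises different error types depending on the object
        return False


class TopLevelModel:
    """Defined at module level, so pickle can look the class up again by name."""

    def __init__(self, weight):
        self.weight = weight


def make_local_model():
    class LocalModel:  # defined inside a function: pickle cannot look it up by name
        pass

    return LocalModel()


print(can_pickle({"features": ["a", "b"]}))  # plain data
print(can_pickle(TopLevelModel(0.5)))        # instance of a module-level class
print(can_pickle(make_local_model()))        # instance of a class pickle cannot re-import
```

Running a check like this on a step's return value before wiring it into a pipeline is a cheap way to find out whether it can be passed to the next step at all.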
What is the object you are trying to pass?
1st: Is it possible to make a pipeline component call another pipeline component (as a substep)?
It should work as long as they are in the same file. You can, however, launch and wait for any Task (see pipelines from tasks)
2nd: I am trying to call a function defined in the same script, but I am unable to import it. I am passing the repo parameter to the component decorator, but nothing changes: it always comes back with "No module named <module>" after my `from module import function` call, as if working on the top repository directory
Try adding the function to `helper_functions`:
`@PipelineDecorator.component(..., helper_functions=[my_func_here])`
I did manage to get it working, but only by hardcoding the path of the repository using sys.path.append() with the absolute repository path on my machine
Apparently the error occurs when I try to call the pipeline component `load_model` from `get_model_and_features`. If it is not set as a pipeline component and is only used as a helper function, it works (provided it is declared before the components that call it; I already understood and fixed that, so it differs from the code I sent above).
ShallowGoldfish8 so now I'm a bit confused, are you saying that it now works as expected?
The error comes out after the execution of the component backtest_prod
ShallowGoldfish8 how did you get this error?
self.Node(**eager_node_def)
TypeError: __init__() got an unexpected keyword argument 'job_id'
It works if I use it as a helper function, but not as a component (using the decorator)
That's the script that produces the error. You can also observe the struggle with importing the load_model function. (Any tips on best practices to structure the pipeline are also gladly accepted)
Ohh that cannot be pickled... how would you suggest to store it into a file?
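One possible approach, sketched under assumptions: instead of returning the model object from the component, write the state it needs to a file and return the path (a plain string pickles without trouble), then rebuild the model in the step that consumes it. The `BaselineModel` below and its `to_dict` / `from_dict` methods are hypothetical stand-ins, not the class from this thread, and the file would have to be reachable from the machine that runs the next step (for example by uploading it as an artifact).

```python
import json
import tempfile
from pathlib import Path


class BaselineModel:
    """Hypothetical stand-in for a model whose class cannot be pickled."""

    def __init__(self, top_n=10):
        self.top_n = top_n

    def to_dict(self):
        return {"top_n": self.top_n}

    @classmethod
    def from_dict(cls, state):
        return cls(**state)


def save_model_state(model, directory):
    """Write the model state to a JSON file and return the file path as a string."""
    path = Path(directory) / "baseline_model.json"
    path.write_text(json.dumps(model.to_dict()))
    return str(path)


def load_model_state(path):
    """Rebuild the model from the JSON file written by save_model_state."""
    return BaselineModel.from_dict(json.loads(Path(path).read_text()))


with tempfile.TemporaryDirectory() as tmp:
    state_path = save_model_state(BaselineModel(top_n=5), tmp)
    restored = load_model_state(state_path)
    print(restored.top_n)
```

The producing step would return `state_path` and the consuming step would call `load_model_state` itself, so only a string crosses the step boundary.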
I noticed that when a pipeline step returns an instance of a class, it tries to pickle it. I am currently facing an issue where it is not able to pickle the output of the "load_baseline_model" function:
Traceback (most recent call last):
  File "/tmp/tmpqr2zwiom.py", line 37, in <module>
    task.upload_artifact(name=name, artifact_object=artifact)
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/task.py", line 1877, in upload_artifact
    return self._artifacts_manager.upload_artifact(
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/binding/artifacts.py", line 696, in upload_artifact
    pickle.dump(artifact_object, f)
_pickle.PicklingError: Can't pickle <class 'module.name.BaselineModel'>: import of module 'module.name' failed
It should work as long as they are in the same file. You can, however, launch and wait for any Task (see pipelines from tasks)
Do I call it as a function normally, as with the others, or do I need to import it? (My initial problem was actually that it was not finding the other function as a pipeline component, so I thought it was not able to import it as a secondary sub-component)
```python
import importlib
import argparse
from datetime import datetime

import pandas as pd
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes, Task


@PipelineDecorator.component(
    return_values=['model', 'features_to_build']
)
def get_model_and_features(task_id, model_type):
    from clearml import Task
    import sys
    sys.path.insert(0, '/home/zanini/repo/RecSys')
    from src.dataset.backtest import load_model

    task = Task.get_task(task_id=task_id)
    if 'features' in task.artifacts:
        features_to_build = task.artifacts['features'].get()
    else:
        features_key = [
            art for art in task.artifacts.keys()
            if art.startswith('feature importances')
        ][0]
        features_to_build = task.artifacts[features_key].get().index.to_list()

    model = task.get_models()['output'][0]
    model_path = model.get_local_copy()
    # load_model is itself decorated as a pipeline component (see below)
    model = load_model(model_path, model_type)
    return model, features_to_build


@PipelineDecorator.component(return_values=['recbuy'])
def backtest_product(
    model, dat_begin, dat_end, features_to_build=None
):
    import pandas as pd
    import sys
    sys.path.insert(0, '<hardcoded absolute path to my local repository>')
    from src.dataset.data_load import (
        DATE_COL, FeaturesOrders, DataConditioner, LEVELS_DICT
    )

    if features_to_build is not None:
        orders = DataConditioner().condition_orders(
            dat_begin=dat_begin-pd.DateOffset(months=6),
            dat_end=dat_end
        )
        feature_builder = FeaturesOrders(orders=orders)

    dates = list(orders[DATE_COL].dt.date.unique())
    dates = [d for d in dates if (d >= dat_begin) and (d <= dat_end)]
    dates.sort()

    recbuy = []
    for d in dates:
        orders_today = orders.loc[
            orders[DATE_COL].dt.date == d
        ]
        print(d)
        buy = (
            orders_today[
                [
                    'store_name',
                    'id_product_unifier',
                    'order_date',
                    'qty_items_at_order'
                ]
            ]
            .assign(buy=lambda df_: df_.qty_items_at_order > 0)
            .assign(order_date=lambda df_: df_.order_date.dt.date)
            .drop(columns='qty_items_at_order')
        )
        if features_to_build is None:
            # Baseline model: predicts directly per store
            buy_stores = buy['store_name'].unique()
            features = model.predict_stores(
                buy_stores,
                d,
                ordering_col_alias='pred'
            )
        else:
            features = feature_builder.make_features(
                ref=pd.to_datetime(d),
                max_level='product',
                features_to_build=features_to_build
            )
            features = features.assign(
                order_date=d
            )
            preds = model.predict_proba(
                features[features_to_build]
            )
            features['pred'] = [i[0] for i in preds]
            # Drop the feature columns, keep keys and prediction
            features = features[
                [c for c in features.columns if c not in features_to_build]
            ]
        recbuy.append(
            buy.merge(
                features,
                on=['store_name', 'id_product_unifier', 'order_date'],
                how='outer'
            ).sort_values(['order_date', 'store_name', 'pred'], ascending=False).fillna(0)
        )
    recbuy = pd.concat(recbuy)
    return recbuy


@PipelineDecorator.component(return_values=['model'])
def load_model(model_path, model_type):
    from catboost import CatBoostClassifier
    from lightgbm import Booster

    print(model_type)
    if model_type == 'catboost':
        model = CatBoostClassifier()
        model.load_model(model_path)
    elif model_type == 'lightgbm':
        model = Booster(model_file=model_path)
    else:
        print(f' model_type is set to {model_type}')
    return model


@PipelineDecorator.component(return_values=['model'])
def load_baseline_model(model_path):
    import importlib.util  # importlib.util must be imported explicitly
    import sys

    # Load the model class from a source file given by path
    spec = importlib.util.spec_from_file_location("module.name", model_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules["module.name"] = module
    spec.loader.exec_module(module)
    model = module.BaselineModel()
    return model


@PipelineDecorator.pipeline(name='Backtest', project='RecSys', version='0.0.1')
def run_backtest(dat_begin, dat_end, task_id=None, model_type='catboost', model_path=None):
    if task_id:
        model, features_to_build = get_model_and_features(task_id, model_type)
        if model_path:
            model = load_model(model_path, model_type)
    else:
        model = load_baseline_model(model_path)
        features_to_build = None

    bt_recbuy = backtest_product(
        model,
        dat_begin,
        dat_end,
        features_to_build=features_to_build
    )
    return bt_recbuy


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--begin", dest='dat_begin', required=True)
    parser.add_argument("-e", "--end", dest='dat_end', required=True)
    parser.add_argument("-t", "--task", dest='task_id', required=False)
    parser.add_argument("--model-type", dest='model_type', required=False)
    parser.add_argument("-p", "--path", dest='model_path', required=False)
    parser.add_argument("--output", dest='output_file', required=False)
    args = parser.parse_args()

    PipelineDecorator.run_locally()
    backtest = run_backtest(
        dat_begin=pd.to_datetime(args.dat_begin),
        dat_end=pd.to_datetime(args.dat_end),
        task_id=args.task_id,
        model_type=args.model_type,
        model_path=args.model_path
    )
    if args.output_file:
        backtest.to_parquet(args.output_file)
    else:
        print(backtest)
```
Additionally, I have the following error now:
2022-08-10 19:53:25,366 - clearml.Task - INFO - Waiting to finish uploads
2022-08-10 19:53:36,726 - clearml.Task - INFO - Finished uploading
Traceback (most recent call last):
  File "/home/zanini/repo/RecSys/src/dataset/backtest.py", line 186, in <module>
    backtest = run_backtest(
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 3329, in internal_decorator
    a_pipeline.stop()
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 944, in stop
    self.update_execution_plot()
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2688, in update_execution_plot
    self._update_eager_generated_steps()
  File "/home/zanini/repo/RecSys/.venv/lib/python3.9/site-packages/clearml/automation/controller.py", line 2734, in _update_eager_generated_steps
    self._nodes[new_step_node_name] = self.Node(**eager_node_def)
TypeError: __init__() got an unexpected keyword argument 'job_id'
It does not trace back to my code, so I am not sure how to debug it
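For readers unfamiliar with this class of error: it is the generic Python failure when a dict is unpacked into a constructor that does not declare one of its keys. The sketch below reproduces it in isolation with a hypothetical `Node` class and a made-up `node_def` dict; it is not ClearML's actual implementation and says nothing certain about the cause inside the library.

```python
import inspect


class Node:
    """Hypothetical stand-in; NOT ClearML's actual Node class."""

    def __init__(self, name, parents=None):
        self.name = name
        self.parents = parents or []


# Made-up node definition carrying one key the constructor does not know about.
node_def = {"name": "load_model", "parents": [], "job_id": "abc123"}

try:
    Node(**node_def)  # 'job_id' is not a parameter of __init__
except TypeError as err:
    print(err)

# For diagnosis: compare the dict keys with what the constructor accepts.
accepted = set(inspect.signature(Node.__init__).parameters) - {"self"}
print(sorted(set(node_def) - accepted))  # keys the constructor would reject

node = Node(**{k: v for k, v in node_def.items() if k in accepted})
print(node.name)
```

In the thread's case the dict is built inside the library, so the mismatch is not something the user's script controls directly; the sketch only shows why the traceback ends in library code.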