import importlib
import argparse
from datetime import datetime
import pandas as pd
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes, Task
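
# Component: fetch the trained model and the list of features it was trained on
# from an existing ClearML task (either a 'features' artifact or the index of a
# 'feature importances' artifact).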
@PipelineDecorator.component(
    return_values=['model', 'features_to_build']
)
def get_model_and_features(task_id, model_type):
    from clearml import Task
    import sys
    sys.path.insert(0, '/home/zanini/repo/RecSys')
    from src.dataset.backtest import load_model

    task = Task.get_task(task_id=task_id)
    if 'features' in task.artifacts:
        features_to_build = task.artifacts['features'].get()
    else:
        features_key = [
            art for art in task.artifacts.keys()
            if art.startswith('feature importances')
        ][0]
        features_to_build = task.artifacts[features_key].get().index.to_list()
    model = task.get_models()['output'][0]
    model_path = model.get_local_copy()
    model = load_model(model_path, model_type)
    return model, features_to_build
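
# Component: replay the order history day by day between dat_begin and dat_end,
# score each day's (store, product) pairs with the model, and return the merged
# buy/prediction frame.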
@PipelineDecorator.component(return_values=['recbuy'])
def backtest_product(
    model, dat_begin, dat_end, features_to_build=None
):
    import pandas as pd
    import sys
    sys.path.insert(0, '<hardcoded absolute path to my local repository>')
    from src.dataset.data_load import (
        DATE_COL, FeaturesOrders, DataConditioner, LEVELS_DICT
    )

    # Load six extra months of history so features can look back before dat_begin.
    orders = DataConditioner().condition_orders(
        dat_begin=dat_begin - pd.DateOffset(months=6),
        dat_end=dat_end
    )
    # FeaturesOrders is only needed when scoring with an explicit feature list.
    if features_to_build is not None:
        feature_builder = FeaturesOrders(orders=orders)

    # d is a datetime.date, so compare against plain dates.
    dates = list(orders[DATE_COL].dt.date.unique())
    dates = [d for d in dates if (d >= dat_begin.date()) and (d <= dat_end.date())]
    dates.sort()

    recbuy = []
    for d in dates:
        orders_today = orders.loc[
            orders[DATE_COL].dt.date == d
        ]
        print(d)
        buy = (
            orders_today[
                [
                    'store_name',
                    'id_product_unifier',
                    'order_date',
                    'qty_items_at_order'
                ]
            ]
            .assign(buy=lambda df_: df_.qty_items_at_order > 0)
            .assign(order_date=lambda df_: df_.order_date.dt.date)
            .drop(columns='qty_items_at_order')
        )
        if features_to_build is None:
            buy_stores = buy['store_name'].unique()
            features = model.predict_stores(
                buy_stores,
                d,
                ordering_col_alias='pred'
            )
        else:
            features = feature_builder.make_features(
                ref=pd.to_datetime(d),
                max_level='product',
                features_to_build=features_to_build
            )
            features = features.assign(
                order_date=d
            )
            preds = model.predict_proba(
                features[features_to_build]
            )
            features['pred'] = [i[0] for i in preds]
            features = features[
                [c for c in features.columns if c not in features_to_build]
            ]
        recbuy.append(
            buy.merge(
                features,
                on=['store_name', 'id_product_unifier', 'order_date'],
                how='outer'
            )
            .sort_values(['order_date', 'store_name', 'pred'], ascending=False)
            .fillna(0)
        )
    recbuy = pd.concat(recbuy)
    return recbuy
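
# Component: load a serialized CatBoost or LightGBM model from disk.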
@PipelineDecorator.component(return_values=['model'])
def load_model(model_path, model_type):
    from catboost import CatBoostClassifier
    from lightgbm import Booster

    print(model_type)
    if model_type == 'catboost':
        model = CatBoostClassifier()
        model.load_model(model_path)
    elif model_type == 'lightgbm':
        model = Booster(model_file=model_path)
    else:
        raise ValueError(f'Unsupported model_type: {model_type}')
    return model
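
# Component: import a plain-Python baseline model (a BaselineModel class defined
# in the file at model_path) via importlib.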
@PipelineDecorator.component(return_values=['model'])
def load_baseline_model(model_path):
    import importlib.util
    import sys

    spec = importlib.util.spec_from_file_location("module.name", model_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules["module.name"] = module
    spec.loader.exec_module(module)
    model = module.BaselineModel()
    return model
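
# Pipeline controller: resolve the model, either from a ClearML task (optionally
# overriding its weights with a local model file) or from the Python baseline at
# model_path, then run the backtest over the requested window.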
@PipelineDecorator.pipeline(name='Backtest', project='RecSys', version='0.0.1')
def run_backtest(dat_begin, dat_end, task_id=None, model_type='catboost', model_path=None):
    if task_id:
        model, features_to_build = get_model_and_features(task_id, model_type)
        if model_path:
            model = load_model(model_path, model_type)
    else:
        model = load_baseline_model(model_path)
        features_to_build = None
    bt_recbuy = backtest_product(
        model,
        dat_begin,
        dat_end,
        features_to_build=features_to_build
    )
    return bt_recbuy
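
# CLI entry point. Example invocation (script name and dates are illustrative):
#   python backtest.py -b 2023-01-01 -e 2023-06-30 -t <clearml_task_id> --model-type catboost --output backtest.parquet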
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--begin", dest='dat_begin', required=True)
    parser.add_argument("-e", "--end", dest='dat_end', required=True)
    parser.add_argument("-t", "--task", dest='task_id', required=False)
    parser.add_argument("--model-type", dest='model_type', required=False)
    parser.add_argument("-p", "--path", dest='model_path', required=False)
    parser.add_argument("--output", dest='output_file', required=False)
    args = parser.parse_args()

    PipelineDecorator.run_locally()
    backtest = run_backtest(
        dat_begin=pd.to_datetime(args.dat_begin),
        dat_end=pd.to_datetime(args.dat_end),
        task_id=args.task_id,
        model_type=args.model_type,
        model_path=args.model_path
    )

    if args.output_file:
        backtest.to_parquet(args.output_file)
    else:
        print(backtest)