Are there any references (vlog/blog) on deploying a real-time model and building a continuous training pipeline in ClearML?
Something along the lines of this one ?
https://clear.ml/blog/creating-a-fully-automatic-retraining-loop-using-clearml-data/
Or this one?
https://www.youtube.com/watch?v=uNB6FKIi8Wg
I have a time-series dataset with dimensions 1,60,1, where the first dimension is the number of samples and the second is the number of timesteps
I think it should be `--input-size 1 60` if the last dimension is the batch size?
(BTW: this goes directly to Triton configuration, it is the information Triton needs in order to run the model itself)
EmbarrassedPeacock82 are you using keras/pytorch etc for serving (i.e. Triton) ?
ZanyPelican5 Yes, I am using keras for serving (triton)
` from typing import Any
import pickle

import numpy as np
import pandas as pd
from clearml import Task
from sklearn.linear_model import LinearRegression, RANSACRegressor


# Notice Preprocess class Must be named "Preprocess"
class Preprocess(object):
    def __init__(self):
        # set internal state, this will be called only once. (i.e. not per request)
        self.task = Task.get_task(project_name='serving examples', task_id='bfc1ae4d242b4d5a9955adde1c9e5a58')
        if self.task.artifacts.get('seasonality').size is not None:
            self.seasonality = eval(self.task.artifacts.get('seasonality').preview)
            self.z_score = eval(self.task.artifacts.get('z_score').preview)
        else:
            self.seasonality = None
            self.z_score = None
        self.threshold = eval(self.task.artifacts.get('threshold').preview)
        self.timesteps = eval(self.task.artifacts.get('time_steps').preview)
        self.trend_step = eval(self.task.artifacts.get('trend_step').preview)
        # load the pickled trend regressor stored as a task artifact
        regressor_path = self.task.artifacts['regressor'].get_local_copy()
        with open(regressor_path, 'rb') as file_regressor:
            self.regressor = pickle.load(file_regressor)

    def z_score_normalization(self, df, seasonality, normal_z_score=None):
        # indexer shifts 1-based group keys (week, month) to 0-based list positions
        indexer = 0
        if seasonality == 'hourly':
            season = df.groupby(df.index.hour)
        if seasonality == 'daily':
            season = df.groupby(df.index.dayofweek)
        if seasonality == 'weekly':
            season = df.groupby(df.index.isocalendar().week)
            indexer = 1
        if seasonality == 'monthly':
            season = df.groupby(df.index.month)
            indexer = 1
        if normal_z_score is not None:
            # normalize with the statistics computed at training time
            for index, time in season:
                index -= indexer
                df.loc[time.index, 'value'] = (time['value'] - normal_z_score[index]['mean'])/normal_z_score[index]['std']
            return df
        else:
            # compute per-season statistics and return them with the normalized data
            z_score = []
            for _, time in season:
                mean = np.mean(time['value'])
                std = np.std(time['value'])
                df.loc[time.index, 'value'] = (time['value'] - mean)/std
                z_score.append({'mean': mean, 'std': std})
            return df, z_score

    def create_sequences(self, X, y):
        # sliding windows of length self.timesteps, each paired with the value that follows it
        Xs, ys = [], []
        for i in range(len(X)-self.timesteps):
            Xs.append(X.iloc[i:(i+self.timesteps)].values)
            ys.append(y.iloc[i+self.timesteps])
        return np.array(Xs), np.array(ys)

    def preprocess(self, body: dict, state: dict, collect_custom_statistics_fn=None) -> Any:
        df = pd.DataFrame({'date': body.get("date"), 'value': body.get("value")}, columns=['date','value'])
        df['date'] = pd.to_datetime(df['date'], dayfirst = True)
        df = df.set_index('date')
        df = df.resample('1T').mean()
        # detrend regression here
        y_test_value = df['value'].values
        X_test_value = [i + self.trend_step for i in range(0, len(df['value']))]
        X_test_value = np.reshape(X_test_value, (len(X_test_value), 1))
        # X_test_poly = preprocessing.get_polynomial_features(X_test_value)
        trendp = self.regressor.predict(X_test_value)
        detrended = [y_test_value[i]-trendp[i] for i in range(0, len(df['value']))]
        df['value'] = detrended
        self.task.upload_artifact('trend_step', self.trend_step + 1)
        if self.z_score is not None:
            df = self.z_score_normalization(df, self.seasonality, self.z_score)
        X, _ = self.create_sequences(df[['value']], df['value'])
        state['X'] = X
        state['df_test'] = df
        return X

    def postprocess(self, data: Any, state: dict, collect_custom_statistics_fn=None) -> dict:
        # post process the data returned from the model inference engine
        # data is the return value from model.predict; it is compared with the input sequences
        X_test = state['X']
        df_test = state['df_test']
        test_mae_loss = np.mean(np.abs(data-X_test), axis=1)
        test_score_df = pd.DataFrame(df_test[self.timesteps:])
        test_score_df['loss'] = test_mae_loss
        test_score_df['threshold'] = self.threshold
        test_score_df['anomaly'] = test_score_df['loss'] > test_score_df['threshold']
        test_score_df['Close'] = df_test[self.timesteps:]['value']
        return test_score_df.to_dict() `
self.task.upload_artifact('trend_step', self.trend_step + 1)
Out of curiosity, why would every request generate an artifact? Wouldn't it be better to report it as part of the statistics?
What would be the size / type of the matrix X (i.e. np.size / np.dtype)?
The last dimension is the value: if the time series is univariate the last dimension is 1, and if it is multivariate the last dimension depends on the number of variables fed to the model
Hmm, what does your preprocessing code look like?
The trend_step artifact is used to keep track of the position of the data in time, so we know the expected trend of the input data. For example, for the first data point (trend_step = 1) the trend value is 10; when trend_step = 10 (the tenth data point), our regressor predicts the trend value for that trend_step. This method is still being researched to make it more efficient, so that it doesn't need to upload an artifact on every request
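To make the detrending idea above concrete, here is a minimal sketch. It assumes a plain linear trend fitted with `np.polyfit` as a stand-in for the pickled scikit-learn regressor from the thread; the training series, the `detrend` helper and the `trend_step` value are made up for illustration.

```python
import numpy as np

# Hypothetical training series: linear trend (slope 2, intercept 10) plus a periodic signal.
train_steps = np.arange(100)
train_values = 2.0 * train_steps + 10.0 + np.sin(train_steps)

# Fit a degree-1 polynomial (stand-in for the thread's pickled regressor).
slope, intercept = np.polyfit(train_steps, train_values, deg=1)


def detrend(values, trend_step):
    """Subtract the predicted trend, where values[0] sits at stream position `trend_step`."""
    values = np.asarray(values, dtype=float)
    steps = trend_step + np.arange(len(values))
    return values - (slope * steps + intercept)


# New data arriving at stream positions 100..104, following the same trend.
new_values = 2.0 * np.arange(100, 105) + 10.0
residual = detrend(new_values, trend_step=100)
print(residual)  # values close to zero once the trend is removed
```

The point is that the regressor only needs the position in the stream (`trend_step`), which is why that counter has to survive between requests.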
Makes sense! I would suggest you open a GitHub issue with a feature request for fast key/value storage that supports multiple instances; I think this usage pattern will be greatly appreciated
BTW, as an optimization I would use Task scalars (they are sent in the background, and you can relatively easily get the latest value, etc.). Do you also need it to be atomic?
X is the sequence generated from df, which contains 2 columns (date and value). For this example X is an np.array of shape (1,60,1): 1 (number of samples), 60 (timesteps), 1 (the value from the 'value' column of df).
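For reference, a minimal sketch of how that shape comes out of the sliding-window step. It uses NumPy only (no DataFrame) and assumes 61 made-up samples cast to float32; the dtype in the real pipeline depends on the data.

```python
import numpy as np

timesteps = 60  # window length used in the thread

# Hypothetical input: 61 samples of a single 'value' column, shape (61, 1).
values = np.arange(61, dtype=np.float32).reshape(-1, 1)

# Same windowing rule as create_sequences: one window per start index.
windows = [values[i:i + timesteps] for i in range(len(values) - timesteps)]
X = np.array(windows)

print(X.shape, X.dtype)  # (1, 60, 1) float32
```

With 61 rows there is exactly one window; every extra row adds one more window along the first dimension.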
So in theory 1,60,-1 should work as a size for Triton. Are you getting an error?
(BTW: if you were to manually run the model inference, I'm assuming you would have created a 3D matrix where the dims are 1,60,<batch_size>, is that correct?)
Yes, it's atomic. Okay, I will research Task scalars further
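A minimal sketch of the Task-scalars suggestion, assuming the counter is kept in memory and only reported in the background. `TrendStepReporter` and `RecordingLogger` are hypothetical names; the logger passed in is expected to expose `report_scalar(title, series, value, iteration)`, as the object returned by ClearML's `Task.get_logger()` does. Note this does not by itself give atomicity across several serving instances.

```python
class TrendStepReporter:
    """Keep trend_step in memory and report it as a scalar instead of an artifact."""

    def __init__(self, logger, start=0):
        # `logger` must expose report_scalar(title, series, value, iteration),
        # e.g. the object returned by clearml's Task.get_logger().
        self._logger = logger
        self.trend_step = start

    def advance(self):
        """Increment the counter and report the new value."""
        self.trend_step += 1
        self._logger.report_scalar(
            title="serving", series="trend_step",
            value=self.trend_step, iteration=self.trend_step,
        )
        return self.trend_step


class RecordingLogger:
    """Stand-in logger so the sketch runs without a ClearML server."""

    def __init__(self):
        self.calls = []

    def report_scalar(self, title, series, value, iteration):
        self.calls.append((title, series, value, iteration))


logger = RecordingLogger()
reporter = TrendStepReporter(logger, start=9)
reporter.advance()
print(logger.calls)  # [('serving', 'trend_step', 10, 10)]
```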
Unfortunately it's not working. I think the (1,60,1) shape carries no batch size information. The input data shape is like this:
[[1,...,60]]. I need the first dimension to be dynamic so I can send more time series at once, like this: [[1,..,60], [61,..,120], [121,..,180]]
It said the command --aux-config got invalid input
This seems like an interface bug.. let me see if we can fix that 🙂
BTW: this seems like a triton LSTM configuration issue, we might want to move the discussion to the Triton server issue, wdyt?
Definitely!
Could you start an issue at https://github.com/triton-inference-server/server/issues, and I'll join the conversation?
Is there any reference about integrating kafka data streaming directly to clearml-serving?
There is already a kafka server in clearml-serving, I think I'm lost on what you are looking to build here?
I'm using a nodejs service to mediate data from kafka to my clearml-serving
Is this for the serving of requests? (following my question above)
I have tried to change dimension to [-1,60,1] but I got this error:
{"detail":"Error processing request: <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails = "All input dimensions should be specified for input 'lstm_input' for model 'MTkyLjE2OC4wLjEwMXxTaXRlc2NvcGV8dXRpbGl6YXRpb258Q1BVfFBWRTEzUDEwMQ', got [-1,60,1]"\n\tdebug_error_string = "{"created":"@1660528548.049544414","description":"Error received from peer ipv4:172.18.0.7:8001","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"All input dimensions should be specified for input 'lstm_input' for model 'MTkyLjE2OC4wLjEwMXxTaXRlc2NvcGV8dXRpbGl6YXRpb258Q1BVfFBWRTEzUDEwMQ', got [-1,60,1]","grpc_status":3}"\n>"}
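The error text suggests that the request itself carried the shape `[-1,60,1]`. In a Triton model configuration `-1` marks a variable-size dimension, but a concrete request has to state every dimension. A minimal sketch of that distinction, where `resolve_shape` is a hypothetical helper (not part of clearml-serving or the Triton client):

```python
def resolve_shape(config_dims, data_shape):
    """Return the concrete request shape after checking data against config dims.

    `-1` in `config_dims` matches any size; every other entry must match exactly.
    """
    if len(config_dims) != len(data_shape):
        raise ValueError(f"rank mismatch: config {config_dims}, data {data_shape}")
    for expected, actual in zip(config_dims, data_shape):
        if expected != -1 and expected != actual:
            raise ValueError(f"dimension mismatch: config {config_dims}, data {data_shape}")
    return list(data_shape)


# Three sequences of 60 timesteps with one feature each.
request_shape = resolve_shape([-1, 60, 1], (3, 60, 1))
print(request_shape)  # [3, 60, 1]
```

In other words, the configured size may contain a wildcard, but the shape sent with each request should come from the actual array.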
Also, I'd like to ask about an alternative approach to this issue. Is there any reference on integrating Kafka data streaming directly into clearml-serving? This issue arises because right now I'm using a Node.js service to mediate data from Kafka to my clearml-serving
Hmm EmbarrassedPeacock82
Let's try with `--input-size -1 60 1 --aux-config input.format=FORMAT_NCHW`
Oh, so the way it currently works, clearml-serving pushes the data in real time into Prometheus (you can control the stats/input/output), and then you can build the anomaly detection in Grafana (for example, alerts on histograms over time work out of the box, and ClearML creates the histograms over time).
Would you also need access to the stats data in Prometheus? Or are you saying you need to process it before it gets there?
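A minimal sketch of feeding per-request values into that statistics path, assuming the `collect_custom_statistics_fn` callback that clearml-serving passes to `preprocess`/`postprocess` accepts a dict of key/values. The helper `report_anomaly_stats` and the metric names `max_loss` and `anomaly_count` are hypothetical.

```python
from typing import Callable, Optional


def report_anomaly_stats(
    losses,
    threshold,
    collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None,
) -> dict:
    """Summarize per-window losses and hand the summary to the statistics callback."""
    flags = [loss > threshold for loss in losses]
    stats = {"max_loss": max(losses), "anomaly_count": sum(flags)}
    if collect_custom_statistics_fn is not None:
        collect_custom_statistics_fn(stats)
    return stats


# Stand-in callback: collect what would be sent to the statistics service.
collected = []
stats = report_anomaly_stats([0.1, 0.7, 0.3], threshold=0.5,
                             collect_custom_statistics_fn=collected.append)
print(stats)  # {'max_loss': 0.7, 'anomaly_count': 1}
```

Inside `postprocess`, the same call could take the computed MAE losses and `self.threshold`, so that alerting can be built on the reported values rather than on uploaded artifacts.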
Are there any references on how to set this up? I'm not familiar with the advanced features of clearml-serving
I want to build a real time data streaming anomaly detection service with clearml-serving
Okay, I will research the features first. Thank you AgitatedDove14 for the support. If there is a new issue I will let you know in a new thread
Thanks! I would really like to understand what the correct configuration is