Answered
Hi Everyone, I Have Questions Related To Clearml-Serving.

Hi everyone, I have questions related to clearml-serving.
When running clearml-serving we specify the input shape and output shape as arguments. In my use case, I have a time-series dataset with dimensions (1, 60, 1), where the first dimension is the number of samples, the second is the timestep, and the third is the number of columns used. I want to make the first dimension dynamic so I can send multiple samples to the API in one request. Is there any solution in clearml-serving for this? Also, are there any references (vlog/blog) on deploying a real-time model and running a continuous-training pipeline in ClearML? Thanks.

  
  
Posted 2 years ago
Votes Newest

Answers 23


Are there any references (vlog/blog) on deploying a real-time model and running a continuous-training pipeline in ClearML?

Something along the lines of this one?
https://clear.ml/blog/creating-a-fully-automatic-retraining-loop-using-clearml-data/
Or this one?
https://www.youtube.com/watch?v=uNB6FKIi8Wg

  
  
Posted 2 years ago

I have a time-series dataset with dimensions (1, 60, 1), where the first dimension is the number of samples and the second is the timestep

I think it should be `--input-size 1 60` if the last dimension is the batch size?
(BTW: this goes directly into the Triton configuration; it is the information Triton needs in order to run the model itself.)
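As a sanity check before setting --input-size, you can inspect the shapes the Keras model itself reports; a minimal sketch (the model file name is hypothetical):

```python
from tensorflow import keras

# Load the trained LSTM model (path is an assumption, for illustration only)
model = keras.models.load_model("lstm_anomaly_model.h5")

# Keras reports the batch dimension as None, e.g. (None, 60, 1);
# everything after the batch dimension is what --input-size should describe.
print("input shape :", model.input_shape)
print("output shape:", model.output_shape)
```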

  
  
Posted 2 years ago

EmbarrassedPeacock82 are you using Keras/PyTorch etc. for serving (i.e. Triton)?

  
  
Posted 2 years ago

It said the command --aux-config got invalid input

  
  
Posted 2 years ago

ZanyPelican5 Yes, I am using keras for serving (triton)

  
  
Posted 2 years ago

```python
from multiprocessing.sharedctypes import Value
from typing import Any

import numpy as np
import pandas as pd
from clearml import Task
import pickle
from sklearn.linear_model import LinearRegression, RANSACRegressor


# Notice: the Preprocess class must be named "Preprocess"
class Preprocess(object):
    def __init__(self):
        # set internal state, this will be called only once (i.e. not per request)
        self.task = Task.get_task(project_name='serving examples', task_id='bfc1ae4d242b4d5a9955adde1c9e5a58')
        if self.task.artifacts.get('seasonality').size is not None:
            self.seasonality = eval(self.task.artifacts.get('seasonality').preview)
            self.z_score = eval(self.task.artifacts.get('z_score').preview)
        else:
            self.z_score = None

        self.threshold = eval(self.task.artifacts.get('threshold').preview)
        self.timesteps = eval(self.task.artifacts.get('time_steps').preview)
        self.trend_step = eval(self.task.artifacts.get('trend_step').preview)

        regressor_path = self.task.artifacts['regressor'].get_local_copy()
        with open(regressor_path, 'rb') as file_regressor:
            self.regressor = pickle.load(file_regressor)

    def z_score_normalization(self, df, seasonality, normal_z_score=None):
        indexer = 0
        if seasonality == 'hourly':
            season = df.groupby(df.index.hour)
        if seasonality == 'daily':
            season = df.groupby(df.index.dayofweek)
        if seasonality == 'weekly':
            season = df.groupby(df.index.isocalendar().week)
            indexer = 1
        if seasonality == 'monthly':
            season = df.groupby(df.index.month)
            indexer = 1

        if normal_z_score is not None:
            for index, time in season:
                index -= indexer
                df.loc[time.index, 'value'] = (time['value'] - normal_z_score[index]['mean']) / normal_z_score[index]['std']
            return df
        else:
            z_score = []
            for _, time in season:
                mean = np.mean(time['value'])
                std = np.std(time['value'])
                df.loc[time.index, 'value'] = (time['value'] - mean) / std
                z_score.append({'mean': mean, 'std': std})
            return df, z_score

    def create_sequences(self, X, y):
        Xs, ys = [], []
        for i in range(len(X) - self.timesteps):
            Xs.append(X.iloc[i:(i + self.timesteps)].values)
            ys.append(y.iloc[i + self.timesteps])

        return np.array(Xs), np.array(ys)

    def preprocess(self, body: dict, state: dict, collect_custom_statistics_fn=None) -> Any:
        df = pd.DataFrame({'date': body.get("date"), 'value': body.get("value")}, columns=['date', 'value'])

        df['date'] = pd.to_datetime(df['date'], dayfirst=True)
        df = df.set_index('date')

        df = df.resample('1T').mean()
        # detrend regression here
        y_test_value = df['value'].values
        X_test_value = [i + self.trend_step for i in range(0, len(df['value']))]
        X_test_value = np.reshape(X_test_value, (len(X_test_value), 1))
        # X_test_poly = preprocessing.get_polynomial_features(X_test_value)
        trendp = self.regressor.predict(X_test_value)
        detrended = [y_test_value[i] - trendp[i] for i in range(0, len(df['value']))]
        df['value'] = detrended
        self.task.upload_artifact('trend_step', self.trend_step + 1)

        if self.z_score is not None:
            df = self.z_score_normalization(df, self.seasonality, self.z_score)

        X, _ = self.create_sequences(df[['value']], df['value'])
        state['X'] = X
        state['df_test'] = df
        return X

    def postprocess(self, data: Any, state: dict, collect_custom_statistics_fn=None) -> dict:
        # post-process the data returned from the model inference engine;
        # data is the return value from model.predict, wrapped into the response
        X_test = state['X']
        df_test = state['df_test']

        test_mae_loss = np.mean(np.abs(data - X_test), axis=1)
        test_score_df = pd.DataFrame(df_test[self.timesteps:])
        test_score_df['loss'] = test_mae_loss
        test_score_df['threshold'] = self.threshold
        test_score_df['anomaly'] = test_score_df['loss'] > test_score_df['threshold']
        test_score_df['Close'] = df_test[self.timesteps:]['value']

        return test_score_df.to_dict()
```
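For context, the preprocess() above expects a body with parallel "date" and "value" lists, so a request to the deployed endpoint might look roughly like this (a minimal sketch; the endpoint URL, port, and model name are assumptions):

```python
import requests

# Hypothetical clearml-serving inference endpoint; adjust host/port and endpoint name.
url = "http://localhost:8080/serve/lstm_anomaly"

body = {
    "date":  ["2022-08-15 10:00", "2022-08-15 10:01", "2022-08-15 10:02"],
    "value": [0.42, 0.40, 0.97],
}

response = requests.post(url, json=body)
# dict produced by postprocess(), including the 'loss' and 'anomaly' columns
print(response.json())
```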
  
  
Posted 2 years ago

The trend_step artifact is used to keep track of the time index of the data, so we know the expected trend of the input. For example, for the first sample (trend_step = 1) the trend value is 10; then when trend_step = 10 (the tenth sample) the regressor predicts the trend value for that step. This method is still being researched to make it more efficient, so it doesn't need to upload an artifact on every request.

X is the sequence generated from df, where df contains two columns (date and value). The size of X in this example is (1, 60, 1) as an np.array: 1 (number of samples), 60 (timesteps), 1 (the 'value' column of df).

  
  
Posted 2 years ago

self.task.upload_artifact('trend_step', self.trend_step + 1)

Out of curiosity, why would every request generate an artifact? Wouldn't it be better to report it as part of the statistics?
What would be the size / type of the matrix X (i.e. np.size / np.dtype)?

  
  
Posted 2 years ago

The last dimension is the value: if the time series is univariate the last dimension is 1, and if it is multivariate the last dimension depends on the number of variables fed to the model.

  
  
Posted 2 years ago

Hmm, what does your preprocessing code look like?

  
  
Posted 2 years ago

The trend_step artifact is used to keep track of the time index of the data, so we know the expected trend of the input. For example, for the first sample (trend_step = 1) the trend value is 10; then when trend_step = 10 (the tenth sample) the regressor predicts the trend value for that step. This method is still being researched to make it more efficient, so it doesn't need to upload an artifact on every request.

Makes sense! I would suggest you open a GitHub issue with a feature request for fast key/value storage that supports multiple instances; I think this usage pattern will be greatly appreciated.
BTW, as an optimization I would use Task scalars (they are sent in the background, and you can relatively easily get the latest value, etc.). Do you also need it to be atomic?
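A minimal sketch of what the scalar-based alternative could look like (assuming the counter only has to be read back later, not updated atomically; the title/series names are made up):

```python
from clearml import Task

task = Task.get_task(project_name='serving examples', task_id='bfc1ae4d242b4d5a9955adde1c9e5a58')
logger = task.get_logger()

# Report the current trend step as a scalar instead of re-uploading an artifact.
# Scalars are sent in the background, so this does not block the request.
trend_step = 11  # hypothetical current value
logger.report_scalar(title="state", series="trend_step", value=trend_step, iteration=trend_step)

# Later (e.g. in Preprocess.__init__), read back the last reported value.
metrics = task.get_last_scalar_metrics()
last_trend_step = metrics.get("state", {}).get("trend_step", {}).get("last")
print(last_trend_step)
```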

X is the sequence generated from df, where df contains two columns (date and value). The size of X in this example is (1, 60, 1) as an np.array: 1 (number of samples), 60 (timesteps), 1 (the 'value' column of df).

So in theory 1,60,-1 should work as a size for Triton. Are you getting an error?
(BTW: if you were to manually run the model inference, I'm assuming you would have created a 3D matrix whose dims are 1,60,<batch_size>, is that correct?)

  
  
Posted 2 years ago

I think this usage pattern will be greatly appreciated.
BTW, as an optimization I would use Task scalars (they are sent in the background, and you can relatively easily get the latest value, etc.). Do you also need it to be atomic?

Yes, it needs to be atomic. Okay, I will research Task scalars some more.

So in theory 1,60,-1 should work as a size for Triton. Are you getting an error?
(BTW: if you were to manually run the model inference, I'm assuming you would have created a 3D matrix whose dims are 1,60,<batch_size>, is that correct?)

Unfortunately it's not working. With the (1,60,1) dimensions I think there is no batch-size information there. The input data shape is like this:
[[1,...,60]]. I need the first dimension to be dynamic so I can send more time-series data at once: [[1,..,60], [61,..,120], [121,..,180]] <- like that
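In other words, something like this (a minimal sketch with made-up values), stacking several 60-step windows into one (N, 60, 1) batch:

```python
import numpy as np

# Three consecutive 60-step windows of a univariate series (values are made up).
window_1 = np.arange(1, 61, dtype=np.float32)
window_2 = np.arange(61, 121, dtype=np.float32)
window_3 = np.arange(121, 181, dtype=np.float32)

# Stack into (batch, timesteps, features) = (3, 60, 1); the first dimension
# is the one that needs to stay dynamic on the serving side.
batch = np.stack([window_1, window_2, window_3])[..., np.newaxis]
print(batch.shape)  # (3, 60, 1)
```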

  
  
Posted 2 years ago

It said the command --aux-config got invalid input

This seems like an interface bug.. let me see if we can fix that 🙂

BTW: this seems like a triton LSTM configuration issue, we might want to move the discussion to the Triton server issue, wdyt?

Definitely!

Could you open an issue at https://github.com/triton-inference-server/server/issues , and I'll jump in and join the conversation?

Is there any reference about integrating Kafka data streaming directly with clearml-serving?

There is already a Kafka server in clearml-serving; I think I'm lost on what you are looking to build here?

I'm using a Node.js service to mediate data from Kafka to my clearml-serving

Is this for the serving of requests? (following my question above)

  
  
Posted 2 years ago

I have tried changing the dimensions to [-1,60,1] but I got this error:

{"detail":"Error processing request: <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails = "All input dimensions should be specified for input 'lstm_input' for model 'MTkyLjE2OC4wLjEwMXxTaXRlc2NvcGV8dXRpbGl6YXRpb258Q1BVfFBWRTEzUDEwMQ', got [-1,60,1]"\n\tdebug_error_string = "{"created":"@1660528548.049544414","description":"Error received from peer ipv4:172.18.0.7:8001","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"All input dimensions should be specified for input 'lstm_input' for model 'MTkyLjE2OC4wLjEwMXxTaXRlc2NvcGV8dXRpbGl6YXRpb258Q1BVfFBWRTEzUDEwMQ', got [-1,60,1]","grpc_status":3}"\n>"}

  
  
Posted 2 years ago

BTW: this seems like a triton LSTM configuration issue, we might want to move the discussion to the Triton server issue, wdyt?

Definitely!
Also, I'd like to ask about an alternative way around this issue. Is there any reference about integrating Kafka data streaming directly with clearml-serving? Right now this issue arises because I'm using a Node.js service to mediate data from Kafka to my clearml-serving instance.
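For reference, the mediation layer itself is fairly thin; a minimal Python sketch of the same idea (the topic name, broker address, endpoint URL, and message format are all assumptions):

```python
import json
import requests
from confluent_kafka import Consumer

# Hypothetical Kafka topic and clearml-serving endpoint.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "anomaly-forwarder",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["sensor-values"])

serving_url = "http://localhost:8080/serve/lstm_anomaly"

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Assume each Kafka message is a JSON object with "date" and "value" lists.
    body = json.loads(msg.value())
    result = requests.post(serving_url, json=body).json()
    print(result)
```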

  
  
Posted 2 years ago

Hmm EmbarrassedPeacock82
Let's try with:
--input-size -1 60 1 --aux-config input.format=FORMAT_NCHW

BTW: this seems like a triton LSTM configuration issue, we might want to move the discussion to the Triton server issue, wdyt?

  
  
Posted 2 years ago

Oh, so the way it currently works: clearml-serving will push the data in real time into Prometheus (you can control the stats/input/output), then you can build the anomaly detection in Grafana (for example, alerts on histograms over time are out of the box, and clearml creates the histograms over time).
Would you also need access to the stats data in Prometheus? Or are you saying you need to process it before it gets there?

Are there any references for setting this up? I'm not familiar with the advanced features in clearml-serving.

  
  
Posted 2 years ago

I want to build a real time data streaming anomaly detection service with clearml-serving

Oh, so the way it currently works: clearml-serving will push the data in real time into Prometheus (you can control the stats/input/output), then you can build the anomaly detection in Grafana (for example, alerts on histograms over time are out of the box, and clearml creates the histograms over time).
Would you also need access to the stats data in Prometheus? Or are you saying you need to process it before it gets there?

  
  
Posted 2 years ago

There is already a Kafka server in clearml-serving; I think I'm lost on what you are looking to build here?

I want to build a real time data streaming anomaly detection service with clearml-serving

  
  
Posted 2 years ago

Okay, I will research the features first. Thank you AgitatedDove14 for the support. If there is a new issue I will let you know in a new thread.

  
  
Posted 2 years ago

If there is a new issue I will let you know in a new thread

Thanks! I would really like to understand what the correct configuration is.

  
  
Posted 2 years ago

So I would try: [-1, 60, 1]

  
  
Posted 2 years ago