Scaling ML Models using AWS SageMaker Multi-Model Endpoints

34 minute read

AWS SageMaker allows you to deploy hundreds of models behind a single endpoint. This is a far more cost-effective solution than hosting one endpoint per model.

Businesses are increasingly developing per-user machine learning (ML) models instead of cohort or segment-based models. They train anywhere from hundreds to hundreds of thousands of custom models based on individual user data. For example, a music streaming service trains custom models based on each listener’s music history to personalize music recommendations. A taxi service trains custom models based on each city’s traffic patterns to predict rider wait times.

While the benefit of building custom ML models for each use case is higher inference accuracy, the downside is that the cost of deploying models increases significantly, and it becomes difficult to manage so many models in production. These challenges become more pronounced when you don’t access all models at the same time but still need them to be available at all times. Amazon SageMaker multi-model endpoints address these pain points and give businesses a scalable yet cost-effective way to deploy multiple ML models.

What is the role of SageMaker in this? Amazon SageMaker is a modular, end-to-end service that makes it easier to build, train, and deploy ML models at scale. After you train an ML model, you can deploy it on Amazon SageMaker endpoints that are fully managed and can serve inferences in real time with low latency. With multi-model endpoints, you can deploy multiple models behind a common endpoint and serve them from a single serving container. This makes it easy to manage ML deployments at scale and lowers your model deployment costs through increased usage of the endpoint and its underlying compute instances.

Part 1: Data Sourcing and Pre-processing (>75% of the time is spent on this step alone)

Infrastructure metrics (like CPU, memory, IO) are collected for 27 hosts belonging to a single application. The data for each host resides on S3 in the following layout: s3://skuchkula/tastetest/raw/<day>/seal/<sealid>/<hostname>/<namespace>.csv

For example, if we take the 2021-05-16 data for seal (application) 12345, we see that there are several hosts. For each of these hosts, we extracted the platform-specific (e.g. vmware, linux, windows) data to CSV files such as metric.compute.vmware.vm.cpu.csv and metric.compute.linux.cpu.csv.

aws s3 ls s3://skuchkula/tastetest/raw/2021-05-16/seal/12345/
                           PRE VSIA2P1953/
                           PRE VSIA2P1955/
                           PRE VSIA2P3340/
                           PRE VSIA2P3390/
                           PRE VSIE4P1317/
                           PRE VSIE4P1505/
                           PRE VSIE4P3038/
                           PRE VSIE4P3039/
                           PRE VSIN20P3651/
                           PRE VSIN20P3739/
                           PRE VSIN20P4765/
                           PRE VSIN20P4771/
                           PRE ccpborainfprd03/
                           PRE ccpborainfprd04/
                           PRE mrpborainfprd03/
                           PRE mrpborainfprd04/
                           PRE psia0p473/
                           PRE psia0p474/
                           PRE psia0p477/
                           PRE psia0p478/
                           PRE psin10p026/
                           PRE psin10p027/
                           PRE psin9p182/
                           PRE psin9p183/
aws s3 ls s3://skuchkula/tastetest/raw/2021-05-16/seal/12345/VSIA2P1953/
2021-08-03 15:24:51    1739195 metric.compute.vmware.vm.cpu.csv
2021-08-03 15:24:53    1005015 metric.compute.vmware.vm.memory.csv

Each of these csv files contains host performance attributes. Our objective is to build a training dataset for a particular host by combining the attributes across namespaces.

First, we need to write a script that navigates to s3://skuchkula/tastetest/raw/ and picks up all the file objects. Then, within each day's seal directory, pick a hostname and do the following:

  • Read in the metric.compute.vmware.vm.cpu.csv file.
  • Transform it, handling any exceptions that come up.
  • Read in the metric.compute.vmware.vm.memory.csv file.
  • Repeat the same step as above.
  • Join the two dataframes and store the result under the same prefix in a file named combined.csv.

Repeat the above for the entire time range 2021-05-16 to present.

Second, we need to write a script that navigates to s3://skuchkula/tastetest/raw/ and picks up all the file objects named combined.csv, then builds a train.csv for each host. We store the train.csv under another S3 prefix: s3://skuchkula/tastetest/processed/seal/12345/<hostname>/train.csv

Let’s begin:

NOTE: Remove omniai.utils.sagemaker if you are reproducing this. It is just a wrapper we have around SageMaker for security reasons.

import os
import time
from io import StringIO
from datetime import datetime
from dateutil import tz

import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import omniai.utils.sagemaker
import sagemaker.amazon.common as smac

bucket = 'skuchkula'

import sagemaker
print(sagemaker.__version__)
2.9.1
s3 = boto3.resource('s3')

bucket_obj = s3.Bucket(bucket)
file_objs = list(bucket_obj.objects.filter(Prefix='tastetest/raw/'))
# keep only raw namespace data
file_objs = [obj for obj in file_objs if 'metric' in obj.key]
print(len(file_objs))
4569
def pivot_transform(df, index='collection_time', cols='attribute', value='value'):

    # convert the index column (collection_time) to datetime
    df[index] = pd.to_datetime(df[index])
    df.set_index(index, drop=True, inplace=True)
    df = df.pivot(columns=cols, values=value)

    return df

def upload_to_s3(content, s3_bucket, s3_key):
    #print("Uploading file: " + s3_key + " to S3 bucket: " + s3_bucket)

    s3 = boto3.resource('s3')

    # stream data to s3
    csv_buffer = StringIO()
    content.to_csv(csv_buffer, index=True)
    s3.Object(s3_bucket, s3_key).put(Body=csv_buffer.getvalue())

def download_from_s3(s3_bucket, s3_key):
    #print("Downloading file " + s3_key + " from S3 bucket " + s3_bucket)

    s3 = boto3.resource('s3')
    obj = s3.Object(s3_bucket, s3_key)
    content = obj.get()['Body'].read()
    df = pd.read_csv(StringIO(content.decode('utf-8')), index_col='collection_time', parse_dates=True)
    #print(df.shape)
    return df

In the following cell, we take the namespace data and transform each of the files.

%%time
untransformable = []
untransformable_hosts = []

# check if all files are transposable
for file_obj in file_objs:
    # skip empty dataframes
    if file_obj.size > 100:
        try:
            # get the dataframe for this host and parse dates
            df_host = pd.read_csv(StringIO(file_obj.get()['Body'].read().decode('utf-8')))
            # filter out some cols
            df_host = df_host[df_host['attribute'].isin(['busy', 'iowait', 'usageAverage', 'guestActivePct',
                                                         'hostUsagePct', 'processesWaitingRunTime',
                                                         'load5m', 'usedPct'])]
            # check if it transforms
            df_transform = pivot_transform(df_host)

            key = file_obj.key
            file_path = '/'.join(key.split('/')[:-1])
            file_name = key.split('/')[-1:][0].split('.')[-2]
            key_path = file_path + '/transformed_' + file_name + '.csv'
            print(key_path)
            print(df_transform.shape)
            upload_to_s3(df_transform, bucket, key_path)
        except Exception:
            untransformable.append(file_obj.key)
            untransformable_hosts.append(file_obj.key.split('/')[-2])
            print(file_obj.key)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd03/transformed_cpu.csv
(288, 2)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd03/transformed_memory.csv
(288, 2)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd03/transformed_system.csv
(288, 1)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd04/transformed_cpu.csv
(288, 2)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd04/transformed_memory.csv
(288, 2)
tastetest/raw/2021-05-14/seal/12345/ccpborainfprd04/transformed_system.csv
(288, 1)
......
......
tastetest/raw/2021-08-03/seal/12345/psin10p027/transformed_cpu.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin10p027/transformed_memory.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin10p027/transformed_system.csv
(288, 1)
tastetest/raw/2021-08-03/seal/12345/psin9p182/transformed_cpu.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin9p182/transformed_memory.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin9p182/transformed_system.csv
(276, 1)
tastetest/raw/2021-08-03/seal/12345/psin9p183/transformed_cpu.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin9p183/transformed_memory.csv
(288, 2)
tastetest/raw/2021-08-03/seal/12345/psin9p183/transformed_system.csv
(288, 1)
CPU times: user 3min 24s, sys: 22.4 s, total: 3min 46s
Wall time: 24min 19s

Check if there are any untransformable hosts.

len(untransformable)
0
untransformable_hosts
[]

At this point, we have data in this format

aws s3 ls s3://skuchkula/tastetest/raw/2021-06-26/seal/12345/VSIA2P1953/
2021-08-03 19:15:50    1732944 metric.compute.vmware.vm.cpu.csv
2021-08-03 19:15:52     986786 metric.compute.vmware.vm.memory.csv
2021-08-05 12:00:37      14015 transformed_cpu.csv
2021-08-05 12:00:38      14050 transformed_memory.csv

The next step is to merge the transformed_*.csv files (cpu, memory and, where present, system) for each host/day into a single combined.csv.

HOSTS = ['VSIA2P1953',
        'VSIA2P1955',
        'VSIA2P3340',
        'VSIA2P3390',
        'VSIE4P1317',
        'VSIE4P1505',
        'VSIE4P3038',
        'VSIE4P3039',
        'VSIN20P3651',
        'VSIN20P3739',
        'VSIN20P4765',
        'VSIN20P4771',
        'psin10p026',
        'psin10p027',
        'psin9p182',
        'psin9p183',
        'mrpborainfprd03',
        'mrpborainfprd04',
        'ccpborainfprd03',
        'ccpborainfprd04',
        'psia0p477',
        'psia0p478',
        'psia0p473',
        'psia0p474']
from datetime import timedelta
import functools

# NOTE: HOSTS[1:] skips the first host (VSIA2P1953), which was already processed while testing this loop
for host in HOSTS[1:]:
    print("HOST: {}".format(host))
    start_dt = datetime(2021, 5, 15)
    end_dt = datetime.now()
    today = start_dt
    #host = 'VSIA2P1953'
    for i in range((end_dt - start_dt).days):
        # construct the s3 path
        s3_path = f"tastetest/raw/{today.strftime('%Y-%m-%d')}/seal/12345/{host}"

        # get the file obj for cpu namespace
        file_objs = list(bucket_obj.objects.filter(Prefix=s3_path))

        # get all objects which have transformed in them
        trans_objs = [obj for obj in file_objs if 'transformed' in obj.key]

        # get the dataframes from those csv files
        df_list = [pd.read_csv(StringIO(trans_obj.get()['Body'].read().decode('utf-8'))) for trans_obj in trans_objs]

        # merge them all on collection_time
        if len(df_list) > 0:
            df_final = functools.reduce(lambda left, right: pd.merge(left, right, on='collection_time'), df_list)
            combined_key = s3_path + '/combined.csv'
            print(combined_key)
            print(df_final.shape)
            upload_to_s3(df_final, bucket, combined_key)

        today = today + timedelta(days=1)
HOST: VSIA2P1955
tastetest/raw/2021-05-15/seal/12345/VSIA2P1955/combined.csv
(284, 5)
....
....
tastetest/raw/2021-08-03/seal/12345/VSIA2P1955/combined.csv
(271, 5)
HOST: VSIA2P3340
tastetest/raw/2021-05-15/seal/12345/VSIA2P3340/combined.csv
(284, 5)
....
....
tastetest/raw/2021-08-03/seal/12345/VSIA2P3340/combined.csv
(267, 5)
HOST: VSIA2P3390
tastetest/raw/2021-05-15/seal/12345/VSIA2P3390/combined.csv
(284, 5)
....
....
tastetest/raw/2021-08-03/seal/12345/VSIA2P3390/combined.csv
(269, 5)
....
....
... and so on for the rest of the hosts.

At this point, we have a combined.csv file under each day/host prefix.

aws s3 ls s3://skuchkula/tastetest/raw/2021-06-26/seal/12345/psia0p473/
2021-08-05 15:52:53      13686 combined.csv
2021-08-03 19:18:29     231017 metric.compute.linux.cpu.csv
2021-08-03 19:19:03     653301 metric.compute.linux.memory.csv
2021-08-03 19:20:11     166523 metric.compute.linux.system.csv
2021-08-05 12:00:48       8623 transformed_cpu.csv
2021-08-05 12:00:48       8371 transformed_memory.csv
2021-08-05 12:00:48       7165 transformed_system.csv

Now, we create the train.csv file for each host by concatenating all of its daily combined.csv files and storing the output under tastetest/processed/seal/12345/<hostname>/train.csv (for example, tastetest/processed/seal/12345/psia0p473/train.csv).

for host in HOSTS:
    print("HOST: {}".format(host))
    start_dt = datetime(2021, 5, 15)
    end_dt = datetime.now()
    today = start_dt

    # used for storing all the data frames for a particular host
    daily_host_list = []

    # iterate thru all days for this host
    for i in range((end_dt - start_dt).days):
        # construct the s3 path
        s3_path = f"tastetest/raw/{today.strftime('%Y-%m-%d')}/seal/12345/{host}/combined.csv"

        # get the file obj for combined.csv file
        file_objs = list(bucket_obj.objects.filter(Prefix=s3_path))

        # some hosts don't have data on certain days
        if len(file_objs) > 0:
            # get the dataframes from those csv files
            df_combined = pd.read_csv(StringIO(file_objs[0].get()['Body'].read().decode('utf-8')))

            daily_host_list.append(df_combined)

        # move on to the next day
        today = today + timedelta(days=1)

    print("For {} concatenating {} dataframes".format(host, len(daily_host_list)))
    df_host_overall = pd.concat(daily_host_list)

    # pre-process data for modeling
    df_host_overall.drop('Unnamed: 0', inplace=True, axis=1)
    df_host_overall['collection_time'] = pd.to_datetime(df_host_overall['collection_time'])
    df_host_overall.set_index('collection_time', drop=True, inplace=True)
    df_host_overall.sort_index(inplace=True)

    # extract features from the index
    df_host_overall['hour'] = df_host_overall.index.hour
    df_host_overall['dayofweek'] = df_host_overall.index.dayofweek

    # handle missing values using interpolate() to fill using linear imputation.
    # NOTE: when dealing with ts data it is not advised to use mean/median/mode imputation
    # It is better to use interpolation as it will consider before and after values.
    df_host_overall.interpolate(method='linear', inplace=True)

    # build the destination_key
    destination_key = f'tastetest/processed/seal/12345/{host}/train.csv'

    print("Uploading to {}".format(destination_key))

    # upload to S3
    upload_to_s3(df_host_overall, bucket, destination_key)
HOST: VSIA2P1953
For VSIA2P1953 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIA2P1953/train.csv
HOST: VSIA2P1955
For VSIA2P1955 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIA2P1955/train.csv
HOST: VSIA2P3340
For VSIA2P3340 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIA2P3340/train.csv
HOST: VSIA2P3390
For VSIA2P3390 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIA2P3390/train.csv
HOST: VSIE4P1317
For VSIE4P1317 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIE4P1317/train.csv
HOST: VSIE4P1505
For VSIE4P1505 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIE4P1505/train.csv
HOST: VSIE4P3038
For VSIE4P3038 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIE4P3038/train.csv
HOST: VSIE4P3039
For VSIE4P3039 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIE4P3039/train.csv
HOST: VSIN20P3651
For VSIN20P3651 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIN20P3651/train.csv
HOST: VSIN20P3739
For VSIN20P3739 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/VSIN20P3739/train.csv
HOST: VSIN20P4765
For VSIN20P4765 concatenating 80 dataframes
Uploading to tastetest/processed/seal/12345/VSIN20P4765/train.csv
HOST: VSIN20P4771
For VSIN20P4771 concatenating 80 dataframes
Uploading to tastetest/processed/seal/12345/VSIN20P4771/train.csv
HOST: psin10p026
For psin10p026 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/psin10p026/train.csv
HOST: psin10p027
For psin10p027 concatenating 81 dataframes
Uploading to tastetest/processed/seal/12345/psin10p027/train.csv
HOST: psin9p182
For psin9p182 concatenating 80 dataframes
Uploading to tastetest/processed/seal/12345/psin9p182/train.csv
HOST: psin9p183
For psin9p183 concatenating 80 dataframes
Uploading to tastetest/processed/seal/12345/psin9p183/train.csv
HOST: mrpborainfprd03
For mrpborainfprd03 concatenating 42 dataframes
Uploading to tastetest/processed/seal/12345/mrpborainfprd03/train.csv
HOST: mrpborainfprd04
For mrpborainfprd04 concatenating 42 dataframes
Uploading to tastetest/processed/seal/12345/mrpborainfprd04/train.csv
HOST: ccpborainfprd03
For ccpborainfprd03 concatenating 43 dataframes
Uploading to tastetest/processed/seal/12345/ccpborainfprd03/train.csv
HOST: ccpborainfprd04
For ccpborainfprd04 concatenating 43 dataframes
Uploading to tastetest/processed/seal/12345/ccpborainfprd04/train.csv
HOST: psia0p477
For psia0p477 concatenating 78 dataframes
Uploading to tastetest/processed/seal/12345/psia0p477/train.csv
HOST: psia0p478
For psia0p478 concatenating 80 dataframes
Uploading to tastetest/processed/seal/12345/psia0p478/train.csv
HOST: psia0p473
For psia0p473 concatenating 77 dataframes
Uploading to tastetest/processed/seal/12345/psia0p473/train.csv
HOST: psia0p474
For psia0p474 concatenating 77 dataframes
Uploading to tastetest/processed/seal/12345/psia0p474/train.csv

At this point, we have created the train.csv for each host. This will be used in the training process. Here’s the structure:

$ aws s3 ls s3://skuchkula/tastetest/processed/seal/12345/
                           PRE VSIA2P1953/
                           PRE VSIA2P1955/
                           PRE VSIA2P3340/
                           PRE VSIA2P3390/
                           PRE VSIE4P1317/
                           PRE VSIE4P1505/
                           PRE VSIE4P3038/
                           PRE VSIE4P3039/
                           PRE VSIN20P3651/
                           PRE VSIN20P3739/
                           PRE VSIN20P4765/
                           PRE VSIN20P4771/
                           PRE ccpborainfprd03/
                           PRE ccpborainfprd04/
                           PRE mrpborainfprd03/
                           PRE mrpborainfprd04/
                           PRE psia0p473/
                           PRE psia0p474/
                           PRE psia0p477/
                           PRE psia0p478/
                           PRE psin10p026/
                           PRE psin10p027/
                           PRE psin9p182/
                           PRE psin9p183/

$ aws s3 ls s3://skuchkula/tastetest/processed/seal/12345/VSIA2P1953/
2021-08-05 21:43:54    1823319 train.csv

Dealing with missing values

# number of values that are missing
df_host_overall.isna().sum()
busy                       1241
iowait                     1230
processesWaitingRunTime    1493
usedPct                    1483
load5m                     1778
hour                          0
dayofweek                     0
dtype: int64
# what percent of values are missing?
df_host_overall.isna().mean() * 100
busy                       6.365082
iowait                     6.308663
processesWaitingRunTime    7.657588
usedPct                    7.606298
load5m                     9.119352
hour                       0.000000
dayofweek                  0.000000
dtype: float64
df_host_overall['processesWaitingRunTime'].plot(figsize=(20, 10))

png

df_host_overall['iowait'].plot(figsize=(20, 10))

png

df_host_copy = df_host_overall.copy(deep=True)
df_host_copy
busy iowait processesWaitingRunTime usedPct load5m hour dayofweek
collection_time
2021-05-15 00:00:10 1.29 0.03 1.0 99.0 1.26 0 5
2021-05-15 00:05:10 1.11 0.02 2.0 99.0 1.23 0 5
2021-05-15 00:10:10 1.18 0.02 1.0 99.0 1.22 0 5
2021-05-15 00:15:10 1.21 0.02 2.0 99.0 1.22 0 5
2021-05-15 00:20:10 1.12 0.02 2.0 99.0 1.22 0 5
... ... ... ... ... ... ... ...
2021-07-30 19:08:56 1.71 0.02 NaN NaN NaN 19 4
2021-07-30 19:13:56 1.66 0.02 NaN NaN NaN 19 4
2021-07-30 19:18:56 1.15 0.03 NaN NaN NaN 19 4
2021-07-30 19:23:56 1.32 0.02 NaN NaN NaN 19 4
2021-07-30 19:28:56 1.35 0.02 NaN NaN NaN 19 4

19497 rows × 7 columns

df_host_copy.interpolate(method='linear', inplace=True)
df_host_copy['iowait'].plot(figsize=(20, 10))

png

df_host_copy['processesWaitingRunTime'].plot(figsize=(20, 10))

png

Part 2: Training Multiple RandomCutForest models at once.

Intro to Multimodel Endpoint Training and Deployment

With Amazon SageMaker multi-model endpoints, we can create an endpoint that seamlessly hosts up to thousands of models. These endpoints are well suited to use cases where any one of a large number of models, which can be served from a common inference container to save inference costs, needs to be invokable on-demand and where it is acceptable for infrequently invoked models to incur some additional latency. For applications which require consistently low inference latency, an endpoint deploying a single model is still the best choice.

At a high level, Amazon SageMaker manages the loading and unloading of models for a multi-model endpoint, as they are needed. When an invocation request is made for a particular model, Amazon SageMaker routes the request to an instance assigned to that model, downloads the model artifacts from S3 onto that instance, and initiates loading of the model into the memory of the container. As soon as the loading is complete, Amazon SageMaker performs the requested invocation and returns the result. If the model is already loaded in memory on the selected instance, the downloading and loading steps are skipped and the invocation is performed immediately.
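
To make this routing concrete, the sketch below shows how a specific model is targeted at invocation time with the low-level boto3 runtime client. The endpoint name and model artifact name are the ones used later in this post, so treat this as illustrative only; at this point in the walkthrough we have not created the endpoint yet.

import boto3

# Minimal sketch (assumes the endpoint 'mme-rcf-vsi-12345' from Part 4 exists and that
# VSIA2P1953.tar.gz has already been copied under the multi-model artifact prefix).
runtime = boto3.client('sagemaker-runtime')

response = runtime.invoke_endpoint(
    EndpointName='mme-rcf-vsi-12345',
    ContentType='text/csv',
    Accept='application/json',
    TargetModel='VSIA2P1953.tar.gz',   # which model artifact this request is routed to
    Body='1.29,0.03,1.0,99.0,1.26,0,5\n',
)
print(response['Body'].read().decode('utf-8'))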

Validating that the data is in the proper shape

def get_dataframe(file_obj):

    # get the dataframe for this host and parse dates
    df_host = pd.read_csv(StringIO(file_obj.get()['Body'].read().decode('utf-8')),
                          parse_dates=True, index_col='collection_time')

    return df_host
bucket_obj = s3.Bucket(bucket)
test_objs = list(bucket_obj.objects.filter(Prefix='tastetest/processed/seal/12345/'))
test_obj = test_objs[0]
for item in test_objs:
    df = get_dataframe(item)
    print(item.key)
    print(df.shape)
tastetest/processed/seal/12345/VSIA2P1953/train.csv
(20167, 6)
tastetest/processed/seal/12345/VSIA2P1955/train.csv
(20157, 6)
tastetest/processed/seal/12345/VSIA2P3340/train.csv
(19726, 6)
tastetest/processed/seal/12345/VSIA2P3390/train.csv
(20116, 6)
tastetest/processed/seal/12345/VSIE4P1317/train.csv
(22681, 6)
tastetest/processed/seal/12345/VSIE4P1505/train.csv
(22652, 6)
tastetest/processed/seal/12345/VSIE4P3038/train.csv
(20766, 6)
tastetest/processed/seal/12345/VSIE4P3039/train.csv
(21011, 6)
tastetest/processed/seal/12345/VSIN20P3651/train.csv
(22677, 6)
tastetest/processed/seal/12345/VSIN20P3739/train.csv
(22061, 6)
tastetest/processed/seal/12345/VSIN20P4765/train.csv
(21565, 6)
tastetest/processed/seal/12345/VSIN20P4771/train.csv
(21754, 6)
tastetest/processed/seal/12345/ccpborainfprd03/train.csv
(1182, 7)
tastetest/processed/seal/12345/ccpborainfprd04/train.csv
(1144, 7)
tastetest/processed/seal/12345/mrpborainfprd03/train.csv
(1158, 7)
tastetest/processed/seal/12345/mrpborainfprd04/train.csv
(332, 7)
tastetest/processed/seal/12345/psia0p473/train.csv
(19607, 7)
tastetest/processed/seal/12345/psia0p474/train.csv
(19497, 7)
tastetest/processed/seal/12345/psia0p477/train.csv
(16197, 7)
tastetest/processed/seal/12345/psia0p478/train.csv
(13656, 7)
tastetest/processed/seal/12345/psin10p026/train.csv
(17493, 7)
tastetest/processed/seal/12345/psin10p027/train.csv
(22789, 7)
tastetest/processed/seal/12345/psin9p182/train.csv
(22498, 7)
tastetest/processed/seal/12345/psin9p183/train.csv
(22490, 7)

As we observe here, not all hosts have the same number of attributes. We need to add a dummy column to the hosts that have only 6 features, so that every host has feature_dim=7 and we can reuse the same training configuration for all of them.

df_host = get_dataframe(test_objs[-1])
print(df_host.shape)
df_host
(22490, 7)
busy iowait processesWaitingRunTime usedPct load5m hour dayofweek
collection_time
2021-05-15 00:00:15 8.05 0.33 6.0 76.0 2.54 0 5
2021-05-15 00:05:15 11.61 1.03 4.0 76.0 3.17 0 5
2021-05-15 00:10:15 7.58 0.30 4.0 76.0 2.67 0 5
2021-05-15 00:15:15 7.74 0.42 3.0 76.0 2.59 0 5
2021-05-15 00:20:15 8.95 0.26 5.0 76.0 2.42 0 5
... ... ... ... ... ... ... ...
2021-08-03 23:35:28 3.02 0.36 1.0 77.0 1.56 23 1
2021-08-03 23:40:28 3.05 0.45 1.0 77.0 1.46 23 1
2021-08-03 23:45:28 3.26 0.57 1.0 77.0 1.51 23 1
2021-08-03 23:50:28 3.06 0.33 2.0 77.0 1.56 23 1
2021-08-03 23:55:28 2.93 0.31 1.0 77.0 1.52 23 1

22490 rows × 7 columns

df_host = get_dataframe(test_objs[1])
print(df_host.shape)
df_host
(20157, 6)
iowait usageAverage guestActivePct hostUsagePct hour dayofweek
collection_time
2021-05-15 00:01:39 3.800000 7.740667 6.133327 6.123333 0 5
2021-05-15 00:06:39 2.933333 7.424000 7.199993 7.190000 0 5
2021-05-15 00:11:39 5.400000 7.366000 5.933328 5.923333 0 5
2021-05-15 00:16:39 4.133333 7.636667 8.799992 8.790000 0 5
2021-05-15 00:21:39 3.600000 7.428000 8.199992 8.190000 0 5
... ... ... ... ... ... ...
2021-08-03 23:30:55 0.200000 9.703333 6.733327 6.723333 23 1
2021-08-03 23:40:59 0.133333 9.685333 7.866659 7.856667 23 1
2021-08-03 23:46:00 0.466667 9.906667 8.066659 8.056666 23 1
2021-08-03 23:51:00 0.000000 9.514667 7.933326 7.923333 23 1
2021-08-03 23:56:01 0.000000 9.667334 8.533325 8.523334 23 1

20157 rows × 6 columns

if df_host.shape[1] == 6:
    df_host['dummy'] = 0
df_host
iowait usageAverage guestActivePct hostUsagePct hour dayofweek dummy
collection_time
2021-05-15 00:01:39 3.800000 7.740667 6.133327 6.123333 0 5 0
2021-05-15 00:06:39 2.933333 7.424000 7.199993 7.190000 0 5 0
2021-05-15 00:11:39 5.400000 7.366000 5.933328 5.923333 0 5 0
2021-05-15 00:16:39 4.133333 7.636667 8.799992 8.790000 0 5 0
2021-05-15 00:21:39 3.600000 7.428000 8.199992 8.190000 0 5 0
... ... ... ... ... ... ... ...
2021-08-03 23:30:55 0.200000 9.703333 6.733327 6.723333 23 1 0
2021-08-03 23:40:59 0.133333 9.685333 7.866659 7.856667 23 1 0
2021-08-03 23:46:00 0.466667 9.906667 8.066659 8.056666 23 1 0
2021-08-03 23:51:00 0.000000 9.514667 7.933326 7.923333 23 1 0
2021-08-03 23:56:01 0.000000 9.667334 8.533325 8.523334 23 1 0

20157 rows × 7 columns

Set up your training environment

# import sagemaker
# from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
import boto3
from sklearn.model_selection import train_test_split

sagemaker_session = sagemaker.Session()

# Set SageMaker's default bucket to the correct S3 bucket
sagemaker_session._default_bucket = omniai.utils.sagemaker.bucket

BUCKET = sagemaker_session._default_bucket

# This references the AWS-managed RandomCutForest (RCF) container image
RCF_IMAGE = sagemaker.image_uris.retrieve('randomcutforest', boto3.Session().region_name)

DATA_PREFIX = "tastetest/RCF-MULTIMODEL/2021-08-06"
MULTI_MODEL_ARTIFACTS = "multi_model_artifacts"

TRAIN_INSTANCE_TYPE = "ml.m4.xlarge"
ENDPOINT_INSTANCE_TYPE = "ml.m4.xlarge"

ENDPOINT_NAME = "mme-rcf-vsi-12345"

MODEL_NAME = ENDPOINT_NAME
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.

Split, Convert and Upload a CSV file to RecordIO format.

RCF takes data in RecordIO protobuf format for training. Thus, we need to convert each of our train.csv files into RecordIO (.pbr) format.

import sagemaker.amazon.common as smac
import os

def split_convert_upload(sparray, bucket, prefix, fname_template='data_part{}.pbr', n_parts=2):
    import io
    import boto3
    import sagemaker.amazon.common as smac

    chunk_size = sparray.shape[0]// n_parts
    for i in range(n_parts):

        # Calculate start and end indices
        start = i*chunk_size
        end = (i+1)*chunk_size
        if i+1 == n_parts:
            end = sparray.shape[0]

        # Convert to record protobuf
        buf = io.BytesIO()
        #smac.write_spmatrix_to_sparse_tensor(array=sparray[start:end], file=buf, labels=None)
        smac.write_numpy_to_dense_tensor(array=sparray[start:end], file=buf, labels=None)
        buf.seek(0)

        # Upload to s3 location specified by bucket and prefix
        fname = os.path.join(prefix, fname_template.format(i))
        boto3.resource('s3').Bucket(bucket).Object(fname).upload_fileobj(buf)
        print('Uploaded data to s3://{}'.format(os.path.join(bucket, fname)))

Define the location where you want the data to be uploaded.

prefix = 'tastetest/RCF-MULTIMODEL/2021-08-06/model-training/'

train_prefix = os.path.join(prefix, 'train')
output_prefix = os.path.join(prefix, 'output')

s3_train_data = os.path.join('s3://', bucket, train_prefix)
output_path = os.path.join('s3://', bucket, output_prefix)
print('Training set location', s3_train_data)
print('Trained model will be saved at', output_path)
Training set location s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/model-training/train
Trained model will be saved at s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/model-training/output

Next, convert the data to an array and upload it to S3

def upload_data_for_host(host):

    print("HOST: {}".format(host))

    # Where to get the data from?
    host_prefix = 'tastetest/processed/seal/12345/' + host + '/train.csv'

    # get the S3 object for this host
    host_file = bucket_obj.objects.filter(Prefix=host_prefix)
    host_obj = list(host_file)[0]

    # get the dataframe for this host and parse dates
    df_host = pd.read_csv(StringIO(host_obj.get()['Body'].read().decode('utf-8')),
                          parse_dates=True, index_col='collection_time')

    if df_host.shape[1] == 6:
        df_host['dummy'] = 0

    print("Shape: {}".format(df_host.shape))

    # build the train_prefix
    prefix = 'tastetest/RCF-MULTIMODEL/2021-08-06/' + host + '/model-training/'

    train_prefix = os.path.join(prefix, 'train')
    output_prefix = os.path.join(prefix, 'output')

    s3_train_data = os.path.join('s3://', bucket, train_prefix)
    output_path = os.path.join('s3://', bucket, output_prefix)
    print('Training set location', s3_train_data)
    print('Trained model will be saved at', output_path)

    # convert this dataframe into a np array
    df_host.reset_index(inplace=True, drop=True)
    df_array = df_host.to_numpy()

    # upload this array to the train_prefix
    split_convert_upload(df_array, bucket=BUCKET, prefix=train_prefix, fname_template='train_part{}.pbr', n_parts=4)

for host in HOSTS:
    print("Uploading data for {}".format(host))
    upload_data_for_host(host)
Uploading data for VSIA2P1953
HOST: VSIA2P1953
Shape: (20167, 7)
Training set location s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train
Trained model will be saved at s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/output
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train/train_part0.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train/train_part1.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train/train_part2.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train/train_part3.pbr
Uploading data for VSIA2P1955
HOST: VSIA2P1955
Shape: (20157, 7)
Training set location s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/train
Trained model will be saved at s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/output
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/train/train_part0.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/train/train_part1.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/train/train_part2.pbr
Uploaded data to s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1955/model-training/train/train_part3.pbr
....
....

Launch a single training job for a given host

In the following section, we set up the code to train an RCF model for each host in our HOSTS list.

As such, we will launch multiple training jobs asynchronously, using the RandomCutForest algorithm.

In this notebook, we will be using the AWS Managed RCF Image for both training and inference - this image provides native support for launching multi-model endpoints.

There is nothing special about the models a multi-model endpoint hosts: they are trained in the same way as any other SageMaker model. Here we use the RCF estimator and do not wait for each job to complete.

# SageMaker setup
# All training algorithm parameters must be wrapped with this module
from omniai.utils.sagemaker import get_pysdk_training_params
from sagemaker.inputs import TrainingInput

def launch_training_job(host):

    _job = "rcf-{}".format(host)

    # build the output_prefix
    prefix = 'tastetest/RCF-MULTIMODEL/2021-08-06/' + host + '/model-training/'

    train_prefix = os.path.join(prefix, 'train')
    output_prefix = os.path.join(prefix, 'output')

    s3_train_data = os.path.join('s3://', bucket, train_prefix)
    s3_output_path = os.path.join('s3://', bucket, output_prefix)


    rcf_params = {"instance_type":"ml.c4.xlarge",
                  "instance_count": 1,
                  "volume_size": 30,
                  "output_path":s3_output_path,
                  "base_job_name":_job,
                  "hyperparameters":{'num_samples_per_tree': 512, 'num_trees': 100, 'feature_dim': 7}
                 }

    rcf = sagemaker.estimator.Estimator(RCF_IMAGE, **get_pysdk_training_params(rcf_params),
                                      sagemaker_session=sagemaker_session)

    # set the input channel
    s3_input_train = TrainingInput(s3_train_data, distribution='ShardedByS3Key')


    rcf.fit({'train': s3_input_train}, wait=False)

    # Return the estimator object
    return rcf

Kick off a model training job for each host

estimators = []

for host in HOSTS:
    estimator = launch_training_job(host)
    estimators.append(estimator)

print()
print(
    f"{len(estimators)} training jobs launched: {[x.latest_training_job.job_name for x in estimators]}"
)
24 training jobs launched: ['rcf-VSIA2P1953-2021-08-06-15-58-43-629', 'rcf-VSIA2P1955-2021-08-06-15-58-44-202', 'rcf-VSIA2P3340-2021-08-06-15-58-46-562', 'rcf-VSIA2P3390-2021-08-06-15-58-52-089', 'rcf-VSIE4P1317-2021-08-06-15-58-53-207', 'rcf-VSIE4P1505-2021-08-06-15-58-54-252', 'rcf-VSIE4P3038-2021-08-06-15-58-54-987', 'rcf-VSIE4P3039-2021-08-06-15-58-56-977', 'rcf-VSIN20P3651-2021-08-06-15-58-58-359', 'rcf-VSIN20P3739-2021-08-06-15-59-00-784', 'rcf-VSIN20P4765-2021-08-06-15-59-03-904', 'rcf-VSIN20P4771-2021-08-06-15-59-04-456', 'rcf-psin10p026-2021-08-06-15-59-05-450', 'rcf-psin10p027-2021-08-06-15-59-10-703', 'rcf-psin9p182-2021-08-06-15-59-11-669', 'rcf-psin9p183-2021-08-06-15-59-12-820', 'rcf-mrpborainfprd03-2021-08-06-15-59-14-153', 'rcf-mrpborainfprd04-2021-08-06-15-59-16-988', 'rcf-ccpborainfprd03-2021-08-06-15-59-17-636', 'rcf-ccpborainfprd04-2021-08-06-15-59-20-609', 'rcf-psia0p477-2021-08-06-15-59-22-278', 'rcf-psia0p478-2021-08-06-15-59-25-335', 'rcf-psia0p473-2021-08-06-15-59-26-168', 'rcf-psia0p474-2021-08-06-15-59-27-451']

Wait for all model training to finish

def wait_for_training_job_to_complete(estimator):
    job = estimator.latest_training_job.job_name
    print(f"Waiting for job: {job}")
    status = estimator.latest_training_job.describe()["TrainingJobStatus"]
    while status == "InProgress":
        time.sleep(45)
        status = estimator.latest_training_job.describe()["TrainingJobStatus"]
        if status == "InProgress":
            print(f"{job} job status: {status}")
    print(f"DONE. Status for {job} is {status}\n")
for est in estimators:
    wait_for_training_job_to_complete(est)
Waiting for job: rcf-VSIA2P1953-2021-08-06-15-58-43-629
rcf-VSIA2P1953-2021-08-06-15-58-43-629 job status: InProgress
rcf-VSIA2P1953-2021-08-06-15-58-43-629 job status: InProgress
DONE. Status for rcf-VSIA2P1953-2021-08-06-15-58-43-629 is Completed

Waiting for job: rcf-VSIA2P1955-2021-08-06-15-58-44-202
DONE. Status for rcf-VSIA2P1955-2021-08-06-15-58-44-202 is Completed

Waiting for job: rcf-VSIA2P3340-2021-08-06-15-58-46-562
DONE. Status for rcf-VSIA2P3340-2021-08-06-15-58-46-562 is Completed

Waiting for job: rcf-VSIA2P3390-2021-08-06-15-58-52-089
DONE. Status for rcf-VSIA2P3390-2021-08-06-15-58-52-089 is Completed

Waiting for job: rcf-VSIE4P1317-2021-08-06-15-58-53-207
DONE. Status for rcf-VSIE4P1317-2021-08-06-15-58-53-207 is Completed

Waiting for job: rcf-VSIE4P1505-2021-08-06-15-58-54-252
DONE. Status for rcf-VSIE4P1505-2021-08-06-15-58-54-252 is Completed

Waiting for job: rcf-VSIE4P3038-2021-08-06-15-58-54-987
DONE. Status for rcf-VSIE4P3038-2021-08-06-15-58-54-987 is Completed

Waiting for job: rcf-VSIE4P3039-2021-08-06-15-58-56-977
DONE. Status for rcf-VSIE4P3039-2021-08-06-15-58-56-977 is Completed

Waiting for job: rcf-VSIN20P3651-2021-08-06-15-58-58-359
DONE. Status for rcf-VSIN20P3651-2021-08-06-15-58-58-359 is Completed

Waiting for job: rcf-VSIN20P3739-2021-08-06-15-59-00-784
DONE. Status for rcf-VSIN20P3739-2021-08-06-15-59-00-784 is Completed

Waiting for job: rcf-VSIN20P4765-2021-08-06-15-59-03-904
DONE. Status for rcf-VSIN20P4765-2021-08-06-15-59-03-904 is Completed

Waiting for job: rcf-VSIN20P4771-2021-08-06-15-59-04-456
DONE. Status for rcf-VSIN20P4771-2021-08-06-15-59-04-456 is Completed

Waiting for job: rcf-psin10p026-2021-08-06-15-59-05-450
DONE. Status for rcf-psin10p026-2021-08-06-15-59-05-450 is Completed

Waiting for job: rcf-psin10p027-2021-08-06-15-59-10-703
DONE. Status for rcf-psin10p027-2021-08-06-15-59-10-703 is Completed

Waiting for job: rcf-psin9p182-2021-08-06-15-59-11-669
DONE. Status for rcf-psin9p182-2021-08-06-15-59-11-669 is Completed

Waiting for job: rcf-psin9p183-2021-08-06-15-59-12-820
DONE. Status for rcf-psin9p183-2021-08-06-15-59-12-820 is Completed

Waiting for job: rcf-mrpborainfprd03-2021-08-06-15-59-14-153
DONE. Status for rcf-mrpborainfprd03-2021-08-06-15-59-14-153 is Completed

Waiting for job: rcf-mrpborainfprd04-2021-08-06-15-59-16-988
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
rcf-mrpborainfprd04-2021-08-06-15-59-16-988 job status: InProgress
DONE. Status for rcf-mrpborainfprd04-2021-08-06-15-59-16-988 is Completed

Waiting for job: rcf-ccpborainfprd03-2021-08-06-15-59-17-636
DONE. Status for rcf-ccpborainfprd03-2021-08-06-15-59-17-636 is Failed

Waiting for job: rcf-ccpborainfprd04-2021-08-06-15-59-20-609
DONE. Status for rcf-ccpborainfprd04-2021-08-06-15-59-20-609 is Failed

Waiting for job: rcf-psia0p477-2021-08-06-15-59-22-278
DONE. Status for rcf-psia0p477-2021-08-06-15-59-22-278 is Completed

Waiting for job: rcf-psia0p478-2021-08-06-15-59-25-335
DONE. Status for rcf-psia0p478-2021-08-06-15-59-25-335 is Completed

Waiting for job: rcf-psia0p473-2021-08-06-15-59-26-168
DONE. Status for rcf-psia0p473-2021-08-06-15-59-26-168 is Completed

Waiting for job: rcf-psia0p474-2021-08-06-15-59-27-451
DONE. Status for rcf-psia0p474-2021-08-06-15-59-27-451 is Completed

Part 3: Create the multi-model endpoint with the SageMaker SDK

Create the Amazon SageMaker MultiDataModel entity

We create the multi-model endpoint using the MultiDataModel class.

You can create a MultiDataModel by directly passing in a sagemaker.model.Model object, in which case the endpoint will inherit information about the image to use, as well as any environment variables, network isolation settings, etc., once the MultiDataModel is deployed.

In addition, a MultiDataModel can also be created without explicitly passing a sagemaker.model.Model object. Please refer to the documentation for additional details; a sketch of this alternative follows the cell below.

from sagemaker.multidatamodel import MultiDataModel

estimator = estimators[0]
model = estimator.create_model(image_uri=RCF_IMAGE)

# This is where our MME will read models from on S3.
model_data_prefix = f"s3://{BUCKET}/{DATA_PREFIX}/{MULTI_MODEL_ARTIFACTS}/"

mme = MultiDataModel(
    name=MODEL_NAME,
    model_data_prefix=model_data_prefix,
    model=model,  # passing our model - passes container image needed for the endpoint
    sagemaker_session=sagemaker_session,
)
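
As noted above, a MultiDataModel can also be constructed without a sagemaker.model.Model object by supplying the serving container image and an execution role directly. Below is a minimal sketch of that alternative; it is not what we run here, and the get_execution_role() call is an assumption, since this environment obtains its role through the omniai wrapper.

# Alternative construction (sketch): no Model object; pass the container image and role directly.
mme_alt = MultiDataModel(
    name=MODEL_NAME,
    model_data_prefix=model_data_prefix,
    image_uri=RCF_IMAGE,                    # serving container image
    role=sagemaker.get_execution_role(),    # assumed role lookup
    sagemaker_session=sagemaker_session,
)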

Part 4: Deploy the Multi Model Endpoint

You need to consider the appropriate instance type and number of instances for the projected prediction workload across all the models you plan to host behind your multi-model endpoint. The number and size of the individual models will also drive memory requirements.
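
As a rough sizing sanity check (a sketch, not part of the original workflow), you can sum the sizes of the model artifacts under the MME prefix to see how much model data the endpoint instance may need to hold in memory. This only becomes meaningful after the add_model() calls later in this post have copied the artifacts there.

# Rough sizing sketch: total size of the model artifacts under the MME prefix.
mme_bucket = boto3.resource('s3').Bucket(BUCKET)
total_bytes = sum(obj.size for obj in
                  mme_bucket.objects.filter(Prefix=f"{DATA_PREFIX}/{MULTI_MODEL_ARTIFACTS}/"))
print("Total artifact size: {:.1f} MB".format(total_bytes / 1e6))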

predictor = mme.deploy(
    initial_instance_count=1,
    kms_key=omniai.utils.sagemaker.kms_key,
    instance_type=ENDPOINT_INSTANCE_TYPE,
    endpoint_name=ENDPOINT_NAME
)
INFO:sagemaker:Creating model with name: mme-rcf-vsi-12345
INFO:sagemaker:Creating endpoint with name mme-rcf-vsi-12345
---------------------!

Our endpoint has launched! Let’s look at what models are available to the endpoint!

By ‘available’, we mean the model artifacts that are currently stored under the S3 prefix we defined when setting up the MultiDataModel above, i.e. model_data_prefix.

Currently, since we have no artifacts (i.e. tar.gz files) stored under that prefix, our endpoint will have no models ‘available’ to serve inference requests.

We will demonstrate how to make models ‘available’ to our endpoint below.

# No models visible!
list(mme.list_models())
[]

Let’s deploy model artifacts to be found by the endpoint

We are now using the .add_model() method of the MultiDataModel to copy over our model artifacts from where they were initially stored, during training, to where our endpoint will source model artifacts for inference requests.

model_data_source refers to the location of our model artifact (i.e. where it was deposited on S3 after training completed)

model_data_path is the relative path to the S3 prefix we specified above (i.e. model_data_prefix) where our endpoint will source models for inference requests.

Since this is a relative path, we can simply pass the name of what we wish to call the model artifact at inference time (i.e. VSIA2P1953.tar.gz)

Dynamically deploying additional models

It is also important to note that we can always use the .add_model() method, as shown below, to dynamically deploy more models to the endpoint to serve inference requests as needed.

estimators[0].latest_training_job.describe()
{'TrainingJobName': 'rcf-VSIA2P1953-2021-08-06-15-58-43-629',
 'TrainingJobArn': 'arn:aws:sagemaker:us-east-1:888888888888:training-job/rcf-vsia2p1953-2021-08-06-15-58-43-629',
 'ModelArtifacts': {'S3ModelArtifacts': 's3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/output/rcf-VSIA2P1953-2021-08-06-15-58-43-629/output/model.tar.gz'},
 'TrainingJobStatus': 'Completed',
 'SecondaryStatus': 'Completed',
 'HyperParameters': {'feature_dim': '7',
  'num_samples_per_tree': '512',
  'num_trees': '100'},
 'AlgorithmSpecification': {'TrainingImage': '38241738.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:1',
  'TrainingInputMode': 'File',
  'MetricDefinitions': [{'Name': 'train:progress',
    'Regex': '#progress_metric: host=\\S+, completed (\\S+) %'},
   {'Name': 'test:f1',
    'Regex': '#quality_metric: host=\\S+, test f1 <score>=(\\S+)'},
   {'Name': 'train:throughput',
    'Regex': '#throughput_metric: host=\\S+, train throughput=(\\S+) records/second'}],
  'EnableSageMakerMetricsTimeSeries': False},
 'RoleArn': 'arn:aws:iam::888888888888:role/sagemaker/omniai-system-sm-role',
 'InputDataConfig': [{'ChannelName': 'train',
   'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
     'S3Uri': 's3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/train',
     'S3DataDistributionType': 'ShardedByS3Key'}},
   'CompressionType': 'None',
   'RecordWrapperType': 'None'}],
 'OutputDataConfig': {'KmsKeyId': 'arn:aws:kms:us-east-1:888888888888:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde',
  'S3OutputPath': 's3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/output'},
 'ResourceConfig': {'InstanceType': 'ml.c4.xlarge',
  'InstanceCount': 1,
  'VolumeSizeInGB': 30,
  'VolumeKmsKeyId': 'arn:aws:kms:us-east-1:888888888888:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde'},
 'VpcConfig': {'SecurityGroupIds': ['sg-08fa1a35165b38c8f'],
  'Subnets': ['subnet-03de0646d7a46a997', 'subnet-0e2973e7785ef53e1']},
 'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
 'CreationTime': datetime.datetime(2021, 8, 6, 15, 58, 43, 901000, tzinfo=tzlocal()),
 'TrainingStartTime': datetime.datetime(2021, 8, 6, 16, 1, 8, 419000, tzinfo=tzlocal()),
 'TrainingEndTime': datetime.datetime(2021, 8, 6, 16, 2, 13, 763000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 8, 6, 16, 2, 13, 763000, tzinfo=tzlocal()),
 'SecondaryStatusTransitions': [{'Status': 'Starting',
   'StartTime': datetime.datetime(2021, 8, 6, 15, 58, 43, 901000, tzinfo=tzlocal()),
   'EndTime': datetime.datetime(2021, 8, 6, 16, 1, 8, 419000, tzinfo=tzlocal()),
   'StatusMessage': 'Preparing the instances for training'},
  {'Status': 'Downloading',
   'StartTime': datetime.datetime(2021, 8, 6, 16, 1, 8, 419000, tzinfo=tzlocal()),
   'EndTime': datetime.datetime(2021, 8, 6, 16, 1, 32, 151000, tzinfo=tzlocal()),
   'StatusMessage': 'Downloading input data'},
  {'Status': 'Training',
   'StartTime': datetime.datetime(2021, 8, 6, 16, 1, 32, 151000, tzinfo=tzlocal()),
   'EndTime': datetime.datetime(2021, 8, 6, 16, 2, 7, 39000, tzinfo=tzlocal()),
   'StatusMessage': 'Training image download completed. Training in progress.'},
  {'Status': 'Uploading',
   'StartTime': datetime.datetime(2021, 8, 6, 16, 2, 7, 39000, tzinfo=tzlocal()),
   'EndTime': datetime.datetime(2021, 8, 6, 16, 2, 13, 763000, tzinfo=tzlocal()),
   'StatusMessage': 'Uploading generated training model'},
  {'Status': 'Completed',
   'StartTime': datetime.datetime(2021, 8, 6, 16, 2, 13, 763000, tzinfo=tzlocal()),
   'EndTime': datetime.datetime(2021, 8, 6, 16, 2, 13, 763000, tzinfo=tzlocal()),
   'StatusMessage': 'Training job completed'}],
 'FinalMetricDataList': [{'MetricName': 'train:progress',
   'Value': 100.0,
   'Timestamp': datetime.datetime(2021, 8, 6, 16, 2, 3, tzinfo=tzlocal())},
  {'MetricName': 'train:throughput',
   'Value': 344432.9375,
   'Timestamp': datetime.datetime(2021, 8, 6, 16, 2, 3, tzinfo=tzlocal())}],
 'EnableNetworkIsolation': False,
 'EnableInterContainerTrafficEncryption': True,
 'EnableManagedSpotTraining': False,
 'TrainingTimeInSeconds': 65,
 'BillableTimeInSeconds': 65,
 'ProfilingStatus': 'Disabled',
 'ResponseMetadata': {'RequestId': 'd644afa3-d19d-414d-8b26-56108c9f4319',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd644afa3-d19d-414d-8b26-56108c9f4319',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '3510',
   'date': 'Fri, 06 Aug 2021 16:28:32 GMT'},
  'RetryAttempts': 0}}

We are interested in the 'ModelArtifacts': {'S3ModelArtifacts': 's3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/output/rcf-VSIA2P1953-2021-08-06-15-58-43-629/output/model.tar.gz'} entry, which points to the model.tar.gz produced by each training job.

artifact_path = estimators[0].latest_training_job.describe()["ModelArtifacts"]["S3ModelArtifacts"]
print(artifact_path)
s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/VSIA2P1953/model-training/output/rcf-VSIA2P1953-2021-08-06-15-58-43-629/output/model.tar.gz

Remember that we said the endpoint finds all the model artifacts under the 'multi_model_artifacts' prefix on S3. We also need to store these models under their host names.

artifact_path.split("/")[-6]
'VSIA2P1953'
len(estimators)
24
for est in estimators:
    try:
        artifact_path = est.latest_training_job.describe()["ModelArtifacts"]["S3ModelArtifacts"]
        #print(artifact_path)
        model_name = artifact_path.split("/")[-6] + ".tar.gz"
        #print(model_name)
        # This is copying over the model artifact to the S3 location for the MME.
        mme.add_model(model_data_source=artifact_path, model_data_path=model_name)
    except Exception:
        print("-----FAILED-------")
        print("TRAINING JOB FAILED: {}".format(est.latest_training_job.describe()['TrainingJobName']))
        print("------------------")

-----FAILED-------
TRAINING JOB FAILED: rcf-ccpborainfprd03-2021-08-06-15-59-17-636
------------------
-----FAILED-------
TRAINING JOB FAILED: rcf-ccpborainfprd04-2021-08-06-15-59-20-609
------------------
$ aws s3 ls s3://skuchkula/tastetest/RCF-MULTIMODEL/2021-08-06/multi_model_artifacts/
2021-08-06 12:29:43    1040252 VSIA2P1953.tar.gz
2021-08-06 12:29:43    1024793 VSIA2P1955.tar.gz
2021-08-06 12:29:44    1009122 VSIA2P3340.tar.gz
2021-08-06 12:29:44    1016081 VSIA2P3390.tar.gz
2021-08-06 12:29:44    1157434 VSIE4P1317.tar.gz
2021-08-06 12:29:45    1175587 VSIE4P1505.tar.gz
2021-08-06 12:29:45    1051178 VSIE4P3038.tar.gz
2021-08-06 12:29:45    1075305 VSIE4P3039.tar.gz
2021-08-06 12:29:45    1258476 VSIN20P3651.tar.gz
2021-08-06 12:29:46    1197140 VSIN20P3739.tar.gz
2021-08-06 12:29:46    1171047 VSIN20P4765.tar.gz
2021-08-06 12:29:46    1180553 VSIN20P4771.tar.gz
2021-08-06 12:29:48      33976 mrpborainfprd03.tar.gz
2021-08-06 12:29:48       6908 mrpborainfprd04.tar.gz
2021-08-06 12:29:49     724975 psia0p473.tar.gz
2021-08-06 12:29:49     691827 psia0p474.tar.gz
2021-08-06 12:29:48     605009 psia0p477.tar.gz
2021-08-06 12:29:48     499447 psia0p478.tar.gz
2021-08-06 12:29:46     647094 psin10p026.tar.gz
2021-08-06 12:29:47     846069 psin10p027.tar.gz
2021-08-06 12:29:47     907423 psin9p182.tar.gz
2021-08-06 12:29:47     910723 psin9p183.tar.gz

We have added the model artifacts from our 22 successful training jobs (the two failed ccpborainfprd jobs are skipped by the try/except above).

We can see that the S3 prefix we specified when setting up the MultiDataModel now has 22 model artifacts. As such, the endpoint can now serve up inference requests for these models.

list(mme.list_models())
['VSIA2P1953.tar.gz',
 'VSIA2P1955.tar.gz',
 'VSIA2P3340.tar.gz',
 'VSIA2P3390.tar.gz',
 'VSIE4P1317.tar.gz',
 'VSIE4P1505.tar.gz',
 'VSIE4P3038.tar.gz',
 'VSIE4P3039.tar.gz',
 'VSIN20P3651.tar.gz',
 'VSIN20P3739.tar.gz',
 'VSIN20P4765.tar.gz',
 'VSIN20P4771.tar.gz',
 'mrpborainfprd03.tar.gz',
 'mrpborainfprd04.tar.gz',
 'psia0p473.tar.gz',
 'psia0p474.tar.gz',
 'psia0p477.tar.gz',
 'psia0p478.tar.gz',
 'psin10p026.tar.gz',
 'psin10p027.tar.gz',
 'psin9p182.tar.gz',
 'psin9p183.tar.gz']

Part 5: Get predictions from the endpoint

Recall that mme.deploy() returns a Predictor (named RealTimePredictor in v1 of the SageMaker SDK) that we saved in a variable called predictor.

We will use predictor to submit requests to the endpoint.

RCF accepts text/csv for the request content type; in this notebook we ask for application/json responses. For more information on the RCF input/output interface, see the SageMaker Random Cut Forest documentation.

Since the predictor does not come with a serializer or deserializer configured for our request format, we will also set these.

This allows us to submit a Python list (or NumPy array) for inference and get back a parsed JSON response containing the anomaly scores.

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

predictor.serializer = CSVSerializer()
predictor.deserializer = JSONDeserializer()

Invoking models on a multi-model endpoint from notebook

Notice the higher latencies on the first invocation of any given model. This is due to the time it takes SageMaker to download the model to the Endpoint instance and then load the model into the inference container. Subsequent invocations of the same model take advantage of the model already being loaded into the inference container.
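
A quick way to see this effect (a sketch, using a single hand-made 7-feature row rather than real host data) is to time two back-to-back invocations of the same model:

import time

# One toy row with 7 features (the feature_dim used at training time).
sample = [[1.29, 0.03, 1.0, 99.0, 1.26, 0, 5]]

for attempt in range(2):
    start = time.time()
    predictor.predict(data=sample,
                      initial_args={"ContentType": "text/csv", "Accept": "application/json"},
                      target_model="VSIA2P1953.tar.gz")
    # The first call may include downloading and loading the model; the second should be faster.
    print("invocation {}: {:.2f} s".format(attempt + 1, time.time() - start))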

# we will need this to get the threshold values.
def get_dataframe_for_host(host):

    # first get the dataframe from S3
    # TODO: don't hardcode this
    host_prefix = 'tastetest/processed/seal/12345/' + host + '/train.csv'

    # get the S3 object for this host
    host_file = bucket_obj.objects.filter(Prefix=host_prefix)
    host_obj = list(host_file)[0]

    # get the dataframe for this host and parse dates
    df_host = pd.read_csv(StringIO(host_obj.get()['Body'].read().decode('utf-8')),
                          parse_dates=True, index_col='collection_time')


    return df_host
def get_inference_for_host(host):

    # get dataframe for host
    df_host = get_dataframe_for_host(host)

    if df_host.shape[1] == 6:
        df_host['dummy'] = 0

    # convert this dataframe into a np array
    df_array = df_host.to_numpy()

    # target model
    model_name = host + ".tar.gz"

    # assuming we have access to "predictor" in this notebook
    results = predictor.predict(data=df_array,
                            initial_args={"ContentType": "text/csv", "Accept": "application/json"},
                            target_model=model_name)
    # extract the scores
    scores = [datum["score"] for datum in results["scores"]]

    # create df_inference
    df_inference = df_host.copy()

    # create a column
    df_inference["score"] = pd.Series(scores, index=df_inference.index)

    return df_inference
def get_thresholds_anomalies(df):
    score_mean = df["score"].mean()
    score_std = df["score"].std()
    score_cutoff = score_mean + 5 * score_std

    anomalies = df[df["score"] > score_cutoff]

    return score_cutoff, anomalies
from sklearn.preprocessing import MinMaxScaler

plt.style.use('fivethirtyeight')

def plot_anomalies(df_inference, anomalies):
    fig, ax1 = plt.subplots(figsize=(20,10))
    ax2 = ax1.twinx()

    df_xaxis = df_inference.drop(['score', 'dayofweek', 'hour'], axis=1)

    min_max_scaler = MinMaxScaler()

    min_max_scaled_df = pd.DataFrame(min_max_scaler.fit_transform(df_xaxis),
                                     columns=df_xaxis.columns, index=df_xaxis.index)

    ax1.plot(min_max_scaled_df)
    ax2.plot(anomalies.index, anomalies.score, "ko", color='black')

    ax1.grid(which="major", axis="both")

    ax1.set_ylabel("Infra metrics", color="C0", alpha=0.9)
    ax2.set_ylabel("Anomaly Score", color="C1")

    ax1.tick_params("y", colors="C0")
    ax2.tick_params("y", colors="C1")

    ax2.set_ylim(min(df_inference['score']), 1.4 * max(df_inference['score']))

Calculate thresholds and anomalies for each host

Run the training data through each host's model to derive a threshold value for that host. Store the threshold values in a dictionary/JSON object so that we can put it on S3. Later, we can think of putting this into DynamoDB and exposing it via an API.

  • For each host, get the dataframe. Pre-process the dataframe and send it to the model. Get the inference dataframe.
  • Using the inference dataframe, calculate the threshold value.
  • Store that threshold value into a dictionary.
thresholds = {}
for host in HOSTS:

    try:
        # get the inference for the host.
        df_inference = get_inference_for_host(host)

        # get the threshold and anomalies for this host
        threshold, anomalies = get_thresholds_anomalies(df_inference)

        # build the dictionary
        thresholds[host] = threshold

        # where to store the anomalies for each host?
        anomalies_host_prefix = 'tastetest/outputs/seal/12345/' + host + '/anomalies.csv'

        # upload the anomalies to S3
        upload_to_s3(anomalies, bucket, anomalies_host_prefix)

    except:
        print("HOST exception: {}".format(host))
HOST exception: ccpborainfprd03
HOST exception: ccpborainfprd04
thresholds
{'VSIA2P1953': 4.756026011526808,
 'VSIA2P1955': 3.801921214479359,
 'VSIA2P3340': 4.351807969455841,
 'VSIA2P3390': 3.790421553245209,
 'VSIE4P1317': 3.3437116855242617,
 'VSIE4P1505': 3.5291895893091634,
 'VSIE4P3038': 3.782492128770304,
 'VSIE4P3039': 3.2887344591462018,
 'VSIN20P3651': 3.038342197372302,
 'VSIN20P3739': 4.668041098225739,
 'VSIN20P4765': 4.657142997004395,
 'VSIN20P4771': 4.684034095094461,
 'psin10p026': 7.226442467653326,
 'psin10p027': 6.583427229705553,
 'psin9p182': 6.804781385240922,
 'psin9p183': 5.122724399992083,
 'mrpborainfprd03': 3.2778770221976288,
 'mrpborainfprd04': 1.5911666823233295,
 'psia0p477': 5.164329147815787,
 'psia0p478': 5.786836872599331,
 'psia0p473': 4.860958554954675,
 'psia0p474': 4.91017786622281}

Upload the thresholds JSON to S3

import json
s3.Object(bucket, 'tastetest/outputs/seal/12345/thresholds.json').put(Body=json.dumps(thresholds).encode('UTF-8'))
{'ResponseMetadata': {'RequestId': '1AKA9Q040E063CKN',
  'HostId': '+DahTCkCCxPNKGSEomUOBNpSHcCnNiW5fhl6typi8DH96/P28qTsiRcgt/gjLmqd30QVxfVFrmM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '+DahTCkCCxPNKGSEomUOBNpSHcCnNiW5fhl6typi8DH96/P28qTsiRcgt/gjLmqd30QVxfVFrmM=',
   'x-amz-request-id': '1AKA9Q040E063CKN',
   'date': 'Sun, 08 Aug 2021 00:02:22 GMT',
   'x-amz-version-id': '_Ro.zVbNpu3pvB_JvJ.lU9HBjN_A_TBN',
   'x-amz-server-side-encryption': 'aws:kms',
   'x-amz-server-side-encryption-aws-kms-key-id': 'arn:aws:kms:us-east-1:888888888888:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde',
   'etag': '"6af7060df7ba3b4d8dca0fc930710c65"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"6af7060df7ba3b4d8dca0fc930710c65"',
 'ServerSideEncryption': 'aws:kms',
 'VersionId': '_Ro.zVbNpu3pvB_JvJ.lU9HBjN_A_TBN',
 'SSEKMSKeyId': 'arn:aws:kms:us-east-1:888888888888:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde'}
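
As a hedged sketch of that eventual DynamoDB write, assuming a hypothetical table named host-anomaly-thresholds with a host partition key (nothing in this walkthrough actually creates it), the thresholds dictionary could be pushed like this:

import boto3
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("host-anomaly-thresholds")   # assumed table, partition key "host"

with table.batch_writer() as batch:
    for host, threshold in thresholds.items():
        # DynamoDB does not accept Python floats, so convert to Decimal
        batch.put_item(Item={"host": host, "threshold": Decimal(str(threshold))})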

Part 6: Visualizing the outputs

host='VSIE4P3039'
df_inference = get_inference_for_host(host)
threshold, anomalies = get_thresholds_anomalies(df_inference)
plot_anomalies(df_inference, anomalies)

png

host='VSIA2P3390'
df_inference = get_inference_for_host(host)
threshold, anomalies = get_thresholds_anomalies(df_inference)
plot_anomalies(df_inference, anomalies)

png

df_inference
iowait usageAverage guestActivePct hostUsagePct hour dayofweek dummy score
collection_time
2021-05-15 00:01:39 4.133333 6.844667 7.399993 7.390000 0 5 0 1.619116
2021-05-15 00:06:39 4.200000 6.876667 6.999993 6.990000 0 5 0 1.595702
2021-05-15 00:11:39 5.466667 7.030000 7.399993 7.390000 0 5 0 1.673916
2021-05-15 00:16:39 2.800000 7.074000 8.133326 8.123333 0 5 0 1.589001
2021-05-15 00:21:39 4.466667 7.006667 5.933328 5.923333 0 5 0 1.611375
... ... ... ... ... ... ... ... ...
2021-08-03 23:20:54 0.933333 9.858000 10.066657 10.056666 23 1 0 1.775754
2021-08-03 23:30:55 0.000000 9.876000 7.666659 7.656667 23 1 0 1.610336
2021-08-03 23:40:59 0.000000 10.100000 7.933326 7.923333 23 1 0 1.626396
2021-08-03 23:46:00 0.666667 10.190000 9.066658 9.056666 23 1 0 1.686963
2021-08-03 23:51:00 186.333328 10.548000 7.999992 7.990000 23 1 0 4.684175

20167 rows × 8 columns

%%time
host='VSIE4P1317'
df_inference = get_inference_for_host(host)
threshold, anomalies = get_thresholds_anomalies(df_inference)
plot_anomalies(df_inference, anomalies)
CPU times: user 131 ms, sys: 0 ns, total: 131 ms
Wall time: 2.26 s

png

%%time
host='VSIA2P1953'
df_inference = get_inference_for_host(host)
threshold, anomalies = get_thresholds_anomalies(df_inference)
plot_anomalies(df_inference, anomalies)
CPU times: user 145 ms, sys: 0 ns, total: 145 ms
Wall time: 2.45 s

png

%%time
host='LSPBVSYBPRD23'
df_inference = get_inference_for_host(host)
threshold, anomalies = get_thresholds_anomalies(df_inference)
plot_anomalies(df_inference, anomalies)
CPU times: user 133 ms, sys: 1.73 ms, total: 135 ms
Wall time: 2.44 s

png

The remaining cells walk through the same inference and thresholding steps manually for a single host (CCPBVWEBPRD263, which exposes five metrics) so we can inspect the intermediate outputs.

score_mean = df_inference["score"].mean()
score_std = df_inference["score"].std()
score_cutoff = score_mean + 3 * score_std

anomalies = df_inference[df_inference["score"] > score_cutoff]
df_array = df_host.to_numpy()
df_array.shape
(4640, 5)
results = predictor.predict(data=df_array,
                            initial_args={"ContentType": "text/csv", "Accept": "application/json"},
                            target_model="CCPBVWEBPRD263.tar.gz")

scores = [datum["score"] for datum in results["scores"]]
df_inference = df_host.copy()

# create a column
df_inference["score"] = pd.Series(scores, index=df_inference.index)
print(df_inference.shape)
df_inference.head()
(4640, 6)
CPU (Avg) Memory (Physical) Processes Memory (Virtual) Memory (Swap) score
collection_hour
2021-01-01 00:00:00 5.62 51.00 0.07 26.0 0.0 4.744156
2021-01-01 01:00:00 5.43 51.00 0.05 26.0 0.0 4.729027
2021-01-01 02:00:00 5.31 51.00 0.02 26.0 0.0 4.728208
2021-01-01 03:00:00 5.91 51.00 0.04 26.0 0.0 4.772424
2021-01-01 04:00:00 5.15 51.25 0.14 26.0 0.0 4.713383
df_inference['score'].plot(figsize=(20, 10))
<AxesSubplot:xlabel='collection_hour'>

png

df_inference.sort_values(by='score', ascending=False)
CPU (Avg) Memory (Physical) Processes Memory (Virtual) Memory (Swap) score
collection_hour
2021-02-20 10:00:00 4.89 27.00 0.01 15.00 0.0 5.238596
2021-02-20 09:00:00 4.87 27.00 0.02 15.00 0.0 5.238503
2021-02-20 11:00:00 5.16 28.08 0.04 15.75 0.0 5.222376
2021-02-20 13:00:00 4.88 28.89 0.07 15.89 0.0 5.207549
2021-02-20 12:00:00 4.83 28.91 0.01 15.91 0.0 5.207468
... ... ... ... ... ... ...
2021-01-31 08:00:00 4.93 59.00 0.05 32.00 0.0 4.432263
2021-04-22 02:00:00 4.86 59.00 0.07 32.00 0.0 4.431804
2021-04-26 22:00:00 4.88 60.00 0.13 32.92 0.0 4.428965
2021-02-04 04:00:00 5.05 60.00 0.07 33.00 0.0 4.428797
2021-02-04 02:00:00 4.84 60.00 0.05 33.00 0.0 4.425693

4640 rows × 6 columns

df_inference['score'].mean() + 3 * df_inference['score'].std()
5.142651432144164
score_mean = df_inference["score"].mean()
score_std = df_inference["score"].std()
score_cutoff = score_mean + 3 * score_std

anomalies = df_inference[df_inference["score"] > score_cutoff]
df_inference.columns
Index(['CPU (Avg)', 'Memory (Physical)', 'Processes', 'Memory (Virtual)',
       'Memory (Swap)', 'score'],
      dtype='object')

Plot anomalies

fig, ax1 = plt.subplots(figsize=(20,10))
ax2 = ax1.twinx()

ax1.plot(df_inference[['CPU (Avg)', 'Memory (Physical)', 'Processes', 'Memory (Virtual)', 'Memory (Swap)']],
         color="C0", alpha=0.8)
ax2.plot(anomalies.index, anomalies.score, "o", color='C1')

ax1.grid(which="major", axis="both")

ax1.set_ylabel("Infra metrics", color="C0", alpha=0.9)
ax2.set_ylabel("Anomaly Score", color="C1")

ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")

ax2.set_ylim(min(scores), 1.4 * max(scores))
(4.425692623, 7.3340346167199995)

png

Part 7: Using Boto APIs to invoke the endpoint from outside the notebook environment.

While developing interactively in a Jupyter notebook, the SageMaker Python SDK is the more seamless way to invoke your endpoint, since .deploy() returns a RealTimePredictor. It also gives you fine-grained control over the serialization and deserialization protocols used to shape the request and response payloads to and from the endpoint.

This is great for iterative experimentation within a notebook. If your application has access to the SageMaker SDK, you can also import RealTimePredictor and attach it to an existing endpoint, which lets you stick with the high-level SDK if you prefer.

Additional documentation on RealTimePredictor can be found here.
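
If the SDK is installed wherever your application runs, you can attach to the already-running endpoint without re-deploying anything. Below is a minimal sketch, assuming SageMaker Python SDK v2 (where RealTimePredictor has been renamed Predictor) and the endpoint_name and df_array defined earlier in this notebook:

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

# attach to the existing multi-model endpoint
predictor = Predictor(endpoint_name=endpoint_name,
                      serializer=CSVSerializer(),
                      deserializer=JSONDeserializer())

# target_model selects which artifact the multi-model endpoint serves
results = predictor.predict(data=df_array, target_model="VSIE4P3039.tar.gz")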

The lower-level Boto3 SDK may be preferable if you are invoking the endpoint as part of a broader architecture.

Imagine an API Gateway frontend that uses a Lambda proxy to transform request payloads before hitting a SageMaker endpoint. In this scenario, Lambda does not have access to the SageMaker Python SDK, but Boto3 still lets you interact with your endpoint and serve inference requests; a hedged sketch of such a handler follows.
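
The ENDPOINT_NAME environment variable, the host query-string parameter, and the event shape below are illustrative assumptions based on the standard API Gateway Lambda proxy integration, not something built in this walkthrough:

import json
import os

import boto3

runtime = boto3.client("sagemaker-runtime")

def lambda_handler(event, context):
    # the host name in the query string selects the target model
    host = event["queryStringParameters"]["host"]

    # the request body carries CSV rows, in the same format shown later in this part
    response = runtime.invoke_endpoint(EndpointName=os.environ["ENDPOINT_NAME"],
                                       ContentType="text/csv",
                                       Accept="application/json",
                                       TargetModel=host + ".tar.gz",
                                       Body=event["body"])

    scores = json.loads(response["Body"].read().decode("utf-8"))
    return {"statusCode": 200, "body": json.dumps(scores)}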

Boto3 therefore lets you inject ML intelligence served by SageMaker endpoints into existing applications with little to no refactoring of existing code.

Boto3 submits your request as a binary payload while still letting you supply your desired Content-Type and Accept headers; serialization and deserialization are handled by the inference container behind the SageMaker endpoint.

Additional documentation on .invoke_endpoint() can be found here.

Goal: Create a function that takes the hostname as input and constructs the model name. For the sake of testing, take the first few rows (the head) of each host’s dataframe and send them as the body to the endpoint. Finally, return the response in the format that we will later insert into DynamoDB.

def invoke_sagemaker_endpoint(endpoint_name, host):

    # model names
    model_name = host + ".tar.gz"

    # get sagemaker obj
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")

    # load the dataframe for this host
    df_host = get_dataframe_for_host(host)

    # pad with the placeholder column (same pre-processing as above)
    if df_host.shape[1] == 6:
        df_host['dummy'] = 0

    # take the first few rows (head) for testing and format them as CSV
    data = df_host.head().values.tolist()
    str_data = [str(item).replace('[', '').replace(']', '') for item in data]
    body = '\n'.join(str(item) for item in str_data)

    response = runtime_sm_client.invoke_endpoint(EndpointName=endpoint_name,
                                             ContentType="text/csv",
                                             TargetModel=model_name,
                                             Body=body)

    res = json.loads(response['Body'].read().decode('utf-8'))

    return res

invoke_sagemaker_endpoint(endpoint_name, HOSTS[-1])
{'scores': [{'score': 4.1307051177},
  {'score': 4.1375795903},
  {'score': 4.1260104824},
  {'score': 4.1337678906},
  {'score': 4.1367306677}]}

Shown below is the format in which we need to send data to the RCF inference endpoint.

data = df_host.head().values.tolist()
str_data = [str(item).replace('[', '').replace(']', '') for item in data]
body = '\n'.join(str(item) for item in str_data)
body
'4.133333333345945, 6.844666666666668, 7.399992942810059, 7.39, 0.0, 5.0\n4.1999999999970905, 6.876666666666666, 6.999993324279785, 6.99, 0.0, 5.0\n5.466666666659876, 7.03, 7.399992942810059, 7.39, 0.0, 5.0\n2.8000000000029104, 7.074, 8.133325576782228, 8.123333333333333, 0.0, 5.0\n4.466666666645324, 7.006666666666666, 5.933327674865723, 5.923333333333334, 0.0, 5.0'

Part 8: Delete the deployed model

Before re-deploying, we need to make sure that we delete any existing endpoint with the same name. Otherwise, we will get an error saying we are trying to create an endpoint that already exists:

ClientError: An error occurred (ValidationException) when calling the CreateEndpoint operation: Cannot create already existing endpoint "arn:aws:sagemaker:us-east-1:1372234455:endpoint/morcom-tradevolume".

Using boto3, we can call list_endpoints() and pick out the endpoint we want to delete. If you already know the endpoint_name, you can use it directly, which is what I do here.

client = boto3.client('sagemaker')
client.list_endpoints()
{'Endpoints': [{'EndpointName': 'mme-xgboost-housing',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/mme-xgboost-housing',
   'CreationTime': datetime.datetime(2021, 7, 20, 16, 29, 33, 448000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 20, 16, 39, 55, 222000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'forecast-youinvapi-6-5-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/forecast-youinvapi-6-5-tradevolume',
   'CreationTime': datetime.datetime(2021, 7, 12, 20, 17, 40, 512000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 7, 6, 27, 26000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'youinvapi-lb-rtt',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/youinvapi-lb-rtt',
   'CreationTime': datetime.datetime(2021, 7, 9, 16, 56, 32, 586000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 23, 21, 6, 31, 131000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'forecast-youinvapi-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/forecast-youinvapi-tradevolume',
   'CreationTime': datetime.datetime(2021, 7, 7, 16, 7, 56, 651000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 10, 54, 4, 83000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'forecast-3hr-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/forecast-3hr-tradevolume',
   'CreationTime': datetime.datetime(2021, 7, 6, 18, 56, 19, 526000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 1, 37, 26, 855000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'overall-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/overall-tradevolume',
   'CreationTime': datetime.datetime(2021, 7, 2, 20, 50, 40, 324000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 6, 31, 25, 399000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'whipanya-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/whipanya-tradevolume',
   'CreationTime': datetime.datetime(2021, 6, 20, 15, 18, 1, 297000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 5, 46, 34, 786000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'kcbnsapa-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/kcbnsapa-tradevolume',
   'CreationTime': datetime.datetime(2021, 6, 20, 14, 25, 49, 65000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 4, 1, 25, 86000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'forecast-6-5hour-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/forecast-6-5hour-tradevolume',
   'CreationTime': datetime.datetime(2021, 6, 17, 19, 45, 8, 375000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 3, 21, 26, 158000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'forecast-6-5hour-v2-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:888888888888:endpoint/forecast-6-5hour-v2-tradevolume',
   'CreationTime': datetime.datetime(2021, 6, 17, 14, 41, 7, 115000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 7, 24, 0, 21, 43, 806000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'}],
 'NextToken': 'cIws2QhTXUIa8bi8XNNmOrX7NtEcZ5T9ZLF6dgJ5dTQZno7NF9zPWyjMjvftk21E7+6zMvdKf5zowPPzEZ4uu+DvdPlBkIFMT2cAkN+I5F1DdwxZ1kWhPheCvkNcXJH2tMKeyZ4tUCQLk24+Su34NdDXYAZ5NhdUZyQLSIQHvxFBIEDdURknt+zxHp6qY4SID/9k425tflI5ul4OQJ/h9qtiG6eByA3HNnMsePdz1xunQR1iNQszBCsDW1jaSUFxJss0Usx0nmHOtEDmSrnzY2+3DztI+AVtmIBCDDCOtCcJOvYh6jwsVkGsnYgxDH8cS3fAidp7zzAo8h3+Ml744jWzj1f1ivr6LB6IpQDcEjNeNQFe4dOdlSEN9da1C69Y6Lh3KrSqEzpZSRGBJ3OrQN2IAiEr1EBmxKOuk4KmuFESg3t8z0jSuAXhUgR8JBr44DiaZVaRGM3j0xFKfDOnOJcBck0Yw97pgu1To8ysXD2lmf50PHkcn6ZSp1T8pd3sECFE/+YnpI7IZinJ+UDfRdzuwlU=',
 'ResponseMetadata': {'RequestId': '1233e809-47e9-4f06-ba40-cdb8794c77b3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1233e809-47e9-4f06-ba40-cdb8794c77b3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2892',
   'date': 'Tue, 27 Jul 2021 12:31:11 GMT'},
  'RetryAttempts': 0}}
endpoint_name = 'mme-xgboost-housing'

try:
    # the endpoint config created by .deploy() shares the endpoint's name
    response = client.describe_endpoint_config(EndpointConfigName=endpoint_name)
    # extract the model_name from the endpoint config
    model_name = response['ProductionVariants'][0]['ModelName']

    print("Deleting model: {}".format(model_name))
    # delete the model
    client.delete_model(ModelName=model_name)

    print("Deleting endpoint: {}".format(endpoint_name))
    # delete the endpoint
    client.delete_endpoint(EndpointName=endpoint_name)

    print("Deleting endpoint config: {}".format(endpoint_name))
    # delete the endpoint configuration
    client.delete_endpoint_config(EndpointConfigName=endpoint_name)
except Exception:
    print('Endpoint not found')
Deleting model: mme-xgboost-housing
Deleting endpoint: mme-xgboost-housing
Deleting endpoint config: mme-xgboost-housing

Appendix

Updating a model

To update a model, you follow the same approach as above and add it as a new model. For example, if you have retrained the NewYork_NY.tar.gz model and want to start invoking it, upload the updated model artifacts behind the S3 prefix with a new name such as NewYork_NY_v2.tar.gz, and then change the target_model field to invoke NewYork_NY_v2.tar.gz instead of NewYork_NY.tar.gz. You do not want to overwrite the model artifacts in Amazon S3, because the old version of the model might still be loaded in the containers or on the storage volume of the instances on the endpoint; invocations intended for the new model could then be served by the old version.
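
As a minimal sketch of that rollout, assuming the bucket variable and predictor object from earlier in this notebook, and a models/ prefix standing in for whatever S3 prefix the multi-model endpoint was created with:

import boto3

s3_client = boto3.client("s3")

# upload the retrained artifact under a NEW key instead of overwriting the old one
s3_client.upload_file("NewYork_NY_v2.tar.gz", bucket, "models/NewYork_NY_v2.tar.gz")

# switch inference over by changing only the target_model name
results = predictor.predict(data=df_array,
                            initial_args={"ContentType": "text/csv", "Accept": "application/json"},
                            target_model="NewYork_NY_v2.tar.gz")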

Alternatively, you could stop the endpoint and re-deploy a fresh set of models.
