Anomaly Detection using Airflow and Sagemaker RandomCutForest

26 minute read

Anomaly Detection Solution using Airflow and SageMaker

Problem Description: A team tracks trade volumes in a Grafana dashboard. They want an anomaly detection solution that identifies sudden spikes and irregularities in trade volumes, so that they can trigger additional actions when these anomalous data points are found. The catch is that the data behind the dashboard lives in an Oracle database. Our task is to build a solution that continuously and incrementally pulls data out of the database, sends it to an ML model to detect anomalies, and inserts those anomalies back into the database, where they are then displayed in the dashboard. A sample output with anomalies in orange is shown in the figure:

[Figure: sample trade-volume time series with anomalous points highlighted in orange]

In this post, I will describe how to solve this problem using Apache Airflow and AWS SageMaker.

An Airflow data pipeline, oracle-to-sagemaker-dag, extracts data from an Oracle database through an API, pre-processes it, sends it to a SageMaker ML endpoint for inference, writes the results out as csv files in 30-minute increments starting from 2021-01-01, and finally inserts the anomaly scores back into the Oracle database. Shown above is a snapshot of the data pipeline and its execution; the red dots represent runs that failed to find data for that time slot.
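
For context, a minimal sketch of what such a DAG definition could look like is shown below. The DAG id, the 30-minute schedule, and the 2021-01-01 start date come from the description above; the post actually uses a custom operator, so the PythonOperator and its stub callable here are purely illustrative placeholders (assuming Airflow 2.x import paths).

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def detect_anomalies(**context):
    """Placeholder for Steps 1-5 below: fetch volumes for the run's window,
    score them via the SageMaker endpoint, write a csv to S3 and POST the
    anomaly scores back to the database."""
    ...

with DAG(
    dag_id="oracle-to-sagemaker-dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="*/30 * * * *",   # one run per 30-minute window
    catchup=True,                       # backfill history from the start_date
) as dag:
    PythonOperator(task_id="detect_trade_volume_anomalies",
                   python_callable=detect_anomalies)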

Implementation

Airflow is at the heart of this solution, directing the flow of data from Oracle -> SageMaker ML model -> outputs to S3/Oracle -> dashboard. Described below is the sequence of steps the data moves through and the transformations it undergoes before ending up in the dashboard.

Step 1: Fetch the data from the API

An Airflow Operator incrementally makes a GET request to the API to fetch the trade volumes from the Oracle DB for a particular group and for a given time period bounded by start_date and end_date. The interval_minutes parameter tells the API to return trade volumes aggregated in 5-minute buckets.

For example: http://<api-url>/api/get/volume?group_name=GROUP&start_date=1617282000000&end_date=1617283800000&interval_minutes=5 - In the URL, GROUP is the trade group name, start_date and end_date are Unix timestamps (epochs in UTC/GMT time).

NOTE: As Airflow itself runs in the UTC time zone (that's how I have set it up), the execution_date of the DAG is automatically converted to epoch time in UTC. However, when displaying inside the Grafana dashboard, the epoch time needs to be converted to local time, e.g. EST/EDT. This is discussed in more detail later in the post.
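
For illustration, here is a rough sketch of how the operator could turn the DAG's UTC execution_date into the millisecond epochs the API expects; api_base, the helper name, and the 30-minute window are assumptions, not the post's actual code.

from datetime import timedelta

def build_source_endpoint(api_base, group_name, execution_date, window_minutes=30):
    # Airflow hands the operator a timezone-aware UTC execution_date, so
    # .timestamp() yields seconds since the Unix epoch; the API wants milliseconds.
    start_epoch_ms = int(execution_date.timestamp() * 1000)
    end_epoch_ms = int((execution_date + timedelta(minutes=window_minutes)).timestamp() * 1000)
    return (f"{api_base}/api/get/volume?group_name={group_name}"
            f"&start_date={start_epoch_ms}&end_date={end_epoch_ms}&interval_minutes=5")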

Step 2: Preprocess and send the data to Flask API

The Airflow operator gets the response from the API (a sample response is shown below). It then extracts the volume data from the response and sends it to the Flask API running on EKS using a POST request. Sample API response:

{'end_date': 1616445000000,
 'group_name': 'GROUP',
 'interval_minutes': 5,
 'results': [{'volume': 20, 'time': 1616443500000, 'groupname': 'GROUP'},
             {'volume': 23, 'time': 1616444100000, 'groupname': 'GROUP'},
             {'volume': 27, 'time': 1616443800000, 'groupname': 'GROUP'},
             {'volume': 54, 'time': 1616443200000, 'groupname': 'GROUP'},
             {'volume': 15, 'time': 1616444700000, 'groupname': 'GROUP'},
             {'volume': 26, 'time': 1616444400000, 'groupname': 'GROUP'}],
 'start_date': 1616443200000}

The following snippet shows how to make a request to the API and process the response.

import requests
import pandas as pd

# make a request to the TradeMon API
trade_volume_response = requests.get(source_endpoint)
# parse the response as JSON
trade_volume_data = trade_volume_response.json()
# build a dataframe from the results
df = pd.DataFrame(trade_volume_data['results'])
# convert the epoch time to a datetime string
df['timestamp'] = df['time'].apply(get_datetime_from_epoch)
df.set_index('timestamp', inplace=True, drop=True)
# the TradeMon API doesn't return the results in order, so sort by time
df.sort_index(inplace=True)
df.reset_index(inplace=True)
# finally, convert the sorted volumes to a list
data = df['volume'].tolist()

Next, the Airflow Operator invokes the EKS Flask API. A sample POST request to the EKS Flask API with the list of values in data would look like this:

curl -H "Content-Type: application/json" --request POST --data '[60.7, 70.3, 35]' https://<api-url>/api/v1/anomaly/tradevolume/GROUP

The python equivalent inside the Airflow Operator would look like this:

# invoke the eks endpoint
self.log.info("ML Endpoint is {}".format(self.ml_endpoint))
self.log.info("Send the data to EKS endpoint")
eks_response = requests.post(self.ml_endpoint, json=data)
df_response = pd.DataFrame(eks_response.json())

Step 3: Flask API forwards the request to SageMaker Endpoint

The role of the Flask API is to receive the data and forward it to the SageMaker endpoint.

OK, why? Why can't the Airflow Operator invoke the SageMaker endpoint directly? Why do we need this additional Flask API in between?

The answer is very specific to the environment in which I am working. We can't make a POST request with data to an AWS endpoint directly from on-prem; that would violate our security policies. Instead, we need an EKS pod running inside the same AWS account as SageMaker, with an InvokeEndpoint role attached to the pod. It sounds complex, but it is specific to this environment.

Inside the EKS pod, an example of how the Flask API endpoint would look is shown here:

import json
import logging

import boto3
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/api/v1/anomaly/tradevolume/<label>", methods=['POST'])
def get_tradevolume_scores(label):
    request_data = request.get_json()
    # RCF expects one value per line for text/csv input
    body = '\n'.join(str(item) for item in request_data)
    endpoint_name = label + '-tradevolume'
    try:
        client = boto3.client('runtime.sagemaker', region_name='us-east-1')
        response = client.invoke_endpoint(EndpointName=endpoint_name, Body=body,
                                          ContentType='text/csv')
        # response['Body'].read() returns the payload as bytes
        # decode the bytes to a utf-8 string
        # use json.loads to convert that string to a python dictionary
        res = json.loads(response['Body'].read().decode('utf-8'))
        logging.info("endpoint response is {}".format(res))
    except Exception as e:
        logging.error("endpoint failed: %s", str(e))
        res = {}

    return jsonify(res)

The main thing to note here is how we invoke the SageMaker endpoint: we acquire a boto3.client object and use it to call invoke_endpoint().

NOTE: The EndpointName here must match the deployed endpoint name. When deploying an ML model, you can specify a constant name; this ensures that when we retrain and redeploy the SageMaker endpoint, we don't need to update the Flask API.
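
For example, a deployment call along the lines of the sketch below pins the endpoint name to the label + '-tradevolume' convention the Flask route relies on (the label value is illustrative; the actual deployment code appears later in this post):

# Sketch: deploy with a fixed endpoint name so the Flask API's
# '<label>-tradevolume' convention keeps working after every retrain.
label = 'group'   # illustrative; must match the Flask route parameter
rcf_predictor = estimator.deploy(initial_instance_count=1,
                                 instance_type='ml.t2.medium',
                                 endpoint_name=label + '-tradevolume')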

Step 4: Process the response from ML Endpoint

Now we come back to the Airflow Operator, which gets the response from the Flask API, merges the anomaly scores into the original dataframe, and stores the result in S3 as a csv file. See the next section for the format in which it is stored.

A snippet of this Airflow Operator logic is shown here:

# merge the two dataframes
df['scores'] = pd.DataFrame(df_response['scores'].tolist())['score']

# write the results to S3
self.log.info("S3 file location is {}".format(self.aws_s3_filename))
upload_to_s3(df, self.aws_s3_bucket, self.aws_s3_filename)

Step 5: POST the data back to API

The final step in the Airflow Operator is to POST the data back to the TradeMon API, which then stores the anomaly scores in the database.
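
A rough sketch of what that final POST could look like is shown below; the /api/post/anomaly path and the payload shape are assumptions for illustration, since the actual TradeMon insert endpoint is not shown in this post.

# Sketch: POST the scored records back to the TradeMon API (the path and the
# payload keys are assumptions for illustration).
records = df[['time', 'volume', 'scores']].to_dict(orient='records')
post_response = requests.post(
    "http://<api-url>/api/post/anomaly?group_name=GROUP",
    json=records,
)
post_response.raise_for_status()
self.log.info("Inserted %d anomaly scores back into the database", len(records))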

Step 6: Display the results in Grafana Dashboard

The Grafana dashboard references the table where we store the original volumes and anomaly scores, and plots the anomalous data points as events on the time series.

That concludes the inference pipeline solution. Next, we will look at how to continuously re-train and deploy the model using EKS cronjobs.

Build and upload the training dataset to S3

In the implementation section, we saw that in Step 4 we write out a dataframe as a csv file for every Airflow DAG execution. The outputs are stored as shown below.

In order to re-train the model, say, every week, we need to combine all these individual files to build the training dataset. We could use a simple EKS cronjob to invoke the training script, but that is not the focus of this post; I will be using a notebook environment to train.

How to combine multiple S3 files into a dataframe?

Step 4 of the implementation leaves us with multiple csv files on S3, as shown here:

$ aws s3 ls s3://<bucket>/<prefix>/
2021-03-30 14:46:53        375 GROUP.2021-01-01T00:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T00:30:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T01:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T01:30:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T02:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T02:30:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T03:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T03:30:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T04:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T11:30:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T12:00:00+00:00.csv
2021-03-30 14:46:53        375 GROUP.2021-01-01T12:30:00+00:00.csv

Our goal is to build a dataset out of these individual files, which can then be used to re-train the model.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import boto3
from io import StringIO
from datetime import datetime

bucket = 's3-bucket'
s3 = boto3.resource('s3')

bucket_obj = s3.Bucket(bucket)
file_objs = list(bucket_obj.objects.filter(Prefix='tradevolumes'))
print(len(file_objs))
3097
%%time
df_list = []
for obj in file_objs:
    df = pd.read_csv(StringIO(obj.get()['Body'].read().decode('utf-8')), index_col='timestamp', parse_dates=True)
    df_list.append(df)
dfs = pd.concat(df_list)
print(dfs.shape)
(18515, 4)
CPU times: user 20.3 s, sys: 381 ms, total: 20.6 s
Wall time: 4min 13s
dfs = dfs.drop(['groupname', 'scores'], axis=1)
dfs
localtime              volume           time
2020-12-31 18:30:00 0 1609457400000
2020-12-31 18:35:00 0 1609457700000
2020-12-31 18:40:00 2 1609458000000
2020-12-31 18:45:00 0 1609458300000
2020-12-31 18:50:00 6 1609458600000
... ... ...
2021-04-01 10:35:00 79 1617287700000
2021-04-01 10:40:00 45 1617288000000
2021-04-01 10:45:00 52 1617288300000
2021-04-01 10:50:00 48 1617288600000
2021-04-01 10:55:00 37 1617288900000

18515 rows × 2 columns

It is worthwhile to confirm that the timestamps are in UTC, since inside the Airflow Operator I am using the method below to get the timestamp. The next section describes how to confirm that the time is indeed in UTC.

def get_datetime_from_epoch(epoch):
    epoch = int(epoch/1000)
    # should we use utcfromtimestamp instead?
    # No: both give the same result here, because the machine runs in UTC.
    return datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')

Converting Epoch to local time

What is epoch time? The Unix epoch (also Unix time, POSIX time, or Unix timestamp) is the number of seconds that have elapsed since January 1, 1970 (midnight UTC/GMT).

It’s worthwhile to remember that a timestamp has no associated timezone information.

Unix time is unambiguously UTC. But some people like to store local time in Unix-style timestamps (which is annoying). So, you must check against a known reference to be sure.

One way to check if the epoch is stored in UTC or local time is by calculating the offset. If the offset is 0, then the epoch represents a UTC time.

datetime.fromtimestamp(1617288900000/1000) - datetime.utcfromtimestamp(1617288900000/1000)
datetime.timedelta(0)
from datetime import datetime
from dateutil import tz

def get_datetime_from_epoch(epoch, from_zone=tz.gettz('UTC'), to_zone=tz.gettz('America/New_York')):
    # convert epoch milliseconds to seconds
    epoch = int(epoch/1000)
    # make the datetime object tz aware
    utc = datetime.fromtimestamp(epoch).replace(tzinfo=from_zone)
    # convert utc to desired tz
    local = utc.astimezone(to_zone)
    # format datetime as str
    return local.strftime('%Y-%m-%d %H:%M:%S')

dfs['localtime'] = dfs['time'].apply(get_datetime_from_epoch)
dfs['localtime'] = pd.to_datetime(dfs['localtime'])
dfs.set_index('localtime', inplace=True, drop=True)
dfs['volume'].plot(figsize=(20, 10))

[Figure: trade volume per 5 minutes over the full period, indexed by local time]

dfs['2021-03-31 16:00:00': '2021-03-31 17:00:00']['volume'].plot(figsize=(20,10))

[Figure: trade volume zoomed in to 2021-03-31 16:00-17:00 local time]

dfs['2021-03-31 16:00:00': '2021-03-31 17:00:00']
localtime              volume           time
2021-03-31 16:00:00 44 1617220800000
2021-03-31 16:05:00 46 1617221100000
2021-03-31 16:10:00 207 1617221400000
2021-03-31 16:15:00 17 1617221700000
2021-03-31 16:20:00 229 1617222000000
2021-03-31 16:25:00 29 1617222300000
2021-03-31 16:30:00 29 1617222600000
2021-03-31 16:35:00 27 1617222900000
2021-03-31 16:40:00 16 1617223200000
2021-03-31 16:45:00 13 1617223500000
2021-03-31 16:50:00 13 1617223800000
2021-03-31 16:55:00 21 1617224100000
2021-03-31 17:00:00 20 1617224400000

What does the volume represent here? The volume between 16:10 and 16:15 is 207 - the first spike.

Store the training dataset back to S3

We will store the combined dataset back into S3 for later use.

def upload_to_s3(content, s3_bucket, s3_key):

    print("Uploading file: " + s3_key + " to S3 bucket: " + s3_bucket)

    s3 = boto3.resource('s3')

    # stream data to s3
    csv_buffer = StringIO()
    content.to_csv(csv_buffer, index=True)
    s3.Object(s3_bucket, s3_key).put(Body=csv_buffer.getvalue())
upload_to_s3(dfs, bucket, 'rcf-training-data/train.csv')
Uploading file: rcf-training-data/train.csv to S3 bucket: s3-bucket

Read back the file from S3

Let’s validate by reading back the saved file from S3.

def download_from_s3(s3_bucket, s3_key):
    print("Downloading file " + s3_key + " from S3 bucket " + s3_bucket)

    s3 = boto3.resource('s3')
    obj = s3.Object(s3_bucket, s3_key)
    content = obj.get()['Body'].read()
    df = pd.read_csv(StringIO(content.decode('utf-8')), index_col='localtime', parse_dates=True)
    print(df.shape)
    return df
df_train = download_from_s3(bucket, 'rcf-training-data/train.csv')
Downloading file rcf-training-data/train.csv from S3 bucket s3-bucket
(18515, 4)
df_train.head()
localtime              volume           time
2020-12-31 18:30:00 0 1609457400000
2020-12-31 18:35:00 0 1609457700000
2020-12-31 18:40:00 2 1609458000000
2020-12-31 18:45:00 0 1609458300000
2020-12-31 18:50:00 6 1609458600000
... ... ...
2021-04-01 10:35:00 79 1617287700000
2021-04-01 10:40:00 45 1617288000000
2021-04-01 10:45:00 52 1617288300000
2021-04-01 10:50:00 48 1617288600000
2021-04-01 10:55:00 37 1617288900000

18515 rows × 2 columns

Explore the data

Plot the training data: in this case, the volume of trades made every 5 minutes over the period from 1 January 2021 to 1 April 2021. As we can see, there are some unusual spikes in the data. Our goal is to train a Random Cut Forest to assign a score to each data point based on how anomalous it is: the higher the score, the more anomalous the data point.

df_train[['volume']].plot(figsize=(20,10))

[Figure: training data - trade volume per 5 minutes from January to April 2021]

Training a Random Cut Forest using SageMaker

The Random Cut Forest Algorithm accepts data in RecordIO Protobuf format. The SageMaker Python API provides helper functions for easily converting your data into this format.

Step 1: Prepare the data and upload it to S3

In the cell below, I define a utility function that will split the training data set into parts, convert them to RecordIO format, and upload those parts to S3.

import io
import os

import boto3
import sagemaker.amazon.common as smac

def split_convert_upload(sparray, bucket, prefix, fname_template='data_part{}.pbr', n_parts=2):
    chunk_size = sparray.shape[0] // n_parts
    for i in range(n_parts):

        # Calculate start and end indices
        start = i * chunk_size
        end = (i + 1) * chunk_size
        if i + 1 == n_parts:
            end = sparray.shape[0]

        # Convert the chunk to RecordIO protobuf
        buf = io.BytesIO()
        smac.write_numpy_to_dense_tensor(array=sparray[start:end], file=buf, labels=None)
        buf.seek(0)

        # Upload to the s3 location specified by bucket and prefix
        fname = os.path.join(prefix, fname_template.format(i))
        boto3.resource('s3').Bucket(bucket).Object(fname).upload_fileobj(buf)
        print('Uploaded data to s3://{}'.format(os.path.join(bucket, fname)))

Define the location where you want the data to be uploaded.

prefix = 'rcf-tradevolume'

train_prefix = os.path.join(prefix, 'train')
output_prefix = os.path.join(prefix, 'output')

s3_train_data = os.path.join('s3://', bucket, train_prefix)
output_path = os.path.join('s3://', bucket, output_prefix)
print('Training set location', s3_train_data)
print('Trained model will be saved at', output_path)
Training set location s3://s3-bucket/rcf-tradevolume/train
Trained model will be saved at s3://s3-bucket/rcf-tradevolume/output

Next, convert the volume data to an array and upload it to S3.

df_train_volumes = df_train[['volume']]
trade_volume_array = df_train_volumes.to_numpy()
print(trade_volume_array.shape)
split_convert_upload(trade_volume_array, bucket=bucket, prefix=train_prefix, fname_template='train_part{}.pbr', n_parts=4)
(18515, 1)
Uploaded data to s3://s3-bucket/rcf-tradevolume/train/train_part0.pbr
Uploaded data to s3://s3-bucket/rcf-tradevolume/train/train_part1.pbr
Uploaded data to s3://s3-bucket/rcf-tradevolume/train/train_part2.pbr
Uploaded data to s3://s3-bucket/rcf-tradevolume/train/train_part3.pbr

Step 2: Prepare your Notebook environment to train using SageMaker

We have created a training data set (the .pbr files) and uploaded it to S3. Next, we configure a SageMaker training job to use the Random Cut Forest (RCF) algorithm on that training data.

The first step is to specify the location of the Docker image containing the SageMaker Random Cut Forest algorithm. To minimize communication latency, SageMaker provides a container for each AWS region in which it is available. The code below automatically chooses the algorithm container based on the current region, that is, the region in which this notebook is run.

RCF is a SageMaker algorithm.

The sagemaker library is automatically installed on the notebook instance; simply import it and set up the session as usual. This is the same setup you will see in any SageMaker documentation or tutorial. get_execution_role from the sagemaker module is optional in this environment.

  • get_pysdk_training_params(): In this cell, we import a module that helps to correctly set up training jobs on SageMaker. This module is necessary to train successfully with SageMaker in the notebook environment I am working in; you won't need it if you are using a SageMaker notebook directly in your AWS account.

get_pysdk_training_params() adds the security configuration required by the cyber-security team, such as the role, KMS key, security group, and subnets. These values are visible in the training output below.

NOTE: colab is an environment-specific library that I have. It is very much the same as the SageMaker library, except it has some wrappers for security, etc.

# SageMaker setup
import sagemaker
import colab.utils.sagemaker
# All training algorithm parameters must be wrapped with this module
from colab.utils.sagemaker import get_pysdk_training_params
# get_image_uri is deprecated in newer versions of the SDK; need to find an alternative
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.session import s3_input

Step 3: Hyperparameters and job definition and training

  • Obtain a sagemaker session object
  • Set the default bucket.
  • Obtain the container image for Random Cut Forest.
  • Set the input channel and specify the data distribution (ShardedByS3Key) for distributed training.
  • Specify the hyperparameters in the dict.
  • Create the estimator object.
  • Finally fit the estimator to the training data identified in the input channel.

Hyperparameters: Particular to a SageMaker RCF training job are the following hyperparameters:

  • num_samples_per_tree - the number of randomly sampled data points sent to each tree. As a general rule, 1/num_samples_per_tree should approximate the estimated ratio of anomalies to normal points in the dataset (for example, num_samples_per_tree = 512 implies an expected anomaly ratio of roughly 1/512 ≈ 0.2%).
  • num_trees - the number of trees to create in the forest. Each tree learns a separate model from different samples of data. The full forest model uses the mean predicted anomaly score from each constituent tree.
  • feature_dim - the dimension of each data point.

In addition to these RCF model hyperparameters, we provide additional parameters defining things like the EC2 instance type on which training will run, the S3 bucket containing the data, and the AWS access role. Note that:

  • Recommended instance type: ml.m4, ml.c4, or ml.c5

Current limitations:

  • The RCF algorithm does not take advantage of GPU hardware.

sagemaker_session = sagemaker.Session()

# Set Sagemaker's default bucket to correct S3 bucket
sagemaker_session._default_bucket = colab.utils.sagemaker.bucket
print(sagemaker_session._default_bucket)

# get the container image
container = get_image_uri(boto3.Session().region_name, 'randomcutforest')

# set the input channel
s3_input_train = s3_input(s3_train_data, distribution='ShardedByS3Key')

rcf_params = {"train_instance_type": "ml.c4.xlarge",
              "train_instance_count": 1,
              "train_volume_size": 30,
              "output_path": output_path,
              "hyperparameters": {'num_samples_per_tree': 512, 'num_trees': 50, 'feature_dim': 1}}

estimator = sagemaker.estimator.Estimator(container, **get_pysdk_training_params(rcf_params),
                                      sagemaker_session=sagemaker_session)

estimator.fit({'train': s3_input_train})
s3-bucket
INFO:omniai.utils.sagemaker:Running OmniAI-Sagemaker Parameters Checks for successful launch...
INFO:omniai.utils.sagemaker:Bucket correctly set to: s3-bucket
INFO:omniai.utils.sagemaker:Setting Sagemaker role to: arn:aws:iam::accountid:role/sagemaker/omniai-system-sm-role
INFO:omniai.utils.sagemaker:Setting Sagemaker KMS key to: arn:aws:kms:us-east-1:accountid:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde
INFO:omniai.utils.sagemaker:Setting Sagemaker Volume KMS key to: arn:aws:kms:us-east-1:accountid:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde
INFO:omniai.utils.sagemaker:Setting Sagemaker Security Group to: sg-08fa1a35165b38c8f
INFO:omniai.utils.sagemaker:Setting Sagemaker Enable Inter Container Traffic Encryption to True
INFO:omniai.utils.sagemaker:Setting Sagemaker Subnets to: ['subnet-03de0646d7a46a997', 'subnet-0e2973e7785ef53e1']
INFO:omniai.utils.sagemaker:Tagging Sagemaker Job with SID: F557190
INFO:omniai.utils.sagemaker:Finished. Please verify that your Sagemaker s3input python object contains the kms key: arn:aws:kms:us-east-1:accountid:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde and is reading from bucket: s3-bucket
WARNING:omniai.config:Config section 'Environment' not found in '/opt/omniai/work/instance1/jupyterinstall/bin/config.ini'
INFO:sagemaker:Creating training-job with name: randomcutforest-2021-04-02-15-01-12-944
2021-04-02 15:01:13 Starting - Starting the training job...
2021-04-02 15:01:16 Starting - Launching requested ML instances......
2021-04-02 15:02:30 Starting - Preparing the instances for training......
2021-04-02 15:03:39 Downloading - Downloading input data
2021-04-02 15:03:39 Training - Downloading the training image...
2021-04-02 15:04:11 Training - Training image download completed. Training in progress..Docker entrypoint called with argument(s): train
Running default environment configuration script
/opt/amazon/lib/python3.6/site-packages/sklearn/externals/joblib/func_inspect.py:53: DeprecationWarning: invalid escape sequence \<
  '\<doctest (.*\.rst)\[(.*)\]\>', source_file).groups()
/opt/amazon/lib/python3.6/site-packages/sklearn/externals/joblib/_memory_helpers.py:10: DeprecationWarning: invalid escape sequence \s
  cookie_re = re.compile("coding[:=]\s*([-\w.]+)")
/opt/amazon/lib/python3.6/site-packages/scipy/_lib/_numpy_compat.py:287: DeprecationWarning: invalid escape sequence \p
  """
/opt/amazon/lib/python3.6/site-packages/scipy/_lib/_numpy_compat.py:10: DeprecationWarning: Importing from numpy.testing.nosetester is deprecated, import from numpy.testing instead.
  from numpy.testing.nosetester import import_nose
/opt/amazon/lib/python3.6/site-packages/scipy/stats/morestats.py:12: DeprecationWarning: Importing from numpy.testing.decorators is deprecated, import from numpy.testing instead.
  from numpy.testing.decorators import setastest
/opt/amazon/lib/python3.6/site-packages/pandas/types/dtypes.py:157: DeprecationWarning: invalid escape sequence \[
  _match = re.compile("(datetime64|M8)\[(?P<unit>.+), (?P<tz>.+)\]")
/opt/amazon/lib/python3.6/site-packages/pandas/core/strings.py:600: DeprecationWarning: invalid escape sequence \d
  """
/opt/amazon/lib/python3.6/site-packages/pandas/core/strings.py:689: DeprecationWarning: invalid escape sequence \d
  """
/opt/amazon/lib/python3.6/site-packages/pandas/core/generic.py:2624: DeprecationWarning: invalid escape sequence \*
  """)
/opt/amazon/lib/python3.6/site-packages/pandas/core/window.py:607: DeprecationWarning: invalid escape sequence \*
  \*args and \*\*kwargs are passed to the function""")
/opt/amazon/lib/python3.6/site-packages/pandas/tools/plotting.py:3816: DeprecationWarning: invalid escape sequence \*
  """
/opt/amazon/lib/python3.6/site-packages/pandas/io/parsers.py:119: DeprecationWarning: invalid escape sequence \+
  NaN: `'""" + "'`, `'".join(sorted(_NA_VALUES)) + """'`.
/opt/amazon/lib/python3.6/site-packages/pandas/io/parsers.py:235: DeprecationWarning: invalid escape sequence \s
  engine and will ignore quotes in the data. Regex example: '\\r\\t'"""
/opt/amazon/lib/python3.6/site-packages/pandas/io/parsers.py:717: DeprecationWarning: invalid escape sequence \s
  result['delimiter'] = '\s+'
/opt/amazon/lib/python3.6/site-packages/pandas/io/parsers.py:712: DeprecationWarning: invalid escape sequence \s
  " different from '\s+' are"\
/opt/amazon/lib/python3.6/site-packages/pandas/io/parsers.py:705: DeprecationWarning: invalid escape sequence \s
  if engine == 'c' and sep == '\s+':
/opt/amazon/lib/python3.6/site-packages/pandas/io/clipboard.py:16: DeprecationWarning: invalid escape sequence \s
  """
/opt/amazon/lib/python3.6/site-packages/pandas/io/clipboard.py:49: DeprecationWarning: invalid escape sequence \s
  kwargs['sep'] = '\s+'
/opt/amazon/lib/python3.6/site-packages/pandas/io/pytables.py:1702: DeprecationWarning: invalid escape sequence \d
  m = re.search("values_block_(\d+)", name)
/opt/amazon/lib/python3.6/site-packages/pandas/io/pytables.py:4181: DeprecationWarning: invalid escape sequence \d
  _re_levels = re.compile("^level_\d+$")
/opt/amazon/lib/python3.6/site-packages/pandas/io/sql.py:1456: DeprecationWarning: invalid escape sequence \s
  pat = re.compile('\s+')
/opt/amazon/lib/python3.6/site-packages/pandas/util/testing.py:1573: DeprecationWarning: invalid escape sequence \d
  numeric_tuple = re.sub("[^\d_]_?", "", x).split("_")
/opt/amazon/lib/python3.6/site-packages/pandas/util/testing.py:2205: DeprecationWarning: invalid escape sequence \(
  """
/opt/amazon/lib/python3.6/site-packages/sklearn/metrics/classification.py:349: DeprecationWarning: invalid escape sequence \k
  """
/opt/amazon/lib/python3.6/site-packages/sklearn/metrics/cluster/supervised.py:578: DeprecationWarning: invalid escape sequence \s
  """
[04/02/2021 15:04:15 INFO 139785830434176] Reading default configuration from /opt/amazon/lib/python3.6/site-packages/algorithm/resources/default-conf.json: {'num_samples_per_tree': 256, 'num_trees': 100, 'force_dense': 'true', 'eval_metrics': ['accuracy', 'precision_recall_fscore'], 'epochs': 1, 'mini_batch_size': 1000, '_log_level': 'info', '_kvstore': 'dist_async', '_num_kv_servers': 'auto', '_num_gpus': 'auto', '_tuning_objective_metric': '', '_ftp_port': 8999}
[04/02/2021 15:04:15 INFO 139785830434176] Merging with provided configuration from /opt/ml/input/config/hyperparameters.json: {'num_trees': '50', 'num_samples_per_tree': '512', 'feature_dim': '1'}
[04/02/2021 15:04:15 INFO 139785830434176] Final configuration: {'num_samples_per_tree': '512', 'num_trees': '50', 'force_dense': 'true', 'eval_metrics': ['accuracy', 'precision_recall_fscore'], 'epochs': 1, 'mini_batch_size': 1000, '_log_level': 'info', '_kvstore': 'dist_async', '_num_kv_servers': 'auto', '_num_gpus': 'auto', '_tuning_objective_metric': '', '_ftp_port': 8999, 'feature_dim': '1'}
[04/02/2021 15:04:15 WARNING 139785830434176] Loggers have already been setup.
[04/02/2021 15:04:15 INFO 139785830434176] Launching parameter server for role scheduler
[04/02/2021 15:04:15 INFO 139785830434176] {'ENVROOT': '/opt/amazon', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION': 'cpp', 'HOSTNAME': 'ip-100-69-50-156.ec2.internal', 'TRAINING_JOB_NAME': 'randomcutforest-2021-04-02-15-01-12-944', 'NVIDIA_REQUIRE_CUDA': 'cuda>=9.0', 'TRAINING_JOB_ARN': 'arn:aws:sagemaker:us-east-1:accountid:training-job/randomcutforest-2021-04-02-15-01-12-944', 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/v2/credentials/4bb51d00-f0ff-4039-9759-e85d930eaacf', 'CANONICAL_ENVROOT': '/opt/amazon', 'PYTHONUNBUFFERED': 'TRUE', 'NVIDIA_VISIBLE_DEVICES': 'void', 'LD_LIBRARY_PATH': '/usr/local/nvidia/lib64:/opt/amazon/lib', 'MXNET_KVSTORE_BIGARRAY_BOUND': '400000000', 'NVIDIA_DRIVER_CAPABILITIES': 'compute,utility', 'AWS_EXECUTION_ENV': 'AWS_ECS_EC2', 'PATH': '/opt/amazon/bin:/usr/local/nvidia/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/amazon/bin:/opt/amazon/bin', 'PWD': '/', 'LANG': 'en_US.utf8', 'SAGEMAKER_METRICS_DIRECTORY': '/opt/ml/output/metrics/sagemaker', 'AWS_REGION': 'us-east-1', 'HOME': '/root', 'SHLVL': '1', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION_VERSION': '2', 'OMP_NUM_THREADS': '2', 'ECS_CONTAINER_METADATA_URI': 'http://169.254.170.2/v3/75afdd20-c88b-4475-9b54-b68e83f605be', 'DMLC_INTERFACE': 'eth0', 'ECS_CONTAINER_METADATA_URI_V4': 'http://169.254.170.2/v4/75afdd20-c88b-4475-9b54-b68e83f605be', 'SAGEMAKER_HTTP_PORT': '8080', 'SAGEMAKER_DATA_PATH': '/opt/ml'}
[04/02/2021 15:04:15 INFO 139785830434176] envs={'ENVROOT': '/opt/amazon', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION': 'cpp', 'HOSTNAME': 'ip-100-69-50-156.ec2.internal', 'TRAINING_JOB_NAME': 'randomcutforest-2021-04-02-15-01-12-944', 'NVIDIA_REQUIRE_CUDA': 'cuda>=9.0', 'TRAINING_JOB_ARN': 'arn:aws:sagemaker:us-east-1:accountid:training-job/randomcutforest-2021-04-02-15-01-12-944', 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/v2/credentials/4bb51d00-f0ff-4039-9759-e85d930eaacf', 'CANONICAL_ENVROOT': '/opt/amazon', 'PYTHONUNBUFFERED': 'TRUE', 'NVIDIA_VISIBLE_DEVICES': 'void', 'LD_LIBRARY_PATH': '/usr/local/nvidia/lib64:/opt/amazon/lib', 'MXNET_KVSTORE_BIGARRAY_BOUND': '400000000', 'NVIDIA_DRIVER_CAPABILITIES': 'compute,utility', 'AWS_EXECUTION_ENV': 'AWS_ECS_EC2', 'PATH': '/opt/amazon/bin:/usr/local/nvidia/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/amazon/bin:/opt/amazon/bin', 'PWD': '/', 'LANG': 'en_US.utf8', 'SAGEMAKER_METRICS_DIRECTORY': '/opt/ml/output/metrics/sagemaker', 'AWS_REGION': 'us-east-1', 'HOME': '/root', 'SHLVL': '1', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION_VERSION': '2', 'OMP_NUM_THREADS': '2', 'ECS_CONTAINER_METADATA_URI': 'http://169.254.170.2/v3/75afdd20-c88b-4475-9b54-b68e83f605be', 'DMLC_INTERFACE': 'eth0', 'ECS_CONTAINER_METADATA_URI_V4': 'http://169.254.170.2/v4/75afdd20-c88b-4475-9b54-b68e83f605be', 'SAGEMAKER_HTTP_PORT': '8080', 'SAGEMAKER_DATA_PATH': '/opt/ml', 'DMLC_ROLE': 'scheduler', 'DMLC_PS_ROOT_URI': '100.69.50.156', 'DMLC_PS_ROOT_PORT': '9000', 'DMLC_NUM_SERVER': '1', 'DMLC_NUM_WORKER': '1'}
[04/02/2021 15:04:15 INFO 139785830434176] Launching parameter server for role server
[04/02/2021 15:04:15 INFO 139785830434176] {'ENVROOT': '/opt/amazon', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION': 'cpp', 'HOSTNAME': 'ip-100-69-50-156.ec2.internal', 'TRAINING_JOB_NAME': 'randomcutforest-2021-04-02-15-01-12-944', 'NVIDIA_REQUIRE_CUDA': 'cuda>=9.0', 'TRAINING_JOB_ARN': 'arn:aws:sagemaker:us-east-1:accountid:training-job/randomcutforest-2021-04-02-15-01-12-944', 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/v2/credentials/4bb51d00-f0ff-4039-9759-e85d930eaacf', 'CANONICAL_ENVROOT': '/opt/amazon', 'PYTHONUNBUFFERED': 'TRUE', 'NVIDIA_VISIBLE_DEVICES': 'void', 'LD_LIBRARY_PATH': '/usr/local/nvidia/lib64:/opt/amazon/lib', 'MXNET_KVSTORE_BIGARRAY_BOUND': '400000000', 'NVIDIA_DRIVER_CAPABILITIES': 'compute,utility', 'AWS_EXECUTION_ENV': 'AWS_ECS_EC2', 'PATH': '/opt/amazon/bin:/usr/local/nvidia/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/amazon/bin:/opt/amazon/bin', 'PWD': '/', 'LANG': 'en_US.utf8', 'SAGEMAKER_METRICS_DIRECTORY': '/opt/ml/output/metrics/sagemaker', 'AWS_REGION': 'us-east-1', 'HOME': '/root', 'SHLVL': '1', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION_VERSION': '2', 'OMP_NUM_THREADS': '2', 'ECS_CONTAINER_METADATA_URI': 'http://169.254.170.2/v3/75afdd20-c88b-4475-9b54-b68e83f605be', 'DMLC_INTERFACE': 'eth0', 'ECS_CONTAINER_METADATA_URI_V4': 'http://169.254.170.2/v4/75afdd20-c88b-4475-9b54-b68e83f605be', 'SAGEMAKER_HTTP_PORT': '8080', 'SAGEMAKER_DATA_PATH': '/opt/ml'}
[04/02/2021 15:04:15 INFO 139785830434176] envs={'ENVROOT': '/opt/amazon', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION': 'cpp', 'HOSTNAME': 'ip-100-69-50-156.ec2.internal', 'TRAINING_JOB_NAME': 'randomcutforest-2021-04-02-15-01-12-944', 'NVIDIA_REQUIRE_CUDA': 'cuda>=9.0', 'TRAINING_JOB_ARN': 'arn:aws:sagemaker:us-east-1:accountid:training-job/randomcutforest-2021-04-02-15-01-12-944', 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/v2/credentials/4bb51d00-f0ff-4039-9759-e85d930eaacf', 'CANONICAL_ENVROOT': '/opt/amazon', 'PYTHONUNBUFFERED': 'TRUE', 'NVIDIA_VISIBLE_DEVICES': 'void', 'LD_LIBRARY_PATH': '/usr/local/nvidia/lib64:/opt/amazon/lib', 'MXNET_KVSTORE_BIGARRAY_BOUND': '400000000', 'NVIDIA_DRIVER_CAPABILITIES': 'compute,utility', 'AWS_EXECUTION_ENV': 'AWS_ECS_EC2', 'PATH': '/opt/amazon/bin:/usr/local/nvidia/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/amazon/bin:/opt/amazon/bin', 'PWD': '/', 'LANG': 'en_US.utf8', 'SAGEMAKER_METRICS_DIRECTORY': '/opt/ml/output/metrics/sagemaker', 'AWS_REGION': 'us-east-1', 'HOME': '/root', 'SHLVL': '1', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION_VERSION': '2', 'OMP_NUM_THREADS': '2', 'ECS_CONTAINER_METADATA_URI': 'http://169.254.170.2/v3/75afdd20-c88b-4475-9b54-b68e83f605be', 'DMLC_INTERFACE': 'eth0', 'ECS_CONTAINER_METADATA_URI_V4': 'http://169.254.170.2/v4/75afdd20-c88b-4475-9b54-b68e83f605be', 'SAGEMAKER_HTTP_PORT': '8080', 'SAGEMAKER_DATA_PATH': '/opt/ml', 'DMLC_ROLE': 'server', 'DMLC_PS_ROOT_URI': '100.69.50.156', 'DMLC_PS_ROOT_PORT': '9000', 'DMLC_NUM_SERVER': '1', 'DMLC_NUM_WORKER': '1'}
[04/02/2021 15:04:15 INFO 139785830434176] Environment: {'ENVROOT': '/opt/amazon', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION': 'cpp', 'HOSTNAME': 'ip-100-69-50-156.ec2.internal', 'TRAINING_JOB_NAME': 'randomcutforest-2021-04-02-15-01-12-944', 'NVIDIA_REQUIRE_CUDA': 'cuda>=9.0', 'TRAINING_JOB_ARN': 'arn:aws:sagemaker:us-east-1:accountid:training-job/randomcutforest-2021-04-02-15-01-12-944', 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/v2/credentials/4bb51d00-f0ff-4039-9759-e85d930eaacf', 'CANONICAL_ENVROOT': '/opt/amazon', 'PYTHONUNBUFFERED': 'TRUE', 'NVIDIA_VISIBLE_DEVICES': 'void', 'LD_LIBRARY_PATH': '/usr/local/nvidia/lib64:/opt/amazon/lib', 'MXNET_KVSTORE_BIGARRAY_BOUND': '400000000', 'NVIDIA_DRIVER_CAPABILITIES': 'compute,utility', 'AWS_EXECUTION_ENV': 'AWS_ECS_EC2', 'PATH': '/opt/amazon/bin:/usr/local/nvidia/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/amazon/bin:/opt/amazon/bin', 'PWD': '/', 'LANG': 'en_US.utf8', 'SAGEMAKER_METRICS_DIRECTORY': '/opt/ml/output/metrics/sagemaker', 'AWS_REGION': 'us-east-1', 'HOME': '/root', 'SHLVL': '1', 'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION_VERSION': '2', 'OMP_NUM_THREADS': '2', 'ECS_CONTAINER_METADATA_URI': 'http://169.254.170.2/v3/75afdd20-c88b-4475-9b54-b68e83f605be', 'DMLC_INTERFACE': 'eth0', 'ECS_CONTAINER_METADATA_URI_V4': 'http://169.254.170.2/v4/75afdd20-c88b-4475-9b54-b68e83f605be', 'SAGEMAKER_HTTP_PORT': '8080', 'SAGEMAKER_DATA_PATH': '/opt/ml', 'DMLC_ROLE': 'worker', 'DMLC_PS_ROOT_URI': '100.69.50.156', 'DMLC_PS_ROOT_PORT': '9000', 'DMLC_NUM_SERVER': '1', 'DMLC_NUM_WORKER': '1'}
Process 30 is a shell:scheduler.
Process 31 is a shell:server.
Process 1 is a worker.
[04/02/2021 15:04:15 INFO 139785830434176] Using default worker.
[04/02/2021 15:04:15 INFO 139785830434176] Loaded iterator creator application/x-recordio-protobuf for content type ('application/x-recordio-protobuf', '1.0')
[04/02/2021 15:04:15 INFO 139785830434176] Checkpoint loading and saving are disabled.
[04/02/2021 15:04:15 INFO 139785830434176] Verifying hyperparamemters...
[04/02/2021 15:04:15 INFO 139785830434176] Hyperparameters are correct.
[04/02/2021 15:04:15 INFO 139785830434176] Validating that feature_dim agrees with dimensions in training data...
[04/02/2021 15:04:15 INFO 139785830434176] feature_dim is correct.
[04/02/2021 15:04:15 INFO 139785830434176] Validating memory limits...
[04/02/2021 15:04:15 INFO 139785830434176] Available memory in bytes: 6291595264
[04/02/2021 15:04:15 INFO 139785830434176] Estimated sample size in bytes: 204800
[04/02/2021 15:04:15 INFO 139785830434176] Estimated memory needed to build the forest in bytes: 1024000
[04/02/2021 15:04:15 INFO 139785830434176] Memory limits validated.
[04/02/2021 15:04:15 INFO 139785830434176] Starting cluster sharing facilities...
[04/02/2021 15:04:15 INFO 139784458589952] concurrency model: async
[04/02/2021 15:04:15 INFO 139785830434176] Create Store: dist_async
[04/02/2021 15:04:15 INFO 139784458589952] masquerade (NAT) address: None
[04/02/2021 15:04:15 INFO 139784458589952] passive ports: None
[04/02/2021 15:04:15 INFO 139784458589952] >>> starting FTP server on 0.0.0.0:8999, pid=1 <<<
[04/02/2021 15:04:16 INFO 139785830434176] Cluster sharing facilities started.
[04/02/2021 15:04:16 INFO 139785830434176] Verifying all workers are accessible...
[04/02/2021 15:04:16 INFO 139785830434176] All workers accessible.
[04/02/2021 15:04:16 INFO 139785830434176] Initializing Sampler...
[04/02/2021 15:04:16 INFO 139785830434176] Sampler correctly initialized.
#metrics {"StartTime": 1617375855.9215763, "EndTime": 1617375856.602751, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training"}, "Metrics": {"initialize.time": {"sum": 670.8333492279053, "count": 1, "min": 670.8333492279053, "max": 670.8333492279053}}}

#metrics {"StartTime": 1617375856.6029367, "EndTime": 1617375856.602991, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training", "Meta": "init_train_data_iter"}, "Metrics": {"Total Records Seen": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Total Batches Seen": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Max Records Seen Between Resets": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Max Batches Seen Between Resets": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Reset Count": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Number of Records Since Last Reset": {"sum": 0.0, "count": 1, "min": 0, "max": 0}, "Number of Batches Since Last Reset": {"sum": 0.0, "count": 1, "min": 0, "max": 0}}}

[2021-04-02 15:04:16.603] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 0, "duration": 681, "num_examples": 1, "num_bytes": 28000}
[04/02/2021 15:04:16 INFO 139785830434176] Sampling training data...
[2021-04-02 15:04:16.630] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 1, "duration": 27, "num_examples": 19, "num_bytes": 518420}
[04/02/2021 15:04:16 INFO 139785830434176] Sampling training data completed.
#metrics {"StartTime": 1617375856.602884, "EndTime": 1617375856.639572, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training"}, "Metrics": {"epochs": {"sum": 1.0, "count": 1, "min": 1, "max": 1}, "update.time": {"sum": 36.15307807922363, "count": 1, "min": 36.15307807922363, "max": 36.15307807922363}}}

[04/02/2021 15:04:16 INFO 139785830434176] Early stop condition met. Stopping training.
[04/02/2021 15:04:16 INFO 139785830434176] #progress_metric: host=algo-1, completed 100 % epochs
#metrics {"StartTime": 1617375856.6033893, "EndTime": 1617375856.639939, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training", "epoch": 0, "Meta": "training_data_iter"}, "Metrics": {"Total Records Seen": {"sum": 18515.0, "count": 1, "min": 18515, "max": 18515}, "Total Batches Seen": {"sum": 19.0, "count": 1, "min": 19, "max": 19}, "Max Records Seen Between Resets": {"sum": 18515.0, "count": 1, "min": 18515, "max": 18515}, "Max Batches Seen Between Resets": {"sum": 19.0, "count": 1, "min": 19, "max": 19}, "Reset Count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}, "Number of Records Since Last Reset": {"sum": 18515.0, "count": 1, "min": 18515, "max": 18515}, "Number of Batches Since Last Reset": {"sum": 19.0, "count": 1, "min": 19, "max": 19}}}

[04/02/2021 15:04:16 INFO 139785830434176] #throughput_metric: host=algo-1, train throughput=504584.2769518661 records/second
[04/02/2021 15:04:16 INFO 139785830434176] Master node: building Random Cut Forest...
[04/02/2021 15:04:16 INFO 139785830434176] Gathering samples...
[04/02/2021 15:04:16 INFO 139785830434176] 18515 samples gathered
[04/02/2021 15:04:16 INFO 139785830434176] Building Random Cut Forest...
[04/02/2021 15:04:16 INFO 139785830434176] Random Cut Forest built:

ForestInfo{num_trees: 50, num_samples_in_forest: 18500, num_samples_per_tree: 370, sample_dim: 1, shingle_size: 1, trees_num_nodes: [9, 85, 197, 211, 145, 135, 205, 195, 151, 17, 209, 205, 115, 163, 257, 213, 37, 177, 199, 217, 23, 187, 211, 203, 23, 149, 187, 163, 27, 223, 221, 155, 85, 193, 231, 163, 167, 195, 195, 115, 161, 193, 205, 49, 155, 189, 175, 19, 161, 175, ], trees_depth: [4, 10, 16, 13, 14, 11, 15, 14, 13, 6, 12, 17, 11, 13, 15, 14, 8, 13, 15, 17, 7, 14, 16, 13, 7, 13, 14, 12, 8, 14, 15, 13, 15, 15, 15, 15, 13, 16, 19, 15, 12, 13, 14, 11, 11, 19, 15, 6, 11, 17, ], max_num_nodes: 257, min_num_nodes: 9, avg_num_nodes: 154, max_tree_depth: 19, min_tree_depth: 4, avg_tree_depth: 12, mem_size: 805408}
#metrics {"StartTime": 1617375856.639662, "EndTime": 1617375856.6464934, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training"}, "Metrics": {"fit_model.time": {"sum": 3.577709197998047, "count": 1, "min": 3.577709197998047, "max": 3.577709197998047}, "model.bytes": {"sum": 805408.0, "count": 1, "min": 805408, "max": 805408}, "finalize.time": {"sum": 6.210803985595703, "count": 1, "min": 6.210803985595703, "max": 6.210803985595703}}}

[04/02/2021 15:04:16 INFO 139785830434176] Master node: Serializing the RandomCutForest model
#metrics {"StartTime": 1617375856.6465595, "EndTime": 1617375856.6555183, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training"}, "Metrics": {"serialize_model.time": {"sum": 8.92496109008789, "count": 1, "min": 8.92496109008789, "max": 8.92496109008789}}}

[04/02/2021 15:04:16 INFO 139785830434176] Test data is not provided.
[04/02/2021 15:04:16 INFO 139784458589952] >>> shutting down FTP server, 0 socket(s), pid=1 <<<
#metrics {"StartTime": 1617375856.655592, "EndTime": 1617375856.738681, "Dimensions": {"Algorithm": "RandomCutForest", "Host": "algo-1", "Operation": "training"}, "Metrics": {"setuptime": {"sum": 21.520137786865234, "count": 1, "min": 21.520137786865234, "max": 21.520137786865234}, "totaltime": {"sum": 849.510908126831, "count": 1, "min": 849.510908126831, "max": 849.510908126831}}}


2021-04-02 15:04:28 Uploading - Uploading generated training model
2021-04-02 15:04:28 Completed - Training job completed
Training seconds: 68
Billable seconds: 68
('Training job name: {}'.format(estimator.latest_training_job.job_name))
'Training job name: randomcutforest-2021-04-02-15-01-12-944'

Deploying the model to an endpoint

Delete the existing deployed model before re-deploying

NOTE: Deleting and re-creating an endpoint incurs some downtime for the ML endpoint. There are alternatives such as rolling deployments and blue/green deployments; I haven't explored them all, but the options are out there.
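
As a reference only, and not something I have used here: one way to avoid that downtime is to create a new endpoint config for the freshly trained model and swap it in with update_endpoint, which SageMaker applies as a blue/green-style replacement behind the same endpoint name. A rough boto3 sketch, with illustrative config and model names:

# Sketch: zero-downtime swap via update_endpoint (not used in this post).
client = boto3.client('sagemaker')

new_config_name = 'group-tradevolume-config-v2'   # illustrative name
client.create_endpoint_config(
    EndpointConfigName=new_config_name,
    ProductionVariants=[{'VariantName': 'AllTraffic',
                         'ModelName': 'randomcutforest-2021-04-02-15-01-12-944',
                         'InitialInstanceCount': 1,
                         'InstanceType': 'ml.t2.medium',
                         'InitialVariantWeight': 1.0}],
    # in this environment you would also pass the KmsKeyId shown earlier
)

# Point the existing endpoint at the new config; the old variant keeps serving
# traffic until the new one is InService.
client.update_endpoint(EndpointName='group-tradevolume',
                       EndpointConfigName=new_config_name)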

Before we go ahead with the deployment, we need to make sure we delete any existing endpoint with the same name. Otherwise, we will get an error saying that we are trying to deploy to an already existing endpoint:

ClientError: An error occurred (ValidationException) when calling the CreateEndpoint operation: Cannot create already existing endpoint "arn:aws:sagemaker:us-east-1:1372234455:endpoint/group-tradevolume".

Using boto3 we can call list_endpoints() and pick the endpoint we want to delete. Or, if you already know the endpoint_name, you can use it directly. I already know what the endpoint_name is, so I will use it.

client = boto3.client('sagemaker')
client.list_endpoints()
{'Endpoints': [{'EndpointName': 'forecast-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:accountid:endpoint/forecast-tradevolume',
   'CreationTime': datetime.datetime(2021, 3, 29, 22, 51, 32, 691000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 3, 29, 23, 1, 49, 455000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'},
  {'EndpointName': 'group-tradevolume',
   'EndpointArn': 'arn:aws:sagemaker:us-east-1:accountid:endpoint/group-tradevolume',
   'CreationTime': datetime.datetime(2021, 3, 24, 20, 33, 51, 834000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2021, 3, 24, 20, 42, 14, 734000, tzinfo=tzlocal()),
   'EndpointStatus': 'InService'}],
 'ResponseMetadata': {'RequestId': 'ec948ecb-5ef6-4b56-808b-6800647324c2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ec948ecb-5ef6-4b56-808b-6800647324c2',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '459',
   'date': 'Fri, 02 Apr 2021 15:20:20 GMT'},
  'RetryAttempts': 0}}
endpoint_name = 'group-tradevolume'
response = client.describe_endpoint_config(EndpointConfigName=endpoint_name)
response
{'EndpointConfigName': 'group-tradevolume',
 'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:accountid:endpoint-config/group-tradevolume',
 'ProductionVariants': [{'VariantName': 'AllTraffic',
   'ModelName': 'randomcutforest-2021-03-24-20-17-24-136',
   'InitialInstanceCount': 1,
   'InstanceType': 'ml.m4.xlarge',
   'InitialVariantWeight': 1.0}],
 'KmsKeyId': 'arn:aws:kms:us-east-1:accountid:key/f43c82fd-a0ff-4db7-bb5e-cd8ee4e8bdde',
 'CreationTime': datetime.datetime(2021, 3, 24, 20, 33, 51, 651000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': 'f06061c3-7732-4b20-a93b-4a15ba99e976',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f06061c3-7732-4b20-a93b-4a15ba99e976',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '450',
   'date': 'Fri, 02 Apr 2021 17:27:00 GMT'},
  'RetryAttempts': 0}}
# extract the model_name and endpoint_name from the response obj
model_name = response['ProductionVariants'][0]['ModelName']

print("Deleting model: {}".format(model_name))
# delete the model
client.delete_model(ModelName=model_name)

print("Deleting endpoint: {}".format(endpoint_name))
# delete the endpoint
client.delete_endpoint(EndpointName=endpoint_name)

print("Deleting endpoint config: {}".format(endpoint_name))
# delete the endpoint configuration
client.delete_endpoint_config(EndpointConfigName=endpoint_name)
Deleting model: randomcutforest-2021-03-24-20-17-24-136
Deleting endpoint: group-tradevolume
Deleting endpoint config: group-tradevolume
{'ResponseMetadata': {'RequestId': '52c122fa-bec7-44a1-924a-1886bed84cf0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '52c122fa-bec7-44a1-924a-1886bed84cf0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Fri, 02 Apr 2021 17:35:44 GMT'},
  'RetryAttempts': 0}}

Deploy the model

rcf_predictor = estimator.deploy(initial_instance_count=1,
                                 kms_key=colab.utils.sagemaker.kms_key,
                                 endpoint_name='group-tradevolume',
                                 instance_type='ml.t2.medium')
INFO:sagemaker:Creating model with name: randomcutforest-2021-04-02-15-01-12-944
WARNING:sagemaker:Using already existing model: randomcutforest-2021-04-02-15-01-12-944
INFO:sagemaker:Creating endpoint with name group-tradevolume
---------------------!
print('Endpoint name: {}'.format(rcf_predictor.endpoint))
Endpoint name: group-tradevolume

Inference

We start the inference process by setting the serializer and deserializer. Next, we need the anomaly score for each data point in our dataset, so we send all of the training data back to the endpoint to get the scores.

from sagemaker.predictor import csv_serializer, json_deserializer

rcf_predictor.content_type = 'text/csv'
rcf_predictor.serializer = csv_serializer
rcf_predictor.deserializer = json_deserializer

# send the entire training data to get anomaly scores
results = rcf_predictor.predict(trade_volume_array)
scores = [datum["score"] for datum in results["scores"]]

# make a copy
df_inference = df_train.copy()

# create a column
df_inference["score"] = pd.Series(scores, index=df_inference.index)
print(df_inference.shape)
df_inference.head()
localtime              volume           time          score
2020-12-31 18:30:00 0 1609457400000 0.705396
2020-12-31 18:35:00 0 1609457700000 0.705396
2020-12-31 18:40:00 2 1609458000000 0.808736
2020-12-31 18:45:00 0 1609458300000 0.705396
2020-12-31 18:50:00 6 1609458600000 0.922013
... ... ... ...
2021-04-01 10:35:00 79 1617287700000 1.606449
2021-04-01 10:40:00 45 1617288000000 1.574120
2021-04-01 10:45:00 52 1617288300000 1.516928
2021-04-01 10:50:00 48 1617288600000 1.545339
2021-04-01 10:55:00 37 1617288900000 1.547586

18515 rows × 3 columns

df_inference['score'].plot(figsize=(20, 10))

[Figure: anomaly score for every data point in the training set]

Define what you consider as an anomaly

The final step before we plot the anomalies is to determine a cut-off/threshold for calling a data point anomalous. A common choice is 3 standard deviations above the mean score. However, the threshold is very specific to the problem: you may want to be alerted only for large spikes, not for every small one you encounter. Here I have defined the cut-off to be 6 standard deviations above the mean.

df_inference['score'].mean() + 6 * df_inference['score'].std()
3.923742929224698
score_mean = df_inference["score"].mean()
score_std = df_inference["score"].std()
score_cutoff = score_mean + 6 * score_std

anomalies = df_inference[df_inference["score"] > score_cutoff]
anomalies.drop('scores', axis=1)
localtime              volume           time          score
2021-01-05 15:45:00 184 1609879500000 4.129643
2021-01-13 17:10:00 288 1610575800000 5.740673
2021-01-14 11:50:00 184 1610643000000 4.129643
2021-01-14 12:00:00 185 1610643600000 4.062676
2021-01-19 15:20:00 182 1611087600000 4.008771
2021-01-20 16:10:00 179 1611177000000 3.944594
2021-01-25 09:50:00 208 1611586200000 4.579094
2021-01-26 09:20:00 188 1611670800000 4.123510
2021-01-26 13:30:00 185 1611685800000 4.062676
2021-01-26 17:30:00 182 1611700200000 4.008771
2021-01-27 15:50:00 179 1611780600000 3.944594
2021-02-01 15:35:00 387 1612211700000 6.519725
2021-02-02 16:10:00 184 1612300200000 4.129643
2021-02-03 09:20:00 295 1612362000000 5.791185
2021-02-03 10:35:00 179 1612366500000 3.944594
2021-02-03 11:45:00 179 1612370700000 3.944594
2021-02-03 12:10:00 179 1612372200000 3.944594
2021-02-03 15:40:00 198 1612384800000 4.359764
2021-02-04 08:55:00 365 1612446900000 6.385341
2021-02-05 11:00:00 225 1612540800000 4.882610
2021-02-10 15:55:00 177 1612990500000 3.969344
2021-02-25 11:25:00 203 1614270300000 4.547264
2021-03-03 15:55:00 231 1614804900000 4.996446
2021-03-05 16:10:00 198 1614978600000 4.359764
2021-03-08 10:25:00 190 1615217100000 4.207701
2021-03-12 12:20:00 265 1615569600000 5.544784
2021-03-15 15:25:00 183 1615836300000 4.043871
2021-03-16 15:05:00 187 1615921500000 4.111437
2021-03-16 18:15:00 190 1615932900000 4.207701
2021-03-22 11:50:00 195 1616428200000 4.289662
2021-03-22 15:20:00 242 1616440800000 5.151677
2021-03-24 11:00:00 177 1616598000000 3.969344
2021-03-29 17:25:00 208 1617053100000 4.579094
2021-03-31 16:10:00 207 1617221400000 4.541122
2021-03-31 16:20:00 229 1617222000000 4.939764

Plot anomalies

fig, ax1 = plt.subplots(figsize=(20,10))
ax2 = ax1.twinx()

ax1.plot(df_inference["volume"], color="C0", alpha=0.8)
#ax2.plot(df_inference["score"], color="C1")
ax2.plot(anomalies.index, anomalies.score, "o", color="C1")

ax1.grid(which="major", axis="both")

ax1.set_ylabel("Trading Volume per 5 mins", color="C0", alpha=0.9)
ax2.set_ylabel("Anomaly Score", color="C1")

ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")

ax1.set_ylim(0, max(df_inference['volume']))
ax2.set_ylim(min(scores), 1.4 * max(scores))
(0.7053962559, 9.12761522288)

[Figure: trade volume with anomalous points overlaid as orange markers, anomaly score on the secondary axis]