Create an AWS Redshift cluster using the AWS Python SDK

18 minute read

Creating a Redshift Cluster using the AWS Python SDK

Author: Shravan Kuchkula

Introduction

In this post, I will show how we can use the AWS Python SDK, boto3, to create AWS resources (as opposed to clicking through many screens in the console) and launch an AWS Redshift cluster. We will then read data from S3, load it into Redshift cluster tables, and run some queries. Finally, we will programmatically destroy all the resources we created so that we don’t incur any unnecessary costs.

Many topics are covered in this notebook; below is a list of the questions that will be answered:

Part 1: Launch an AWS Redshift cluster

  • How to create an IAM user and obtain the AWS_KEY and AWS_SECRET?
  • How to connect to AWS programmatically using the above IAM user?
  • How to use configparser to read in a DWH config file with all the variables?
  • How to use the AWS Python SDK via the boto3 package?
  • How to create S3, EC2, IAM and Redshift clients using boto3?
  • How to create an IAM role and assign a policy to it?
  • How to create and launch a Redshift cluster?
  • How to monitor a Redshift cluster's state?
  • How to create a security group inbound rule so that the Redshift cluster can be accessed?

Part 2: Connect to cluster, load data and perform analysis

  • How to connect to Redshift cluster from Jupyter Notebook?
  • How to load partitioned and non-partitioned data from an S3 bucket into tables in Redshift?
  • How to create tables with no distribution strategy?
  • How to create tables with a distribution strategy?
  • Compare the performance of the nodist vs. dist strategies.

Part 3: Destroy the resources

  • How to delete a Redshift cluster?
  • How to delete the IAM role, security group rules and other resources that we created in Part 1?

About the role of Redshift in the overall data engineering pipeline

AWS Redshift is a fully managed, columnar, Massively Parallel Processing (MPP) data warehouse service in the cloud.

Shown below is the role of AWS Redshift in the big picture of a typical data engineering pipeline. The sources can be a diverse mix of databases (Cassandra, DynamoDB, MySQL, Postgres), EC2 machines, or other S3 locations. Next comes a class of products responsible for the actual ETL work: they issue calls to the sources, extract the data, apply some transformations, and store the transformed data in S3, which is our de facto staging area. Products in this category, commonly referred to as dataflow products, include Airflow, Luigi, NiFi, StreamSets, and AWS Data Pipeline.

In this post, my focus is on the highlighted box: we assume that some CSV files have already been loaded into the S3 staging area by an ETL process. Our goal is to ingest this CSV data from S3 into an Amazon Redshift cluster.

Once the data is in Redshift, our chief goal is for BI apps to be able to connect to the Redshift cluster and do some analysis. The BI apps can either connect directly to the cluster or go through an intermediate stage where the data is aggregated into OLAP cubes.

Here, I will use this Jupyter notebook to connect directly to the Redshift cluster, create tables, load data from S3, and perform some analysis. In the real world, full-fledged BI products would feed from the Redshift cluster and do more sophisticated analysis.

Finally, I will illustrate the power of Infrastructure-as-code, through which we can provision, use and destroy AWS resources very rapidly, and in a reproducible manner.

Part 1: Launch an AWS Redshift Cluster

Step 1: Create an IAM user

In most cases, people are given an IAM user to work with. If you are doing this from scratch, then go into your AWS console and create a new IAM user.

  • Create a new IAM user in your AWS account
  • Give it AdministratorAccess from the "Attach existing policies directly" tab
  • Take note of the access key and secret
  • Edit the file dwh.cfg in the same folder as this notebook and fill in:
    [AWS]
    KEY= YOUR_AWS_KEY
    SECRET= YOUR_AWS_SECRET
    
import pandas as pd
import boto3
import json

Step 2: Use configparser to read in the variables

In the dwh.cfg file, you will store your secrets and configuration values. This file should not be checked into git.

The file looks like this:

[AWS]
KEY=<your-key>
SECRET=<your-secret>

[DWH]
DWH_CLUSTER_TYPE=multi-node
DWH_NUM_NODES=4
DWH_NODE_TYPE=dc2.large

DWH_IAM_ROLE_NAME=dwhRole
DWH_CLUSTER_IDENTIFIER=dwhCluster
DWH_DB=dwh
DWH_DB_USER=dwhuser
DWH_DB_PASSWORD=<password>
DWH_PORT=5439
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

#(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", \
                   "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", \
                   "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER,\
                   DWH_DB, DWH_DB_USER, "XXX", DWH_PORT, DWH_IAM_ROLE_NAME]
             })
Param Value
0 DWH_CLUSTER_TYPE multi-node
1 DWH_NUM_NODES 4
2 DWH_NODE_TYPE dc2.large
3 DWH_CLUSTER_IDENTIFIER dwhCluster
4 DWH_DB dwh
5 DWH_DB_USER dwhuser
6 DWH_DB_PASSWORD XXX
7 DWH_PORT 5439
8 DWH_IAM_ROLE_NAME dwhRole

Step 3: Create clients for EC2, S3, IAM, and Redshift

Here, we use boto3 to get handles to talk to S3, EC2, IAM and Redshift. Notice that for EC2 and S3 we create resources, while for IAM and Redshift we create clients. I am choosing us-east-1 as my region.

import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-east-1'
                  )

redshift = boto3.client('redshift',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
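
As an aside, the same handles can be created from a single boto3 Session, so that the credentials and region are declared only once. This is just an equivalent sketch; the rest of the notebook keeps the explicit clients created above.

# Equivalent setup via a shared Session (credentials/region specified once).
session = boto3.Session(
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-east-1"
)

# Resources and clients created off the session inherit its credentials and region.
ec2 = session.resource('ec2')
s3 = session.resource('s3')
iam = session.client('iam')
redshift = session.client('redshift')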

Step 4: Create an IAM role and assign it a policy so that Redshift can read from the S3 bucket

  • Create an IAM role that allows Redshift to access S3 (read-only)
from botocore.exceptions import ClientError

#1.1 Create the role,
try:
    print("1.1 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)


print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)
1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::506140549518:role/dwhRole
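
As an optional sanity check (not part of the original flow), we can confirm that the S3 read-only policy is attached by listing the role's managed policies:

# Optional check: list the managed policies attached to the role.
attached = iam.list_attached_role_policies(RoleName=DWH_IAM_ROLE_NAME)
print([p['PolicyArn'] for p in attached['AttachedPolicies']])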

Step 5: Create a Redshift Cluster and Launch it

  • Create a Redshift cluster
  • For complete arguments to create_cluster, see docs
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)
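
Note that create_cluster is asynchronous: the call returns immediately while the cluster is provisioned in the background. Assuming the call above succeeded, a quick optional peek at the response shows the initial state:

# Optional: the newly requested cluster starts out in the 'creating' state.
print(response['Cluster']['ClusterStatus'])   # expected: 'creating'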

Step 6: Describe the cluster to see its status

  • Run this block several times until the cluster status becomes Available
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", \
                  "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)
Key Value
0 ClusterIdentifier dwhcluster
1 NodeType dc2.large
2 ClusterStatus available
3 MasterUsername dwhuser
4 DBName dwh
5 Endpoint {'Address': 'dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com', 'Port': 5439}
6 VpcId vpc-ade05fd7
7 NumberOfNodes 4
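
Instead of re-running the cell by hand, boto3 also provides a waiter that polls describe_clusters until the cluster reports available. This is an optional sketch; the Delay and MaxAttempts values are arbitrary choices:

# Optional alternative: block until the cluster becomes available.
waiter = redshift.get_waiter('cluster_available')
waiter.wait(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    WaiterConfig={'Delay': 30, 'MaxAttempts': 40}   # poll every 30s, up to ~20 min
)
print("Cluster is available")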

Note: Make a note of Cluster Endpoint and Role ARN

##
## DO NOT RUN THIS unless the cluster status becomes "Available"
##
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
#print("DWH_ENDPOINT :: ", endpoint)
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", roleArn)
DWH_ENDPOINT ::  dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::506140549518:role/dwhRole

Step 7: Create Security group inbound rule

# Open an incoming TCP port to access the cluster endpoint
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)

    defaultSg.authorize_ingress(
        GroupName= 'default',  
        CidrIp='0.0.0.0/0',  
        IpProtocol='TCP',  
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)
ec2.SecurityGroup(id='sg-0a945e1c987ca77dd')

Part 2: Connect to cluster, load data and perform analysis

In the second part, we will be making use of the cluster we just launched in part 1.

Using the sql extension, we can run SQL commands directly within the Jupyter notebook.

  • A single %sql runs a one-line query; a Python string can be substituted in using the dollar sign ($variable).
  • A %%sql cell magic treats the whole cell as SQL, so it can span multiple lines (a short example follows Step 1).

Step 1: Load sql extension and connect to the cluster

%load_ext sql
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string
postgresql://dwhuser:Passw0rd@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh





'Connected: dwhuser@dwh'
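
As a quick illustration of the two magics (a throwaway sketch; the SELECT 1 query is just a placeholder and not part of the analysis):

# %sql runs a one-line query; the $ prefix substitutes a Python variable.
sanity_check = "SELECT 1 AS ok;"
%sql $sanity_check

# %%sql treats the whole cell as SQL, so it can span multiple lines, e.g.:
# %%sql
# SELECT 1 AS ok
# UNION ALL
# SELECT 2 AS ok;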

Step 2: Get some data from S3

I will be using the udacity-labs S3 bucket, which is in the us-west-2 region, to retrieve the tickets data. Notice that two versions of this dataset are available: one partitioned into parts, and the other a single full.csv.gz.

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

sampleDbBucket =  s3.Bucket("udacity-labs")

for obj in sampleDbBucket.objects.filter(Prefix="tickets"):
    print(obj)
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/full/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/full/full.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00000-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00001-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00002-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00003-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00004-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00005-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00006-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00007-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00008-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00009-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')

Step 3: Create tables for partitioned data

%%sql
DROP TABLE IF EXISTS "sporting_event_ticket";
CREATE TABLE "sporting_event_ticket" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.





[]

Step 4: Load partitioned data from S3 into the table just created

When moving large amounts of data from the S3 staging area to Redshift, it is better to use the COPY command instead of INSERT. The benefit of COPY is that the ingestion can be parallelized when the data is broken into parts: each part can be ingested independently by a slice in the cluster. Since our cluster has 4 nodes (each with one or more slices), several parts are loaded in parallel, which can significantly reduce the time it takes to ingest large payloads.

%%time
qry = """
    copy sporting_event_ticket from 's3://udacity-labs/tickets/split/part'
    credentials 'aws_iam_role={}'
    gzip delimiter ';' compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 3.82 ms, sys: 2.51 ms, total: 6.34 ms
Wall time: 12.6 s

Step 5: Create tables for non-partitioned data

To hammer the point home: non-partitioned data cannot be ingested in parallel, so it takes more time even when you use COPY, since the COPY command can only parallelize a load when the data is split into parts.

Notice that the wall time is almost double.

%%sql
DROP TABLE IF EXISTS "sporting_event_ticket_full";
CREATE TABLE "sporting_event_ticket_full" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.





[]
%%time

qry = """
    copy sporting_event_ticket_full from 's3://udacity-labs/tickets/full/full.csv.gz'
    credentials 'aws_iam_role={}'
    gzip delimiter ';' compupdate off region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.1 ms, sys: 2.59 ms, total: 6.69 ms
Wall time: 24.6 s

Step 6: Create tables with no distribution strategy

Why do we need a distribution strategy? When a table is partitioned into many pieces and distributed across slices on different machines, this partitioning is normally done blindly, that is, without making use of any prior knowledge about the table's frequent access patterns. If we do have an idea of how a table is frequently accessed, we can choose a cleverer layout. There are two mechanisms for this:

  • Distribution Style
  • Sorting Key

We will use the lineorder dataset, which is commonly used for benchmarking star schemas in data warehouses. The data is pre-loaded in a public bucket in the us-west-2 region.

%%sql
CREATE SCHEMA IF NOT EXISTS nodist;
SET search_path TO nodist;

DROP TABLE IF EXISTS part cascade;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS dwdate;
DROP TABLE IF EXISTS lineorder;

CREATE TABLE part
(
  p_partkey     INTEGER NOT NULL,
  p_name        VARCHAR(22) NOT NULL,
  p_mfgr        VARCHAR(6) NOT NULL,
  p_category    VARCHAR(7) NOT NULL,
  p_brand1      VARCHAR(9) NOT NULL,
  p_color       VARCHAR(11) NOT NULL,
  p_type        VARCHAR(25) NOT NULL,
  p_size        INTEGER NOT NULL,
  p_container   VARCHAR(10) NOT NULL
);

CREATE TABLE supplier
(
  s_suppkey   INTEGER NOT NULL,
  s_name      VARCHAR(25) NOT NULL,
  s_address   VARCHAR(25) NOT NULL,
  s_city      VARCHAR(10) NOT NULL,
  s_nation    VARCHAR(15) NOT NULL,
  s_region    VARCHAR(12) NOT NULL,
  s_phone     VARCHAR(15) NOT NULL
);

CREATE TABLE customer
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

CREATE TABLE dwdate
(
  d_datekey            INTEGER NOT NULL,
  d_date               VARCHAR(19) NOT NULL,
  d_dayofweek          VARCHAR(10) NOT NULL,
  d_month              VARCHAR(10) NOT NULL,
  d_year               INTEGER NOT NULL,
  d_yearmonthnum       INTEGER NOT NULL,
  d_yearmonth          VARCHAR(8) NOT NULL,
  d_daynuminweek       INTEGER NOT NULL,
  d_daynuminmonth      INTEGER NOT NULL,
  d_daynuminyear       INTEGER NOT NULL,
  d_monthnuminyear     INTEGER NOT NULL,
  d_weeknuminyear      INTEGER NOT NULL,
  d_sellingseason      VARCHAR(13) NOT NULL,
  d_lastdayinweekfl    VARCHAR(1) NOT NULL,
  d_lastdayinmonthfl   VARCHAR(1) NOT NULL,
  d_holidayfl          VARCHAR(1) NOT NULL,
  d_weekdayfl          VARCHAR(1) NOT NULL
);
CREATE TABLE lineorder
(
  lo_orderkey          INTEGER NOT NULL,
  lo_linenumber        INTEGER NOT NULL,
  lo_custkey           INTEGER NOT NULL,
  lo_partkey           INTEGER NOT NULL,
  lo_suppkey           INTEGER NOT NULL,
  lo_orderdate         INTEGER NOT NULL,
  lo_orderpriority     VARCHAR(15) NOT NULL,
  lo_shippriority      VARCHAR(1) NOT NULL,
  lo_quantity          INTEGER NOT NULL,
  lo_extendedprice     INTEGER NOT NULL,
  lo_ordertotalprice   INTEGER NOT NULL,
  lo_discount          INTEGER NOT NULL,
  lo_revenue           INTEGER NOT NULL,
  lo_supplycost        INTEGER NOT NULL,
  lo_tax               INTEGER NOT NULL,
  lo_commitdate        INTEGER NOT NULL,
  lo_shipmode          VARCHAR(10) NOT NULL
);
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.





[]

Step 7: Create tables with distribution strategy

%%sql

CREATE SCHEMA IF NOT EXISTS dist;
SET search_path TO dist;

DROP TABLE IF EXISTS part cascade;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS dwdate;
DROP TABLE IF EXISTS lineorder;

CREATE TABLE part (
  p_partkey     	integer     	not null	sortkey distkey,
  p_name        	varchar(22) 	not null,
  p_mfgr        	varchar(6)      not null,
  p_category    	varchar(7)      not null,
  p_brand1      	varchar(9)      not null,
  p_color       	varchar(11) 	not null,
  p_type        	varchar(25) 	not null,
  p_size        	integer     	not null,
  p_container   	varchar(10)     not null
);

CREATE TABLE supplier (
  s_suppkey     	integer        not null sortkey,
  s_name        	varchar(25)    not null,
  s_address     	varchar(25)    not null,
  s_city        	varchar(10)    not null,
  s_nation      	varchar(15)    not null,
  s_region      	varchar(12)    not null,
  s_phone       	varchar(15)    not null)
diststyle all;

CREATE TABLE customer (
  c_custkey     	integer        not null sortkey,
  c_name        	varchar(25)    not null,
  c_address     	varchar(25)    not null,
  c_city        	varchar(10)    not null,
  c_nation      	varchar(15)    not null,
  c_region      	varchar(12)    not null,
  c_phone       	varchar(15)    not null,
  c_mktsegment      varchar(10)    not null)
diststyle all;

CREATE TABLE dwdate (
  d_datekey            integer       not null sortkey,
  d_date               varchar(19)   not null,
  d_dayofweek	      varchar(10)   not null,
  d_month      	    varchar(10)   not null,
  d_year               integer       not null,
  d_yearmonthnum       integer  	 not null,
  d_yearmonth          varchar(8)	not null,
  d_daynuminweek       integer       not null,
  d_daynuminmonth      integer       not null,
  d_daynuminyear       integer       not null,
  d_monthnuminyear     integer       not null,
  d_weeknuminyear      integer       not null,
  d_sellingseason      varchar(13)    not null,
  d_lastdayinweekfl    varchar(1)    not null,
  d_lastdayinmonthfl   varchar(1)    not null,
  d_holidayfl          varchar(1)    not null,
  d_weekdayfl          varchar(1)    not null)
diststyle all;

CREATE TABLE lineorder (
  lo_orderkey      	    integer     	not null,
  lo_linenumber        	integer     	not null,
  lo_custkey           	integer     	not null,
  lo_partkey           	integer     	not null distkey,
  lo_suppkey           	integer     	not null,
  lo_orderdate         	integer     	not null sortkey,
  lo_orderpriority     	varchar(15)     not null,
  lo_shippriority      	varchar(1)      not null,
  lo_quantity          	integer     	not null,
  lo_extendedprice     	integer     	not null,
  lo_ordertotalprice   	integer     	not null,
  lo_discount          	integer     	not null,
  lo_revenue           	integer     	not null,
  lo_supplycost        	integer     	not null,
  lo_tax               	integer     	not null,
  lo_commitdate         integer         not null,
  lo_shipmode          	varchar(10)     not null
);
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.





[]

Step 8: Load the data from S3 into the dist and nodist tables we created above

Our intent here is to run 5 COPY operations, one for each of the 5 tables, as shown below.

In addition, we want to accomplish the following:

  • Make sure that DWH_ROLE_ARN is substituted with the correct value in each query
  • Perform the data loading twice, once for each schema (dist and nodist)
  • Collect timing statistics to compare the insertion times

Thus, we have scripted the insertion in the function loadTables further below, which returns a pandas DataFrame containing the timing statistics for the COPY operations.
copy customer from 's3://awssampledbuswest2/ssbgz/customer'
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
gzip region 'us-west-2';

copy dwdate from 's3://awssampledbuswest2/ssbgz/dwdate'
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
gzip region 'us-west-2';

copy lineorder from 's3://awssampledbuswest2/ssbgz/lineorder'
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
gzip region 'us-west-2';

copy part from 's3://awssampledbuswest2/ssbgz/part'
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
gzip region 'us-west-2';

copy supplier from 's3://awssampledbuswest2/ssbgz/supplier'
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
gzip region 'us-west-2';
import os
from time import time
def loadTables(schema, tables):
    loadTimes = []
    SQL_SET_SCHEMA = "SET search_path TO {};".format(schema)
    %sql $SQL_SET_SCHEMA

    for table in tables:
        SQL_COPY = """
copy {} from 's3://awssampledbuswest2/ssbgz/{}'
credentials 'aws_iam_role={}'
gzip region 'us-west-2';
        """.format(table,table, DWH_ROLE_ARN)

        print("======= LOADING TABLE: ** {} ** IN SCHEMA ==> {} =======".format(table, schema))
        print(SQL_COPY)

        t0 = time()
        %sql $SQL_COPY
        loadTime = time()-t0
        loadTimes.append(loadTime)

        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))
    return pd.DataFrame({"table":tables, "loadtime_"+schema:loadTimes}).set_index('table')
#-- List of the tables to be loaded
tables = ["customer","dwdate","supplier", "part", "lineorder"]

#-- Insertion twice for each schema (WARNING!! EACH CAN TAKE MORE THAN 10 MINUTES!!!)
nodistStats = loadTables("nodist", tables)
distStats = loadTables("dist", tables)
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
======= LOADING TABLE: ** customer ** IN SCHEMA ==> nodist =======

copy customer from 's3://awssampledbuswest2/ssbgz/customer'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 27.38 sec

======= LOADING TABLE: ** dwdate ** IN SCHEMA ==> nodist =======

copy dwdate from 's3://awssampledbuswest2/ssbgz/dwdate'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 14.44 sec

======= LOADING TABLE: ** supplier ** IN SCHEMA ==> nodist =======

copy supplier from 's3://awssampledbuswest2/ssbgz/supplier'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 21.40 sec

======= LOADING TABLE: ** part ** IN SCHEMA ==> nodist =======

copy part from 's3://awssampledbuswest2/ssbgz/part'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 17.51 sec

======= LOADING TABLE: ** lineorder ** IN SCHEMA ==> nodist =======

copy lineorder from 's3://awssampledbuswest2/ssbgz/lineorder'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 558.49 sec

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
======= LOADING TABLE: ** customer ** IN SCHEMA ==> dist =======

copy customer from 's3://awssampledbuswest2/ssbgz/customer'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 27.10 sec

======= LOADING TABLE: ** dwdate ** IN SCHEMA ==> dist =======

copy dwdate from 's3://awssampledbuswest2/ssbgz/dwdate'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 7.93 sec

======= LOADING TABLE: ** supplier ** IN SCHEMA ==> dist =======

copy supplier from 's3://awssampledbuswest2/ssbgz/supplier'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 18.90 sec

======= LOADING TABLE: ** part ** IN SCHEMA ==> dist =======

copy part from 's3://awssampledbuswest2/ssbgz/part'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 16.38 sec

======= LOADING TABLE: ** lineorder ** IN SCHEMA ==> dist =======

copy lineorder from 's3://awssampledbuswest2/ssbgz/lineorder'
credentials 'aws_iam_role=arn:aws:iam::506140549518:role/dwhRole'
gzip region 'us-west-2';

 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 712.86 sec
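
Before comparing the timings, we can optionally verify the physical layout of the loaded tables through Redshift's SVV_TABLE_INFO system view, which reports each table's distribution style, first sort key and row count. This is a hedged sketch that assumes the %sql connection opened in Part 2, Step 1:

# Optional: inspect distribution style, sort key and size of the loaded tables.
layout_check = """
SELECT "schema", "table", diststyle, sortkey1, tbl_rows
FROM svv_table_info
WHERE "schema" IN ('nodist', 'dist')
ORDER BY "schema", "table";
"""
%sql $layout_check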

Step 9: Compare load times with and without dist strategy

import matplotlib.pyplot as plt
import pandas as pd
#-- Plotting of the timing results
stats = distStats.join(nodistStats)
stats.plot.bar()
plt.show()

[Bar chart: COPY load times per table, dist vs nodist schemas]

Step 10: Compare query performance with and without dist strategy

oneDim_SQL ="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, dwdate
where lo_orderdate = d_datekey
and d_year = 1997
and lo_discount between 1 and 3
and lo_quantity < 24;
"""

twoDim_SQL="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select sum(lo_revenue), d_year, p_brand1
from lineorder, dwdate, part, supplier
where lo_orderdate = d_datekey
and lo_partkey = p_partkey
and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1
"""

drill_SQL = """
set enable_result_cache_for_session to off;
SET search_path TO {};

select c_city, s_city, d_year, sum(lo_revenue) as revenue
from customer, lineorder, supplier, dwdate
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or
c_city='UNITED KI5')
and (s_city='UNITED KI1' or
s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
"""


oneDimSameDist_SQL ="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select lo_orderdate, sum(lo_extendedprice*lo_discount) as revenue  
from lineorder, part
where lo_partkey  = p_partkey
group by lo_orderdate
order by lo_orderdate
"""

def compareQueryTimes(schema):
    queryTimes  =[]
    for i,query in enumerate([oneDim_SQL, twoDim_SQL, drill_SQL, oneDimSameDist_SQL]):
        t0 = time()
        q = query.format(schema)
        %sql $q
        queryTime = time()-t0
        queryTimes.append(queryTime)
    return pd.DataFrame({"query":["oneDim","twoDim", "drill", "oneDimSameDist"], "queryTime_"+schema:queryTimes}).set_index('query')
noDistQueryTimes = compareQueryTimes("nodist")
distQueryTimes   = compareQueryTimes("dist")
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
1 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
280 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
4 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
2406 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
1 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
280 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
4 rows affected.
 * postgresql://dwhuser:***@dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
2406 rows affected.
queryTimeDF =noDistQueryTimes.join(distQueryTimes)
queryTimeDF.plot.bar()
plt.show()

[Bar chart: query times per query, nodist vs dist schemas]

improvementDF = queryTimeDF["distImprovement"] = 100.0 * (queryTimeDF['queryTime_nodist'] - queryTimeDF['queryTime_dist']) / queryTimeDF['queryTime_nodist']
improvementDF.plot.bar(title="% dist Improvement by query")
plt.show()

[Bar chart: percentage improvement of dist over nodist, by query]

Part 3: Clean up resources

DO NOT RUN THIS UNLESS YOU ARE SURE YOU WANT TO DELETE THE CLUSTER

#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!
{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2019, 8, 30, 20, 33, 18, 333000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-1df7ce58',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-ade05fd7',
  'AvailabilityZone': 'us-east-1f',
  'PreferredMaintenanceWindow': 'sat:07:30-sat:08:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'IamRoles': [{'IamRoleArn': 'arn:aws:iam::506140549518:role/dwhRole',
    'ApplyStatus': 'in-sync'}],
  'MaintenanceTrackName': 'current',
  'DeferredMaintenanceWindows': []},
 'ResponseMetadata': {'RequestId': 'a5cf0ad6-cb67-11e9-994b-1737f36bbeac',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a5cf0ad6-cb67-11e9-994b-1737f36bbeac',
   'content-type': 'text/xml',
   'content-length': '2290',
   'vary': 'Accept-Encoding',
   'date': 'Fri, 30 Aug 2019 20:49:22 GMT'},
  'RetryAttempts': 0}}
  • Run this block several times until the cluster is really deleted
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)
Key Value
0 ClusterIdentifier dwhcluster
1 NodeType dc2.large
2 ClusterStatus deleting
3 MasterUsername dwhuser
4 DBName dwh
5 Endpoint {'Address': 'dwhcluster.cpczrz48gy51.us-east-1.redshift.amazonaws.com', 'Port': 5439}
6 VpcId vpc-ade05fd7
7 NumberOfNodes 4
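
Alternatively, boto3 provides a cluster_deleted waiter that blocks until the cluster is fully gone, which is handy before moving on to the IAM cleanup below (an optional sketch; Delay/MaxAttempts are arbitrary):

# Optional: wait until the cluster is fully deleted before removing the IAM role.
waiter = redshift.get_waiter('cluster_deleted')
waiter.wait(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    WaiterConfig={'Delay': 30, 'MaxAttempts': 40}   # poll every 30s
)
print("Cluster deleted")
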
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!
{'ResponseMetadata': {'RequestId': 'c4739663-cb8c-11e9-addf-4bb7f7ab8387',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c4739663-cb8c-11e9-addf-4bb7f7ab8387',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sat, 31 Aug 2019 01:15:05 GMT'},
  'RetryAttempts': 0}}

Conclusion

The power of infrastructure-as-code is evident in the fact that we were able to launch a 4-node Redshift cluster, perform our analysis, and destroy all the resources without once having to log in to the AWS console. This is the essence of cloud computing: you can spin up resources as and when you want, do whatever task you wish, and then clean them up.