Build Stream Processing Applications using KSQL

26 minute read

Introduction to KSQL

KSQL (pronounced “K-sequel”) is a SQL-like interface for building stream processing applications. We will see how we can use simple SQL statements to construct complex stream processing applications just like the ones we built using Faust, and how to run ad-hoc queries to explore our streaming datasets. By the end of this post, we will build and deploy our very own stream processing application on top of KSQL.

KSQL is a SQL-like abstraction developed by Confluent, which provides users the ability to create streams and tables. We will see how we can write simple SQL queries to turn our Kafka topics into KSQL streams and tables, and then write those tables back out to Kafka.

We will address the following questions:

  • What KSQL is and how it is architected
  • Why you would choose KSQL over an application framework like Faust
  • How to turn Kafka topics into KSQL tables
  • How to turn Kafka topics into KSQL streams
  • How to query KSQL
  • How to window in KSQL
  • How to aggregate data in KSQL
  • How to join data in KSQL

Key points:

  • KSQL is a new and very exciting entry into the stream processing space. Without writing any application logic whatsoever, you can write SQL statements that KSQL can then use to transform the data in your Kafka topics into stream processing applications. What is so wonderful about KSQL is that it allows developers to use a SQL-like syntax that they are already very familiar with to quickly and easily create stream processing applications.

  • We can use KSQL to build stream processing applications simply by writing SQL-like statements! In contrast to Faust, we’re not actually going to build an application using a programming language like Python. Instead, KSQL will transform our SQL commands into running stream processing applications for us!

  • We will see how KSQL can express all the familiar concepts of stream processing applications with the exclusive use of SQL syntax.

  • KSQL provides a SQL-like interface to transform Kafka Topics into streams and tables.
  • Joins, aggregates, filtering, and other forms of data manipulation can then be expressed over these streams and tables.

Using KSQL, we can transform our Kafka topics into streams and tables. Once we have turned these topics into either streams or tables, we are free to use KSQL’s SQL syntax to perform many familiar operations: for example, we can join, aggregate, and filter these tables and streams once they have been created.

[Image: ksql-intro-1]

Behind the scenes, the KSQL web server translates your SQL commands into a series of instantiated Kafka Streams applications. The rocket in the image represents KSQL transforming the SQL CREATE statement into a running Kafka Streams application.

KSQL Architecture

As with Kafka, Kafka Connect, and the Kafka REST Proxy, KSQL is built in Java and Scala and runs on the JVM. KSQL itself is simply a web server that accepts incoming HTTP REST calls to configure underlying Kafka Streams components to execute the stream processes described in our SQL queries. Because KSQL uses Kafka Streams, it uses a Kafka topic as a changelog and RocksDB to store local state on every node where KSQL is deployed.

KSQL provides a few options for interaction.

  • First, we can use its REST API.
  • Second, there is the KSQL CLI, a command-line tool that works similarly to tools like psql for Postgres: you connect to a server and can then run interactive queries.
  • The last option is what you should use in production: place a file containing your desired SQL queries on your KSQL hosts and start them up. The KSQL web server will then load this SQL file and automatically execute the queries described within it (a sketch of such a file is shown below).
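
For illustration, here is a minimal sketch of what such a queries file might contain, reusing the purchases example that appears later in this post (the exact contents are an assumption, not a production recipe):

-- queries.sql: loaded and executed by the KSQL server at startup
CREATE STREAM purchases (
    username VARCHAR,
    currency VARCHAR,
    amount INT
) WITH (
    KAFKA_TOPIC='purchases',
    VALUE_FORMAT='JSON'
);

-- A persistent query the server keeps running in the background
CREATE STREAM purchases_high_value AS
    SELECT *
    FROM purchases
    WHERE amount > 100000;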

KSQL vs Faust

KSQL vs. Traditional Frameworks:

Pros:

  • It is often simpler to use KSQL and SQL than to build and deploy an entire application
  • KSQL is typically a better fit for rapid experimentation and exploration than a full stream processing application
  • KSQL doesn’t require a particular programming language, like Python for Faust, or Java for Kafka Streams
  • KSQL already comes bundled with standard logs, metrics, and tooling for you to use, so you don’t have to build it yourself

Cons:

  • SQL does not always best capture certain types of data manipulation or remapping scenarios
  • You can’t just pull in whatever other libraries you want, as you can with Faust
  • However, KSQL does allow User Defined Functions (UDFs), written in Java

Tables and Streams in KSQL

Before we can actually use tables or streams in KSQL, we need to transform one or more existing Kafka topics into a KSQL table or stream. First, we can ask KSQL to show us the topics that it is aware of by using the SHOW TOPICS command, which simply displays all of the available topics on the broker. Once we have identified the topic we want to use, we are ready to actually create a table or stream.

[Image: local-kafka-2]

An important point to note here is that just as in Faust, a Kafka topic always underlies the stream or table that we actually create.

Key Points:

  • Every KSQL Table or Stream is derived from an underlying Kafka Topic.
  • You turn Kafka Topics into Tables and Streams.

Creating a Stream

There are two options for creating streams in KSQL. The first is to simply define the stream’s attributes and types with a CREATE statement. We always start by declaring the name of the stream we wish to create, here purchases. Next, we declare the attributes/columns and their types. The final portion of the CREATE statement is the WITH clause, which provides additional information so that KSQL knows which Kafka topic to source from; the VALUE_FORMAT describes the serialization format of the data in the Kafka topic, one of AVRO, JSON, or DELIMITED (meaning CSV).

CREATE STREAM purchases (
    username VARCHAR,
    currency VARCHAR,
    amount INT
) WITH (
    KAFKA_TOPIC='purchases',
    VALUE_FORMAT='JSON'
);

It is also possible to create a stream with a SELECT statement that excludes or modifies data from another pre-existing stream. In this example, we create the stream purchases_high_value by selecting all of the attributes from the pre-existing purchases stream, keeping only high-value rows.

CREATE STREAM purchases_high_value AS
    SELECT *
    FROM purchases
    WHERE amount > 100000;

Summary of Stream Creation:

  • Creating Streams from an underlying topic requires you to specify column names and their types
  • You must also specify the serialization format as one of JSON, AVRO, or DELIMITED (csv)
  • You must also specify the underlying topic name
  • You may create a stream from another existing stream with CREATE STREAM AS SELECT …
  • KSQL Create Stream Documentation
  • KSQL Create Stream from SELECT documentation

Example of creating a Stream: We will now see two ways to create a KSQL stream. Once we have created our stream, we’ll also see how to delete it.

Using CREATE STREAM syntax

Let’s first start by checking the existing topics and streams:

[Image: ksql-intro-2]

We will focus on two topics: com.udacity.streams.clickevents and com.udacity.streams.pages.

  • com.udacity.streams.clickevents has the following data shape:
    • key: <uri: string>
    • value:
        {
          "email": <string>,
          "timestamp": <string>,
          "uri": <string>,
          "number": <int>
        }
      
  • com.udacity.streams.pages has the following data shape:
    • key: <uri: string>
    • value:
        {
            "uri": <string>,
            "description": <string>,
            "created": <date>
        }
      

Now we are going to create a Stream for clickevents:

CREATE STREAM clickevents
  (email VARCHAR,
   timestamp VARCHAR,
   uri VARCHAR,
   number INTEGER)
  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
        VALUE_FORMAT='JSON');

Output:

ksql> CREATE STREAM clickevents
>  (email VARCHAR,
>   timestamp VARCHAR,
>   uri VARCHAR,
>   number INTEGER)
>  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
>        VALUE_FORMAT='JSON');

 Message        
----------------
 Stream created
----------------
ksql> show streams;

 Stream Name | Kafka Topic                     | Format
--------------------------------------------------------
 CLICKEVENTS | com.udacity.streams.clickevents | JSON   
--------------------------------------------------------
ksql>

Now that we have created the KSQL stream from the clickevents topic, we can query it with SQL. Observe how the data is stored within this stream: the key uri and its corresponding value make up each record in the stream.

[Image: ksql-intro-3]

Using CREATE STREAM .. AS SELECT .. syntax

Create a stream with a query: KSQL also allows for the creation of streams derived from queries. Suppose we want a stream of clickevents with 100 or more clicks. We would use the following:

ksql> CREATE STREAM popular_uris AS
>  SELECT * FROM clickevents
>  WHERE number >= 100;

 Message                    
----------------------------
 Stream created and running
----------------------------
ksql> show streams;

 Stream Name  | Kafka Topic                     | Format
---------------------------------------------------------
 CLICKEVENTS  | com.udacity.streams.clickevents | JSON   
 POPULAR_URIS | POPULAR_URIS                    | JSON   
---------------------------------------------------------
ksql>

Notice that the underlying topic for POPULAR_URIS is a new Kafka topic, POPULAR_URIS, which was created automatically. Shown below is the output of the stream, containing only clickevents with 100 or more clicks.

[Image: ksql-intro-4]

Deleting Streams

In order to delete a stream, we first need to make sure that there are no running queries. What does that mean? If you look at the output of the CREATE STREAM .. AS SELECT .. statement, it says “Stream created and running”. This means KSQL is still running that query in the background; that’s how data keeps streaming into the stream. For this reason, we need to first stop the running query and then drop the stream; we cannot drop the stream directly.

ksql> show queries;

 Query ID            | Kafka Topic  | Query String                  
--------------------------------------------------------------------
 CSAS_POPULAR_URIS_0 | POPULAR_URIS | CREATE STREAM popular_uris AS
  SELECT * FROM clickevents
  WHERE number >= 100;
--------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
ksql>

Grab the query ID: CSAS_POPULAR_URIS_0 and terminate it.

ksql> TERMINATE CSAS_POPULAR_URIS_0;

 Message           
-------------------
 Query terminated.
-------------------
ksql> show queries;

 Query ID | Kafka Topic | Query String
---------------------------------------
---------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
ksql>

Now run the DROP STREAM command:

ksql> DROP STREAM POPULAR_URIS;

 Message                           
-----------------------------------
 Source POPULAR_URIS was dropped.  
-----------------------------------
ksql> show streams;

 Stream Name | Kafka Topic                     | Format
--------------------------------------------------------
 CLICKEVENTS | com.udacity.streams.clickevents | JSON   
--------------------------------------------------------
ksql>

Clean up the topics

Something interesting to understand here is that when we created the POPULAR_URIS stream, we observed that a new topic with the same name appeared. But when we deleted the POPULAR_URIS stream, the topic was not deleted, as can be seen here:

[Image: ksql-intro-5]

WHY?: First, the POPULAR_URIS topic was created and is still present. By default, this is how KSQL behaves; if you’d like to clean up the topic, you need to do it manually. Second, why was a POPULAR_URIS topic created, but not one for the stream CLICKEVENTS? POPULAR_URIS actually required modification to the data, so an intermediate topic was created. CLICKEVENTS, however, required no modification, so the underlying topic is used as-is.

OK, so let’s run DROP TOPIC POPULAR_URIS, and we see that we get this message:

ksql> DROP TOPIC POPULAR_URIS;
No topic with name POPULAR_URIS was registered.
ksql>

WHY?: As per this explanation from the KSQL team: a KSQL topic is a different concept from a Kafka topic. A KSQL topic is an internal KSQL concept that represents a Kafka topic along with metadata about that topic, including the topic format. Since KSQL topics are not exposed externally, you should not use them in KSQL statements. If you want to delete a Kafka topic, you should delete it from Kafka itself; topic management capability is planned for a future KSQL release.

Creating Tables using KSQL

Creating tables in KSQL works very similarly to creating streams. The major difference is that when we create a table, a key is used to uniquely identify the data. A table is an aggregated view of our data; it is NOT an infinite sequence of immutable data events like a stream. As with streams, there are two ways in which we can create tables:

  • Using the CREATE TABLE syntax
  • Using the CREATE TABLE .. AS SELECT .. syntax

CREATE TABLE syntax:

CREATE TABLE users(
    username VARCHAR,
    address VARCHAR,
    email VARCHAR,
    phone_number VARCHAR
) WITH(
    KAFKA_TOPIC = 'users',
    VALUE_FORMAT = 'JSON',
    KEY = 'username'
);

Notice that we specified a KEY attribute, which tells KSQL which field uniquely identifies each record. The key matters when aggregates and joins are performed: KSQL uses the KEY property as an optimization hint to determine whether repartitioning can be avoided when performing aggregations and joins.

Keys are an extremely important topic in KSQL; a whole post could be dedicated just to understanding them.

CREATE TABLE .. AS SELECT .. syntax:

CREATE TABLE purchases_high_value AS
SELECT *
FROM purchases
WHERE amount > 100000;

So, what about the KEY in this case? It uses the KEY of the purchases stream, which could be something like the username, and it will keep, for each username, the latest purchase whose amount exceeds 100000.

SET auto.offset.reset

Before we dive into creating the Table, we need to understand the concept of offsets.

Managing Offsets: Like all Kafka consumers, KSQL by default begins consumption at the latest offset. This can be a problem for some scenarios. In the following example, we’re going to create a pages table, but we want all the data in the topic to be available to us in this table. In other words, we want KSQL to start from the earliest offset. To do this, we will use the SET command to set the configuration variable auto.offset.reset for our session.

SET 'auto.offset.reset' = 'earliest';

Note: This can also be set at the KSQL server level, if you’d like.

Once you’re done querying or creating tables or streams with this value, you can set it back to its original setting by simply running:

UNSET 'auto.offset.reset';

Example CREATE TABLE syntax:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql>

Shown below is the output of the following commands:

  • Run show topics: to check the current topics
  • Run CREATE TABLE ..: this will create our table using the topic com.udacity.streams.pages.
  • Run show tables: to check the current tables. Notice the Format and Windowed columns.
  • Run show topics: no new topic will be created because we are not modifying the data to create the table.
ksql> show topics;

 Kafka Topic                     | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------------------
 _confluent-metrics              | false      | 12         | 1                  | 0         | 0              
 _schemas                        | false      | 1          | 1                  | 0         | 0              
 com.udacity.streams.clickevents | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.pages       | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.purchases   | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.users       | false      | 10         | 1                  | 0         | 0              
 connect-configs                 | false      | 1          | 1                  | 0         | 0              
 connect-offsets                 | false      | 25         | 1                  | 0         | 0              
 connect-status                  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------------------------
ksql> CREATE TABLE pages
>  (uri VARCHAR,
>   description VARCHAR,
>   created VARCHAR)
>  WITH (KAFKA_TOPIC='com.udacity.streams.pages',
>        VALUE_FORMAT='JSON',
>        KEY='uri');

 Message       
---------------
 Table created
---------------
ksql> show tables;

 Table Name | Kafka Topic               | Format | Windowed
------------------------------------------------------------
 PAGES      | com.udacity.streams.pages | JSON   | false    
------------------------------------------------------------
ksql> show topics;

 Kafka Topic                     | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------------------
 _confluent-metrics              | false      | 12         | 1                  | 0         | 0              
 _schemas                        | false      | 1          | 1                  | 0         | 0              
 com.udacity.streams.clickevents | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.pages       | true       | 10         | 1                  | 0         | 0              
 com.udacity.streams.purchases   | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.users       | false      | 10         | 1                  | 0         | 0              
 connect-configs                 | false      | 1          | 1                  | 0         | 0              
 connect-offsets                 | false      | 25         | 1                  | 0         | 0              
 connect-status                  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------------------------
ksql>

The only new property we have provided here is KEY, the field that uniquely identifies our records. Remember that a KSQL table keeps track of the latest value for a given key, not every value we have ever seen for that key.
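
A quick way to see this is to query the table directly; each key (the uri) appears at most once, with its latest value. This is just a sketch, with LIMIT used to keep the output short:

SELECT ROWKEY, description, created
FROM pages
LIMIT 5;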

Example CREATE TABLE .. AS SELECT .. syntax:

Tables, like streams, may also be derived from queries. Let’s create a table of all pages whose URI host starts with the letter a. Shown below is the output of the following commands:

  • Run the CREATE TABLE .. AS SELECT .. statement.
  • Run show tables;: notice that a new Kafka topic with the same name is created, because we are modifying the data.
  • Run DESCRIBE pages;: KSQL provides a lot of valuable information through the DESCRIBE command. It is useful for understanding which columns and column types are defined on your tables. Also observe the system fields ROWTIME and ROWKEY.
ksql> CREATE TABLE a_pages AS
>  SELECT * FROM pages WHERE uri LIKE 'http://www.a%';

 Message                   
---------------------------
 Table created and running
---------------------------
ksql> show tables;

 Table Name | Kafka Topic               | Format | Windowed
------------------------------------------------------------
 A_PAGES    | A_PAGES                   | JSON   | false    
 PAGES      | com.udacity.streams.pages | JSON   | false    
------------------------------------------------------------
ksql> DESCRIBE pages;

Name                 : PAGES
 Field       | Type                      
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 URI         | VARCHAR(STRING)           
 DESCRIPTION | VARCHAR(STRING)           
 CREATED     | VARCHAR(STRING)           
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE a_pages;

Name                 : A_PAGES
 Field       | Type                      
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 URI         | VARCHAR(STRING)           
 DESCRIPTION | VARCHAR(STRING)           
 CREATED     | VARCHAR(STRING)           
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Deleting the table:

As with Streams, we must first find the running underlying query, and then drop the table.

ksql> show queries;

 Query ID       | Kafka Topic | Query String                                          
--------------------------------------------------------------------------------------
 CTAS_A_PAGES_0 | A_PAGES     | CREATE TABLE a_pages AS
  SELECT * FROM pages WHERE uri LIKE 'http://www.a%';
--------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
ksql> TERMINATE CTAS_A_PAGES_0;

 Message           
-------------------
 Query terminated.
-------------------
ksql> DROP TABLE a_pages;

 Message                      
------------------------------
 Source A_PAGES was dropped.  
------------------------------
ksql> show tables;

 Table Name | Kafka Topic               | Format | Windowed
------------------------------------------------------------
 PAGES      | com.udacity.streams.pages | JSON   | false    
------------------------------------------------------------
ksql>

Querying

Now that we have created a few tables and streams, it’s time to see how to query them. Let’s say we are running this query to get all users who made purchases of more than 100000 in USD.

SELECT username
FROM purchases
WHERE currency = 'USD' AND
    amount > 100000;

If we press Ctrl-C and exit while this query is running, we immediately lose all the data, because this is not a persistent query. In other words, the query does not continue to run in the background. This is a very important concept to understand with KSQL.

In KSQL, if you want your streaming query to keep running so that you can use its data, you need to use the CREATE TABLE .. AS or CREATE STREAM .. AS syntax. Otherwise, when your client session closes, your data will be wiped out.

Key Points:

  • SELECT statements may be run in KSQL CLI, but as soon as the session is terminated, so too is the data calculation.
  • Use CREATE STREAM <stream_name> AS SELECT… and CREATE TABLE <table_name> AS SELECT … to persist your queries for long-term usage
  • List of all Scalar functions supported for querying

Example: Ad-hoc querying is one of KSQL’s greatest strengths. Let’s have a look at some sample queries. We’ve already seen how to filter data in the table creation process, but let’s revisit it one more time:

ksql> CREATE STREAM clickevents
>  (email VARCHAR,
>   timestamp VARCHAR,
>   uri VARCHAR,
>   number INTEGER)
>  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
>        VALUE_FORMAT='JSON');

 Message        
----------------
 Stream created
----------------
ksql> show streams;

 Stream Name | Kafka Topic                     | Format
--------------------------------------------------------
 CLICKEVENTS | com.udacity.streams.clickevents | JSON   
--------------------------------------------------------
ksql> SELECT uri, number
>  FROM clickevents
>  WHERE number > 100
>    AND uri LIKE 'http://www.k%';


http://www.kim.info/explore/main/search/ | 844
http://www.knight.com/blog/index/ | 793
http://www.kim.info/explore/main/search/ | 378
http://www.kim.info/explore/main/search/ | 610
http://www.kim.info/explore/main/search/ | 102
http://www.kim.info/explore/main/search/ | 570
http://www.kim.info/explore/main/search/ | 281
http://www.kim.info/explore/main/search/ | 609
http://www.krueger.biz/wp-content/index.html | 865
http://www.krueger.biz/wp-content/index.html | 225
^CQuery terminated
ksql>

Apply scalar functions: KSQL provides a number of scalar functions for us to make use of.

The query shown below will strip the http://www. from the start of the URI and capitalize the rest. The SUBSTRING function takes a string and the character position to start at (and, optionally, the position to end at). The UCASE function takes a string and capitalizes it.

ksql> SELECT UCASE(SUBSTRING(uri, 12))
>  FROM clickevents
>  WHERE number > 100
>    AND uri LIKE 'http://www.k%';
KRUEGER.BIZ/WP-CONTENT/INDEX.HTML
KNIGHT.COM/BLOG/INDEX/
KIM.INFO/EXPLORE/MAIN/SEARCH/
^CQuery terminated
ksql>

There is no need to terminate the query, as SELECT statements are non-persistent in KSQL. If you want the results of this query to be persistent, you need to create a table or a stream.
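
As a sketch of what that looks like, we could persist the scalar-function query above with CREATE STREAM .. AS SELECT .. (the stream name and column alias here are illustrative):

-- Persist the transformed clickevents as their own stream
CREATE STREAM clickevents_short_uris AS
  SELECT UCASE(SUBSTRING(uri, 12)) AS short_uri, number
  FROM clickevents
  WHERE number > 100
    AND uri LIKE 'http://www.k%';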

Windowing

Windowing in KSQL is largely similar to windowing in Faust. KSQL supports both hopping and tumbling windows. In addition, KSQL supports a type of windowing known as session windowing.

As with Faust or any other stream processing framework, windows in KSQL are used during aggregate operations. Example of the usage of a Tumbling window:

SELECT currency, SUM(amount)
FROM purchases
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY currency;

We simply specify that the window is TUMBLING and that the size of the window should be 10 minutes; the size could be given in seconds or hours too. We can easily turn this into a HOPPING window by adding the ADVANCE BY clause to the interval. Example of the usage of a Hopping window:

SELECT currency, SUM(amount)
FROM purchases
WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY currency;

Here, we are simply asking KSQL to take this 10-minute window that we have defined and move it forward every 1 minute.

Session Windowing: This isn’t supported in Faust, but is supported in KSQL. Session windowing works by keeping track of the last time a particular key was seen; when a new record with the same key arrives, the difference between the timestamps is calculated, and if it exceeds the defined session gap, a new window is started. You can read more about this in the session windows documentation.
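
For example, a session-windowed aggregation over our clickevents stream might look like the following sketch (the 5-minute session gap is an illustrative choice, not from the original material):

-- Clicks per uri, grouped into sessions separated by 5+ minutes of inactivity
SELECT uri, SUM(number)
FROM clickevents
WINDOW SESSION (5 MINUTES)
GROUP BY uri;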

Aggregating

GROUP BY allows re-partitioning of a stream or table on a new key. The output of a GROUP BY in KSQL is always a table.

KSQL provides a number of useful aggregate functions, such as MAX/MIN, SUM, COUNT, and others. Here we’re going to see how we can create aggregated tables from our KSQL queries. Example:

CREATE TABLE clickevents
  (email VARCHAR,
   timestamp VARCHAR,
   uri VARCHAR,
   number INTEGER)
  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
        VALUE_FORMAT='JSON',
        KEY='uri');
SELECT uri, SUM(number)
FROM clickevents
GROUP BY uri;

When we run this query, we will receive output that aggregates the running SUM of number, to date, for each URI.

You will notice that values continue to print out to the screen on a periodic basis; that’s OK, it just means the table-based representation is updating. You could instead create a table and then periodically query that table to view updates, as sketched below.
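
A minimal sketch of that approach, persisting the aggregation so it outlives the CLI session (the table name and the total alias are illustrative assumptions):

-- Materialize the running sums as a table
CREATE TABLE clickevent_totals AS
  SELECT uri, SUM(number) AS total
  FROM clickevents
  GROUP BY uri;

-- Query the materialized totals whenever you like
SELECT uri, total FROM clickevent_totals;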

ksql> show streams;

 Stream Name | Kafka Topic | Format
------------------------------------
------------------------------------
ksql> show tables;

 Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
----------------------------------------------
ksql> show topics;

 Kafka Topic                     | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------------------
 _confluent-metrics              | false      | 12         | 1                  | 0         | 0              
 _schemas                        | false      | 1          | 1                  | 0         | 0              
 com.udacity.streams.clickevents | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.pages       | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.purchases   | false      | 10         | 1                  | 0         | 0              
 com.udacity.streams.users       | false      | 10         | 1                  | 0         | 0              
 connect-configs                 | false      | 1          | 1                  | 0         | 0              
 connect-offsets                 | false      | 25         | 1                  | 0         | 0              
 connect-status                  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------------------------

ksql> CREATE TABLE clickevents
>  (email VARCHAR,
>   timestamp VARCHAR,
>   uri VARCHAR,
>   number INTEGER)
>  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
>        VALUE_FORMAT='JSON',
>        KEY='uri');

 Message       
---------------
 Table created
---------------
ksql> show tables;

 Table Name  | Kafka Topic                     | Format | Windowed
-------------------------------------------------------------------
 CLICKEVENTS | com.udacity.streams.clickevents | JSON   | false    
-------------------------------------------------------------------
ksql> SELECT uri, SUM(number)
>FROM clickevents
>GROUP BY uri;
https://gomez.info/tags/search/about/ | 547
http://good-greene.info/explore/search/blog/index/ | 868
https://www.walker.net/homepage.jsp | 943
http://www.myers.info/tags/tag/posts/terms.htm | 330
https://gray-roberts.com/search.php | 809
https://gomez.info/tags/search/about/ | 944
http://www.norris.biz/ | 115
http://jones.info/category/category.htm | 598
http://higgins.org/search/ | 829
http://phillips.info/ | 459
https://boone.com/index.asp | 838
http://www.myers.com/home/ | 660
http://hall.com/list/index.html | 739
http://www.hale-king.com/ | 964
https://www.perez.info/login/ | 899 ----> first occurrence
http://vance.info/about/ | 916
https://www.wilson.com/app/tag/register.html | 783
https://wallace.com/ | 713
http://www.padilla.com/search/categories/tag/post.html | 551
https://brown.biz/wp-content/tags/category/privacy/ | 507
http://www.patrick.com/search/ | 650
http://www.welch.com/about.html | 304
http://pierce.info/ | 31
https://ellis.com/wp-content/blog/category/privacy/ | 641
https://hart.com/post.html | 72
http://www.ray.com/ | 173
http://www.marquez-freeman.com/home/ | 639
https://hernandez-smith.net/categories/explore/app/search.htm | 296
https://www.castro.info/ | 71
https://www.rivera.biz/login/ | 341
https://www.kennedy-baker.net/tag/list/list/main.html | 332
https://taylor-mathis.com/categories/app/categories/search.asp | 741
https://www.carroll.com/ | 731
http://hudson.com/list/posts/register.asp | 372
https://www.ramirez.org/category.htm | 694
http://www.perez-roberson.com/ | 337
http://www.west.net/explore/blog/main/index/ | 187
http://www.garner-howard.net/privacy.html | 374
https://jones.com/login/ | 883
https://www.newman.com/app/explore/homepage.php | 118
http://www.forbes.net/posts/blog/search/ | 353
https://www.smith.org/author.asp | 872
https://martin.com/register/ | 588
http://www.lopez.com/list/categories/search/terms/ | 303
https://www.rivera.biz/login/ | 471
https://johnson.com/posts/tag/index/ | 251
http://gonzales.info/author/ | 489
https://www.perez.info/login/ | 15 ----> second occurrence
^CQuery terminated

ksql> SELECT * FROM clickevents WHERE uri = 'https://wallace.com/';
1579625040990 | https://wallace.com/ | spencersarah@yahoo.com | 1993-06-19T11:45:57 | https://wallace.com/ | 833
1579625043011 | https://wallace.com/ | jamescobb@dixon.biz | 2001-03-27T22:29:43 | https://wallace.com/ | 142
1579625157824 | https://wallace.com/ | justin76@carlson.com | 1993-12-13T02:08:16 | https://wallace.com/ | 448
1579625191008 | https://wallace.com/ | lisa66@cook.com | 1996-10-06T16:30:34 | https://wallace.com/ | 309
1579625215095 | https://wallace.com/ | barrsteve@hickman.com | 2019-05-07T18:21:50 | https://wallace.com/ | 717
^CQuery terminated
ksql>

Joins

One of the most frequent operations in stream processing applications is combining, or joining, multiple streams into one output stream. Here we will see how KSQL makes this process painless with simple SQL join syntax. In the example shown below, we combine the users stream with the purchases stream, producing a new output stream based on the shared key attribute username.

SELECT
    p.username,
    p.amount,
    u.email
FROM purchases AS p
JOIN users AS u
ON p.username = u.username;

KSQL supports JOIN operations on co-partitioned streams and/or tables.

This is one of the most common use cases for joining in stream processing. It is fairly frequent that we want to enrich one data source with information from one or more other data sources. This is a good place to pause and discuss some of the requirements for joining in KSQL.

  • If the streams being joined are not co-partitioned (meaning they are not keyed and partitioned the same way), then we can’t join them directly.
  • The KEY that we choose to use in KSQL must also match the KEY for our messages in the Kafka topic.

So, in the above example, if users and purchases didn’t share the username key attribute, we would have to co-partition these streams to do the join. This doesn’t mean that we can’t join tables or streams that are not co-partitioned; it simply means we have to repartition our data so that a join is possible. To repartition a stream for a join, we can use PARTITION BY to create an intermediate stream that is keyed appropriately for the join, as in the sketch below.
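
A minimal sketch of such a repartitioning step, assuming a purchases stream that is keyed by something other than username:

-- Re-key the stream so it is co-partitioned with users on username
CREATE STREAM purchases_by_username AS
  SELECT *
  FROM purchases
  PARTITION BY username;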

Lastly, we can join:

  • Tables - to - Tables.
  • Stream - to - Stream.
  • Stream - to - Table.

But, you cannot join Tables to Streams.

[Image: ksql-intro-6]

Example: Join the clickevents stream to the pages table. The underlying topic for clickevents is com.udacity.streams.clickevents, and for pages it is com.udacity.streams.pages (see the Creating a Stream section of this post for the data shapes of these topics). We will start this example by first creating the clickevents stream and the pages table.

  • Create the clickevents stream:
CREATE STREAM clickevents
  (email VARCHAR,
   timestamp VARCHAR,
   uri VARCHAR,
   number INTEGER)
  WITH (KAFKA_TOPIC='com.udacity.streams.clickevents',
        VALUE_FORMAT='JSON');
  • Create the pages table:
CREATE TABLE pages(
  uri VARCHAR,
  description VARCHAR,
  created VARCHAR
)
WITH (
  KAFKA_TOPIC='com.udacity.streams.pages',
  VALUE_FORMAT='JSON',
  KEY='uri'
);

LEFT OUTER JOIN: Unlike most SQL dialects, where a bare JOIN means an INNER JOIN, the default JOIN in KSQL behaves as a LEFT OUTER JOIN, so the statement below keeps every clickevent even when no matching page exists.

CREATE STREAM clickevent_pages AS
  SELECT ce.uri, ce.email, ce.timestamp, ce.number, p.description, p.created
  FROM clickevents ce
  JOIN pages p on ce.uri = p.uri;

Output:

ksql> CREATE STREAM clickevent_pages AS
>  SELECT ce.uri, ce.email, ce.timestamp, ce.number, p.description, p.created
>  FROM clickevents ce
>  JOIN pages p on ce.uri = p.uri;

 Message                    
----------------------------
 Stream created and running
----------------------------
ksql>

Shown below is the output of the new stream, which contains the joined data.

[Image: ksql-intro-7]

INNER JOIN:

ksql> CREATE STREAM clickevent_pages_inner AS
>  SELECT ce.uri, ce.email, ce.timestamp, ce.number, p.description, p.created
>  FROM clickevents ce
>  INNER JOIN pages p on ce.uri = p.uri;

 Message                    
----------------------------
 Stream created and running
----------------------------

Run show queries to see the two running queries.

ksql> show queries;

 Query ID                      | Kafka Topic            | Query String                                                                 
--------------------------------------------------------------------------------------------------
 CSAS_CLICKEVENT_PAGES_0       | CLICKEVENT_PAGES       | CREATE STREAM clickevent_pages AS
  SELECT ce.uri, ce.email, ce.timestamp, ce.number, p.description, p.created
  FROM clickevents ce
  JOIN pages p on ce.uri = p.uri;
 CSAS_CLICKEVENT_PAGES_INNER_1 | CLICKEVENT_PAGES_INNER | CREATE STREAM clickevent_pages_inner AS
  SELECT ce.uri, ce.email, ce.timestamp, ce.number, p.description, p.created
  FROM clickevents ce
  INNER JOIN pages p on ce.uri = p.uri;
--------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
ksql>

View the data in the stream:

[Image: ksql-intro-8]

Glossary

STREAM: A stream is an unbounded sequence of structured data (“facts”). For example, we could have a stream of financial transactions such as “Alice sent 100 dollars to Bob, then Charlie sent 50 dollars to Bob”. Facts in a stream are immutable, which means new facts can be inserted to a stream, but existing facts can never be updated or deleted. Streams can be created from an Apache Kafka topic or derived from an existing stream. A stream’s underlying data is durably stored (persisted) within a Kafka topic on the Kafka brokers.

TABLE: A table is a view of a stream, or another table, and represents a collection of evolving facts. For example, we could have a table that contains the latest financial information such as “Bob’s current account balance is 150 dollars”. It is the equivalent of a traditional database table but enriched by streaming semantics such as windowing. Facts in a table are mutable, which means new facts can be inserted to the table, and existing facts can be updated or deleted. Tables can be created from a Kafka topic or derived from existing streams and tables. In both cases, a table’s underlying data is durably stored (persisted) within a Kafka topic on the Kafka brokers.

Kafka Streams - A Java library for constructing stream processing applications. KSQL translates SQL statements to Kafka Streams applications.

User Defined Function (UDF) - An extension to the SQL capabilities of KSQL written by the user. For KSQL, these are written in Java.

Key (KSQL) - Data which uniquely identifies the value contained in this data message relative to other pieces of data in the stream. For example, a user_id may uniquely identify a user object.

Session Windowing (KSQL) - A system that keeps track of when the last time a particular key was seen. When a new record with the same key arrives, the difference between the timestamps is calculated. If the difference is larger than a defined session window, then a new window is started for that session. If the difference is less than the defined session window, the new record is added to the existing window.