Kafka Connect to stream data into Kafka

13 minute read

Kafka Connect

Kafka Connect is a framework to stream data into and out of Apache Kafka.

Goal of this post:

  1. To show how we can use Kafka Connect to push logs into Kafka using the FileStream connector, and
  2. To show how we can use Kafka Connect to push SQL data from a table into Kafka using the JDBC Source connector.

Understand the need for Kafka Connect

We can use the Kafka client libraries to send data to and receive data from Kafka. However, developing your own producers and consumers with these libraries is a laborious and time-consuming process. This is where Kafka Connect comes in.

So, why would you choose Kafka Connect over the Kafka client libraries?

  • For one, it saves time: someone has already written the producer/consumer and made it available as a plugin (JAR).
  • It allows you to repeatedly implement similar Kafka integrations. You build your integration once in Java (if it is not already implemented), then with just a little bit of configuration you can reuse the same integration logic again and again.
  • It provides an abstraction from Kafka for application code (i.e., your application doesn't need to know that it is sending its data to Kafka).
  • Naturally, this decreases the amount of code you need to maintain.

Example to understand the need for Kafka Connect:

Imagine this scenario:

You are working on an e-commerce application which has dozens of models in a Postgres database: some models represent purchases, some represent users and addresses. We want all of this data to be available in Kafka (see figure below). BUT, you don't want to write dozens of Kafka producers to put that data into Kafka; it would take weeks of effort and testing. INSTEAD, we can turn to Kafka Connect and one of its most popular plugins, the JDBC connector, to do that work for us.

Through Kafka Connect's HTTP REST API, we can then configure the connector to periodically query all the tables of the SQL database and automatically dump the records into a topic for each table.

[Figure: need-for-kafka-connect]

To put it more technically, Kafka Connect provides a powerful abstraction over Kafka for your applications.

Lastly, because Kafka Connect can integrate with your databases and other sources of data, such as logs, it's possible to avoid integrating Kafka client code into your applications entirely.

Glossary of Kafka Connect terms

Kafka Connect is a framework to stream data into and out of Apache Kafka.

At its core, Kafka Connect is nothing but a web server and a framework. It runs within a Java process (JVM).

  • Kafka Connect - A web server and framework for integrating Kafka with external data sources such as SQL databases, log files, and HTTP endpoints.
  • Connector - A JAR built on the Kafka Connect framework which integrates with an external system to either source data into or sink data out of Kafka.
  • Source - A Kafka client putting data into Kafka from an external location, such as a data store.
  • Sink - A Kafka client removing data from Kafka into an external location, such as a data store.
  • Task - The part of a connector that actually interacts with the external system and moves the data. One or more tasks make up a connector.


Kafka Connect REST API

Now that we understand what Kafka Connect is all about, let's look into how to manage it. Kafka Connect is managed entirely through an HTTP REST API, which makes managing connectors as easy as making simple HTTP calls. If you have access to the server or servers where Kafka Connect is running, you can add additional connector plugins while the server is running and configure the server to load them.

Kafka Connect's connector configurations can be CREATED, READ, UPDATED, and DELETED (CRUD) via the REST API:

  • You can check the status of a specific connector's task(s) via the API
  • You can start, stop, and restart Connectors via the API
  • The choice of a REST API provides a wide array of integration and management opportunities

Invoke REST API using curl

Check existing connector plugins

curl http://localhost:8083/connector-plugins | python -m json.tool

root@4c0e648df0d2:/home/workspace# curl http://localhost:8083/connector-plugins | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1267  100  1267    0     0  52791      0 --:--:-- --:--:-- --:--:-- 52791
[
    {
        "class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
        "type": "source",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "type": "sink",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
        "type": "source",
        "version": "2.1.1-cp3"
    },
    {
        "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
        "type": "source",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.jms.JmsSourceConnector",
        "type": "source",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "type": "source",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.s3.S3SinkConnector",
        "type": "sink",
        "version": "5.1.3"
    },
    {
        "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
        "type": "source",
        "version": "2.1.1-cp3"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.1.1-cp3"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.1.1-cp3"
    }
]
root@4c0e648df0d2:/home/workspace#

Create a Connector

To create a connector, run the below command:

curl -X POST -H 'Content-Type: application/json' -d '{
    "name": "first-connector",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "/var/log/journal/confluent-kafka-connect.service.log",
        "topic": "kafka-connect-logs"
    }
  }' \
  http://localhost:8083/connectors

root@4c0e648df0d2:/home/workspace# curl -X POST -H 'Content-Type: application/json' -d '{
>     "name": "first-connector",
>     "config": {
>         "connector.class": "FileStreamSource",
>         "tasks.max": 1,
>         "file": "/var/log/journal/confluent-kafka-connect.service.log",
>         "topic": "kafka-connect-logs"
>     }
>   }' \
>   http://localhost:8083/connectors
{"name":"first-connector","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"/var/log/journal/confluent-kafka-connect.service.log","topic":"kafka-connect-logs","name":"first-connector"},"tasks":[],"type":"source"}

List existing connectors

To check whether the connector got created successfully, we can run the following command:

curl http://localhost:8083/connectors | python -m json.tool

root@4c0e648df0d2:/home/workspace# curl http://localhost:8083/connectors | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    19  100    19    0     0   1187      0 --:--:-- --:--:-- --:--:--  1187
[
    "first-connector"
]

Check details of the connector

To check the details of the connector, we can simply run the following command:

curl http://localhost:8083/connectors/first-connector | python -m json.tool

root@4c0e648df0d2:/home/workspace# curl http://localhost:8083/connectors/first-connector | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   273  100   273    0     0  10500      0 --:--:-- --:--:-- --:--:-- 10500
{
    "name": "first-connector",
    "config": {
        "connector.class": "FileStreamSource",
        "file": "/var/log/journal/confluent-kafka-connect.service.log",
        "tasks.max": "1",
        "name": "first-connector",
        "topic": "kafka-connect-logs"
    },
    "tasks": [
        {
            "connector": "first-connector",
            "task": 0
        }
    ],
    "type": "source"
}

Notice the details of our connector: it is a FileStreamSource, the file it is reading from is /var/log/journal/confluent-kafka-connect.service.log, and the topic it is writing to is kafka-connect-logs.

Pause a connector

Sometimes it is desirable to pause a connector. To do so, you can simply run:

curl -X PUT http://localhost:8083/connectors/first-connector/pause

Note: here the HTTP method is PUT and not POST.

Restart a connector

To start/restart a connector:

curl -X POST http://localhost:8083/connectors/first-connector/restart

Delete a connector

curl -X DELETE http://localhost:8083/connectors/first-connector

root@4c0e648df0d2:/home/workspace# curl -X DELETE http://localhost:8083/connectors/first-connector
root@4c0e648df0d2:/home/workspace# curl http://localhost:8083/connectors | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     2  100     2    0     0    117      0 --:--:-- --:--:-- --:--:--   117
[]
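
These management calls are also easy to script. Below is a minimal sketch in Python (using the requests library, just like the scripts later in this post) that wraps the status, pause, resume, restart, and delete endpoints for the first-connector example above. Note the resume endpoint, the counterpart to pause, which we did not exercise with curl:

import requests

KAFKA_CONNECT_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "first-connector"  # the example connector created above


def connector_status():
    """Reads the status of the connector and its tasks."""
    resp = requests.get(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}/status")
    resp.raise_for_status()
    return resp.json()


def pause_connector():
    """Pauses the connector (note: PUT, not POST)."""
    requests.put(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}/pause").raise_for_status()


def resume_connector():
    """Resumes a paused connector."""
    requests.put(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}/resume").raise_for_status()


def restart_connector():
    """Restarts the connector."""
    requests.post(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}/restart").raise_for_status()


def delete_connector():
    """Deletes the connector."""
    requests.delete(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}").raise_for_status()


if __name__ == "__main__":
    print(connector_status())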

Most common Kafka Connectors: FileStream and SQL JDBC source/sink

Now that we understand the need for Kafka Connect and how it fits into the Kafka ecosystem, let's dive into configuring some of the most commonly used connectors: the FileStream source connector and the SQL JDBC source and sink connectors.

FileStream Connector

One of the most common uses of Kafka in many organizations is the routing of log data from many disparate microservices. So, the main questions here are:

  • How do you actually get your logs into Kafka?
  • Should we modify our application to write logs both to disk and to Kafka?

Well, the answer is that some logging tools do support integrations with Kafka, but one of the easiest and most ubiquitous ways to pipe logs into Kafka is to use our beloved Kafka Connect.

To achieve this, Kafka Connect can be configured to use a FileStream source connector to monitor changes to a file on disk. As data is appended to that file, Kafka Connect captures the changes and emits each new line as an event to a Kafka topic.

[Figure: filestreamsource]

The thing that is awesome about this approach is that your application can remain entirely unaware that its logs are going to Kafka.
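
For instance, an application that already writes its logs to a file with Python's standard logging module needs no changes at all; the FileStreamSource connector simply tails that same file. A minimal sketch (the file path here is hypothetical and just has to match the file setting in the connector's config):

import logging

# Ordinary application logging to a file on disk. A FileStreamSource
# connector (configured separately via the Kafka Connect REST API) tails
# this same file and publishes each new line to a Kafka topic; the
# application itself knows nothing about Kafka.
logging.basicConfig(
    filename="/tmp/my_service.log",  # hypothetical path; must match the connector's "file" setting
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

logging.info("user added item 42 to cart")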

How to create a FileStream Source Connector in Python?

Earlier we saw how to create a FileStream Source connector through the HTTP REST API using curl commands. That is not very repeatable, so it is useful to have a Python script which does this.

Shown below is a Python script that, when executed, writes to a log file /tmp/{CONNECTOR_NAME}.log, which Kafka Connect is configured to stream in real time to the topic shravan_log_stream_topic. The output is shown after the script.

import asyncio
import json

import requests

KAFKA_CONNECT_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "file_stream_connector"

def configure_connector():
    """Calls Kafka Connect to create the Connector"""
    print("creating or updating kafka connect connector...")

    rest_method = requests.post
    resp = requests.get(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}")
    if resp.status_code == 200:
        return

    #
    # Kafka Connect Config below.
    #       See: https://docs.confluent.io/current/connect/references/restapi.html
    #       See: https://docs.confluent.io/current/connect/filestream_connector.html#filesource-connector
    #
    resp = rest_method(
        KAFKA_CONNECT_URL,
        headers={"Content-Type": "application/json"}, # specify that we are sending json data
        data=json.dumps( # convert dictionary to string
            {
                "name": CONNECTOR_NAME, # give the connector a name
                "config": {
                    "connector.class": "FileStreamSource", # the java class, see above docs
                    "topic": "shravan_log_stream_topic", # the topic to write to
                    "tasks.max": 1, # number of tasks to run in parallel
                    "file": f"/tmp/{CONNECTOR_NAME}.log", # where to get the data from
                    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "key.converter.schemas.enable": "false",
                    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "value.converter.schemas.enable": "false",
                },
            }
        ),
    )

    # Ensure a healthy response was given
    resp.raise_for_status()
    print("connector created successfully")


async def log():
    """Continually appends to the end of a file"""
    with open(f"/tmp/{CONNECTOR_NAME}.log", "w") as f:
        iteration = 0
        while True:
            f.write(f"log number {iteration}\n")
            f.flush()
            await asyncio.sleep(1.0)
            iteration += 1


async def log_task():
    """Runs the log task"""
    task = asyncio.create_task(log())
    configure_connector()
    await task


def run():
    """Runs the simulation"""
    try:
        asyncio.run(log_task())
    except KeyboardInterrupt as e:
        print("shutting down")


if __name__ == "__main__":
    run()

root@c62675692605:/home/workspace# python filestream.py
creating or updating kafka connect connector...
connector created successfully
root@c62675692605:/home/workspace# kafka-console-consumer --topic shravan_log_stream_topic --bootstrap-server localhost:9092 --from-beginning
"log number 0"
"log number 1"
"log number 2"
"log number 3"
"log number 4"
"log number 5"
"log number 6"
"log number 7"
"log number 8"
"log number 9"
"log number 10"
"log number 11"
"log number 12"
"log number 13"
"log number 14"
"log number 15"
"log number 16"
"log number 17"
"log number 18"
"log number 19"
"log number 20"
"log number 21"
"log number 22"
"log number 23"
"log number 24"
"log number 25"

SQL JDBC Source and Sink connectors

The vast majority of Kafka users also have one or more SQL databases containing data that would be useful to have in Kafka and in downstream stream-processing applications. Java, the language Kafka Connect is built in, has a standardized API for interfacing with SQL databases called Java Database Connectivity, or simply JDBC.

Confluent built a Kafka connector on top of JDBC, which can pull data out of one or more tables in a SQL database and place it into one or more Kafka topics, OR pull data from Kafka and place it into database tables. To reiterate, the JDBC connector supports both source and sink integrations with SQL databases.

A source integration is one in which Kafka Connect pulls data from a data source into Kafka. A sink integration is one in which Kafka Connect takes data from Kafka and places it into another data store.

A common use case of the JDBC connector is to publish model updates and changes as events to Kafka; this is a JDBC source connector. The connector can be configured to perform this task on a periodic basis for all or some of your models, and the query that the connector uses can even be customized if you'd like to transform the data in some fashion, as sketched below.
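
As a rough illustration of that flexibility, here is a sketch of the config block such a source connector might use. The connection details mirror the working example later in this post, while the table, columns, topic prefix, and query are hypothetical; you would POST it to /connectors exactly like the clicks-jdbc script below does:

# Sketch of a JDBC source connector "config" that uses a custom query
# instead of a table whitelist. Table/column names are hypothetical.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/classroom",
    "connection.user": "root",
    "topic.prefix": "purchases-summary",  # with "query", this is used as the topic name itself
    "mode": "incrementing",               # only emit rows with an id greater than the last seen id
    "incrementing.column.name": "id",
    "poll.interval.ms": "10000",          # query the database every 10 seconds
    "query": "SELECT id, user_id, amount FROM purchases",  # hypothetical custom query
    "tasks.max": 1,
}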

JDBC Source Connector Configuration Options

How to create a JDBC Source Connector in Python?

Our objective here is to configure Kafka Connect to create a connector called clicks-jdbc, which monitors the clicks table in the classroom database. The data is streamed into the topic clicksclicks.

The output shows the source DB data and the destination topic data.

import asyncio
import json

import requests


KAFKA_CONNECT_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "clicks-jdbc"


def configure_connector():
    """Calls Kafka Connect to create the Connector"""
    print("creating or updating kafka connect connector...")

    rest_method = requests.post
    resp = requests.get(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}")
    if resp.status_code == 200:
        return

    #
    #       Complete the Kafka Connect Config below for a JDBC source connector.
    #       You should whitelist the `clicks` table, use incrementing mode and the
    #       incrementing column name should be id.
    #
    #       See: https://docs.confluent.io/current/connect/references/restapi.html
    #       See: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html
    #
    resp = rest_method(
        KAFKA_CONNECT_URL,
        headers={"Content-Type": "application/json"},
        data=json.dumps(
            {
                "name": CONNECTOR_NAME,  # name of the connector
                "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",  # class
                    "topic.prefix": "clicks",  # prefix for topic name
                    "mode": "incrementing",  # use incrementing mode to determine changes
                    "incrementing.column.name": "id",  # use the id column of clicks table
                    "table.whitelist": "clicks",  # the table we want to get the data from
                    "tasks.max": 1,
                    "connection.url": "jdbc:postgresql://localhost:5432/classroom",
                    "connection.user": "root",
                    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "key.converter.schemas.enable": "false",
                    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "value.converter.schemas.enable": "false",
                },
            }
        ),
    )

    # Ensure a healthy response was given
    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"failed creating connector: {json.dumps(resp.json(), indent=2)}")
        exit(1)
    print("connector created successfully.")
    print("Use kafka-console-consumer and kafka-topics to see data!")


if __name__ == "__main__":
    configure_connector()

Run the above script to create a JDBC source connector and load the data from the clicks table into the clicksclicks topic. We also told Kafka Connect to monitor the id column of the clicks table (incrementing mode), so that any new rows are sent as events into the topic; conceptually, the connector periodically issues a query along the lines of SELECT * FROM clicks WHERE id > <last seen id> ORDER BY id.

To verify that we successfully created the connector, run the following:

root@341d92058c1c:/home/workspace# curl http://localhost:8083/connectors | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    15  100    15    0     0    576      0 --:--:-- --:--:-- --:--:--   576
[
    "clicks-jdbc"
]
root@341d92058c1c:/home/workspace# curl http://localhost:8083/connectors/clicks-jdbc/status | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   167  100   167    0     0   5964      0 --:--:-- --:--:-- --:--:--  5964
{
    "name": "clicks-jdbc",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.18.0.2:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "172.18.0.2:8083"
        }
    ],
    "type": "source"
}
root@341d92058c1c:/home/workspace#

Source database: Just to show what the source data looks like, I connected to the classroom database and queried the clicks table. There are 499 rows in the clicks table, as confirmed by the output below. Our goal is to get this data into Kafka.

root@341d92058c1c:/home/workspace# psql classroom
psql (10.10 (Ubuntu 10.10-0ubuntu0.18.04.1))
Type "help" for help.

classroom=# \dt
             List of relations
 Schema |       Name        | Type  | Owner
--------+-------------------+-------+-------
 public | clicks            | table | root
 public | connect_clicks    | table | root
 public | connect_purchases | table | root
 public | purchases         | table | root
(4 rows)

classroom=# select count(*) from clicks;
 count
-------
   499
(1 row)

classroom=#

Destination topic: The Python script created the clicksclicks topic; its name is the combination of topic.prefix ("clicks") and the whitelisted table name ("clicks"). To view the topics, we can make use of the kafka-topics CLI as shown:

kafka-topics --list --zookeeper localhost:2181

root@341d92058c1c:/home/workspace# kafka-topics --list --zookeeper localhost:2181
__confluent.support.metrics
__consumer_offsets
_confluent-ksql-default__command_topic
_confluent-metrics
_confluent-monitoring
_schemas
clicksclicks
com.udacity.streams.clickevents
com.udacity.streams.pages
com.udacity.streams.purchases
com.udacity.streams.users
connect-configs
connect-offsets
connect-status
root@341d92058c1c:/home/workspace#

To check the data, we can make use of the kafka-console-consumer:

kafka-console-consumer --bootstrap-server localhost:9092 --topic clicksclicks --from-beginning

Summary

I have shown how to:

  • Apply the Kafka Connect FileStream Source connector to push logs into Kafka
  • Apply the Kafka Connect JDBC Source connector to push SQL data into Kafka
