A practical guide to using Kafka REST Proxy

10 minute read

What is Kafka REST Proxy?

The Confluent REST Proxy provides a RESTful interface to a Kafka cluster. It makes it easy to:

  • produce and consume messages
  • view the state of the cluster
  • perform administrative actions without using the native Kafka protocol or clients

Need for REST Proxy

  • While the Kafka client libraries and Kafka Connect are sufficient for most Kafka integrations, there are times when existing systems are unable to use either approach. In these cases, any client that can issue HTTP requests can integrate with Kafka over HTTP using the Kafka REST Proxy. In this post, we will learn how REST Proxy works and how it can help in these scenarios.
  • Like the rest of the Kafka ecosystem, REST Proxy is written in Java and Scala, and because of this it can run just about anywhere. REST Proxy is a simple HTTP web server, and you may choose to deploy a single instance or a cluster of many instances.
  • At its core, all REST Proxy does is transform structured JSON data from your application into Kafka's binary data format. Conversely, it can also translate data from Kafka into a JSON payload for your application.
  • REST Proxy surfaces a number of useful administrative and metadata endpoints for your Kafka cluster, but you cannot create topics via REST Proxy.
  • REST Proxy can optionally be made aware of Schema Registry so that it can help you manage your Avro schemas.
  • Finally, REST Proxy is most useful when you can't use a Kafka client directly.

Fetching Cluster Metadata from REST Proxy

We will first look at how to fetch some commonly used cluster metadata using REST Proxy.

The script below demonstrates a few useful endpoints; links to the relevant API documentation are included as comments:

import json
import requests

REST_PROXY_URL = "http://localhost:8082"

def get_topics():
    """Gets topics from REST Proxy"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics
    resp = requests.get(f"{REST_PROXY_URL}/topics")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get topics: {json.dumps(resp.json(), indent=2)}")
        return []

    print("Fetched topics from Kafka:")
    print(json.dumps(resp.json(), indent=2))
    return resp.json()


def get_topic(topic_name):
    """Get specific details on a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get topic {topic_name}: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched topic from Kafka:")
    print(json.dumps(resp.json(), indent=2))


def get_brokers():
    """Gets broker information"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--brokers
    resp = requests.get(f"{REST_PROXY_URL}/brokers")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get brokers: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched brokers from Kafka:")
    print(json.dumps(resp.json(), indent=2))


def get_partitions(topic_name):
    """Prints partition information for a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)-partitions
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}/partitions")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get partitions: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched partitions from Kafka:")
    print(json.dumps(resp.json(), indent=2))


if __name__ == "__main__":
    topics = get_topics()
    if topics:
        get_topic(topics[0])
    get_brokers()
    if topics:
        get_partitions(topics[-1])

Output:

(venv) (etl) shravan-producers$ python rest-proxy-metadata.py
Fetched topics from Kafka:
[
  "__confluent.support.metrics",
  "_confluent-ksql-ksql_service_docker_command_topic",
  "_schemas",
  "connect-config",
  "connect-offset",
  "connect-status",
  "my-first-faust-app-__assignor-__leader",
  "my-topic",
  "sk_admin_client",
  "sk_rest_proxy",
  "sk_rest_proxy1"
]
Fetched topic from Kafka:
{
  "name": "__confluent.support.metrics",
  "configs": {
    "message.downconversion.enable": "true",
    "file.delete.delay.ms": "60000",
    "segment.ms": "604800000",
    "min.compaction.lag.ms": "0",
    "retention.bytes": "-1",
    "segment.index.bytes": "10485760",
    "cleanup.policy": "delete",
    "follower.replication.throttled.replicas": "",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.jitter.ms": "0",
    "preallocate": "false",
    "message.timestamp.type": "CreateTime",
    "message.format.version": "2.2-IV1",
    "segment.bytes": "1073741824",
    "unclean.leader.election.enable": "false",
    "max.message.bytes": "1000012",
    "retention.ms": "31536000000",
    "flush.ms": "9223372036854775807",
    "delete.retention.ms": "86400000",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "flush.messages": "9223372036854775807",
    "compression.type": "producer",
    "index.interval.bytes": "4096",
    "min.cleanable.dirty.ratio": "0.5"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        }
      ]
    }
  ]
}
Fetched brokers from Kafka:
{
  "brokers": [
    0
  ]
}
Fetched partitions from Kafka:
[
  {
    "partition": 0,
    "leader": 0,
    "replicas": [
      {
        "broker": 0,
        "leader": true,
        "in_sync": true
      }
    ]
  }
]
(venv) (etl) shravan-producers$

Produce and Consume data using REST Proxy

Here we will see first-hand how to produce and consume data with the Kafka REST Proxy. Though REST Proxy is conceptually similar to traditional Kafka clients, we will highlight some of the important differences and considerations.

Producing and consuming data via REST Proxy works largely like traditional Kafka producer or consumer client code. There are some differences, though, because you are working with a REST API rather than a native client. We will review some special considerations for managing consumer groups, Avro schemas, and content types.

To produce data to a Kafka topic, you simply POST data to a URL ending in the name of the topic. REST Proxy allows you to post a list of records to a topic at once, so you can send more than one message per request. The simplest option is to send binary data with no schema whatsoever; however, sending schemaless data into Kafka is not safe. Beyond binary data, you can also publish JSON and Avro data. Shown below is an example of sending JSON data into Kafka using REST Proxy. Notice that the Content-Type for JSON is application/vnd.kafka.json.v2+json, not just application/json: Confluent uses a vendor-specific content type so that it can encode additional information about how REST Proxy should handle the data you are sending it.

rest-proxy-produce
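
Concretely, a produce request has roughly this shape (a minimal sketch; the topic name and record contents here are hypothetical):

POST /topics/my-topic HTTP/1.1
Host: localhost:8082
Content-Type: application/vnd.kafka.json.v2+json

{
  "records": [
    {"value": {"name": "alice"}},
    {"key": "user-1", "value": {"name": "bob"}}
  ]
}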

Publishing Avro data requires some special consideration. Every message that you send through REST Proxy must include your Avro schema alongside the data it describes. Note that value_schema defines the Avro schema and that it is a JSON string, not raw JSON. Don't overlook this detail when you use REST Proxy.

rest-proxy-produce-avro
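
The request body then carries the schema as an escaped JSON string next to the records. A minimal sketch, with a hypothetical topic and a trimmed-down schema:

POST /topics/my-avro-topic HTTP/1.1
Host: localhost:8082
Content-Type: application/vnd.kafka.avro.v2+json

{
  "value_schema": "{\"type\": \"record\", \"name\": \"click\", \"fields\": [{\"name\": \"email\", \"type\": \"string\"}]}",
  "records": [
    {"value": {"email": "alice@example.com"}}
  ]
}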

You might be thinking that sending the value schema on every message introduces a lot of overhead relative to a normal Kafka client, and you'd be right. This is one of the drawbacks of using REST Proxy. Whether your API call succeeds or encounters an error, you'll receive standard HTTP status codes from the API.

Finally, let's review the HTTP header content types that you must send to REST Proxy. These are incredibly important when using REST Proxy, so pay attention to them. REST Proxy uses the Content-Type you send to decide how to handle the data being sent to it. So how do these headers work? The gist of it is that you supply the embedded format of the data destined for Kafka: either avro, json, or binary (shown in bold in the figure below). You also supply the API version (v2 throughout this post) and the serialization format of the request itself, which is always json, since that is the only serialization format REST Proxy currently supports.

rest-proxy-content-types

The REST proxy uses content types for both requests and responses to indicate 3 properties of the data:

  • the serialization format (e.g. json),
  • the version of the API (e.g. v2),
  • and the embedded format (e.g. json, binary or avro).

Currently, the only serialization format supported is json and the versions of the API are v1 and v2.

READ THIS: All about Content Types

Producing JSON Data to Kafka with REST Proxy

One of the key differences from native Kafka clients is that, in the produce function, we use requests.post() to send our data to Kafka. The first thing we have to do is set the appropriate headers, and getting the Content-Type right is the most important thing to set up before we can do anything. Since our goal is to send JSON-embedded data using the default serialization format, our Content-Type string will be: application/vnd.kafka.json.v2+json.

application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]
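
For example, that pattern expands into concrete header values such as:

application/vnd.kafka.json.v2+json    # JSON-embedded data, API v2
application/vnd.kafka.avro.v2+json    # Avro-embedded data, API v2
application/vnd.kafka.binary.v2+json  # base64-encoded binary data, API v2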

Then we define our data. The payload contains a records key: a list of one or more records to post to REST Proxy. We don't just dump the data in here; each record has to specify whether the datum is a key or a value. In our case, we are passing in a ClickEvent object, so we specify it as a value. Once that is done, we pass the data in the POST. Notice that we need to convert the data dictionary into a JSON string using json.dumps(data). Finally, make sure you are passing in the correct URL: f"{REST_PROXY_URL}/topics/sk_rest_proxy". In summary, you need three things before you can start producing JSON data into Kafka:

  1. the URL with the topic name
  2. headers with the appropriate Content-Type
  3. the data, along with the format in which you are going to send it

from dataclasses import asdict, dataclass, field
import json
import random
import time

import requests
from faker import Faker


faker = Faker()
REST_PROXY_URL = "http://localhost:8082"


def produce():
    """Produces data using REST Proxy"""

    # Set the appropriate headers
    # See: https://docs.confluent.io/current/kafka-rest/api.html#content-types
    headers = {
        "Content-Type": "application/vnd.kafka.json.v2+json"
    }
    # Define the JSON payload to be sent to REST Proxy; each record wraps
    # a ClickEvent dictionary as its value
    # See: https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)
    data = {
        "records": [
            {"value": asdict(ClickEvent())}
        ]
    }
    # POST to the topic URL, which ends in the topic name
    # See: https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)
    resp = requests.post(
        f"{REST_PROXY_URL}/topics/sk_rest_proxy", data=json.dumps(data), headers=headers  
    )

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to send data to REST Proxy {json.dumps(resp.json(), indent=2)}")
        return

    print(f"Sent data to REST Proxy {json.dumps(resp.json(), indent=2)}")


@dataclass
class ClickEvent:
    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)
    number: int = field(default_factory=lambda: random.randint(0, 999))


def main():
    """Runs the simulation against REST Proxy"""
    try:
        while True:
            produce()
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("shutting down")


if __name__ == "__main__":
    main()

Output:

Produce JSON data into Kafka:

root@12b6a4439f1d:/home/workspace# python kafka-rest-proxy.py
Sent data to REST Proxy {
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": null
}
Sent data to REST Proxy {
  "offsets": [
    {
      "partition": 0,
      "offset": 1,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": null
}

Consume data from Kafka:

root@12b6a4439f1d:/home/workspace# kafka-console-consumer --bootstrap-server localhost:9092 --topic sk_rest_proxy --from-beginning
{"email":"katherineking@klein.com","timestamp":"1979-08-30T05:51:59","uri":"http://young.org/posts/search/index.htm","number":377}
{"email":"ybullock@williams-barnes.com","timestamp":"1973-02-21T14:37:38","uri":"https://russell.biz/","number":451}
{"email":"pgross@yahoo.com","timestamp":"2014-05-12T01:17:57","uri":"https://atkins.com/homepage/","number":741}
{"email":"traviserickson@yahoo.com","timestamp":"1981-04-16T16:34:53","uri":"http://morgan.com/categories/posts/tag/terms/","number":4}
{"email":"amanda62@gmail.com","timestamp":"1985-03-21T11:19:07","uri":"http://www.kirk-gibbs.com/main/posts/explore/main/","number":882}
{"email":"ianthomas@hotmail.com","timestamp":"1973-11-19T04:47:49","uri":"http://www.jackson.org/tag/search/","number":853}
{"email":"vrussell@hotmail.com","timestamp":"1995-03-20T06:33:28","uri":"https://www.wright.org/post.htm","number":50}
{"email":"pamela48@yahoo.com","timestamp":"1978-12-11T23:53:35","uri":"http://dennis.com/author/","number":685}
{"email":"davidkirby@hanson.com","timestamp":"1996-02-24T09:31:22","uri":"https://stark-griffin.com/","number":231}
{"email":"blackpaul@hotmail.com","timestamp":"1970-11-17T07:25:47","uri":"http://www.ellis.biz/category/","number":504}
{"email":"donaldwarren@lewis-daniels.com","timestamp":"2002-05-10T21:55:31","uri":"http://www.walker.com/tags/wp-content/privacy.html","number":619}
{"email":"beanalyssa@yahoo.com","timestamp":"2006-01-13T22:44:20","uri":"https://www.bender.com/category/","number":176}

Producing Avro Data to Kafka via REST Proxy

Producing Avro data is very similar to producing JSON data. There are, however, two key differences: the Content-Type header and the value_schema entry in the payload we pass to requests.post().

from dataclasses import asdict, dataclass, field
import json
import random
import time

import requests
from faker import Faker


faker = Faker()
REST_PROXY_URL = "http://localhost:8082"

AVRO_SCHEMA = """{
    "type": "record",
    "name": "click_event",
    "fields": [
        {"name": "email", "type": "string"},
        {"name": "timestamp", "type": "string"},
        {"name": "uri", "type": "string"},
        {"name": "number", "type": "int"}
    ]
}"""


def produce():
    """Produces data using REST Proxy"""

    # Set the appropriate headers; note the avro embedded format
    # See: https://docs.confluent.io/current/kafka-rest/api.html#content-types
    headers = {
        "Content-Type": "application/vnd.kafka.avro.v2+json"
    }
    # The payload carries the Avro schema string alongside the records
    # See: https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)
    data = {
        "value_schema": AVRO_SCHEMA,
        "records": [{"value": asdict(ClickEvent())}]
    }
    resp = requests.post(
        f"{REST_PROXY_URL}/topics/sk_rest_proxy_avro",
        data=json.dumps(data),
        headers=headers,
    )

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to send data to REST Proxy {json.dumps(resp.json(), indent=2)}")
        return

    print(f"Sent data to REST Proxy {json.dumps(resp.json(), indent=2)}")


@dataclass
class ClickEvent:
    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)
    number: int = field(default_factory=lambda: random.randint(0, 999))


def main():
    """Runs the simulation against REST Proxy"""
    try:
        while True:
            produce()
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("shutting down")


if __name__ == "__main__":
    main()

Output:

Produce Avro data to Kafka:

root@bcced0f1f689:/home/workspace# python exercise4.6.py
Sent data to REST Proxy {
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": 1
}
Sent data to REST Proxy {
  "offsets": [
    {
      "partition": 0,
      "offset": 1,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": 1
}

Consume data using kafka-console-consumer:

root@bcced0f1f689:/home/workspace# kafka-console-consumer --bootstrap-server localhost:9092 --topic sk_rest_proxy_avro --from-beginning
$wmueller@gmail.com&1972-06-25T17:55:58Lhttp://miller-arellano.com/posts/post/,


^CProcessed a total of 33 messages
root@bcced0f1f689:/home/workspace#

Notice here that the output is gibberish: kafka-console-consumer does not know how to deserialize Avro data, so it prints the raw bytes.
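
To see the deserialized records, you can use kafka-avro-console-consumer instead, which fetches the schema from Schema Registry (this sketch assumes Schema Registry is running at localhost:8081):

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic sk_rest_proxy_avro --from-beginning \
  --property schema.registry.url=http://localhost:8081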

Consuming data with REST Proxy

Consuming data with REST Proxy requires a little more effort than the producer endpoints. To consume data from REST Proxy, you must first POST to create a named consumer in a consumer group; the name must be unique. In this POST, you specify a format of binary, json, or avro, which determines how REST Proxy formats the data returned to you when you begin to consume later on. If successful, the POST returns a 200 response containing the instance_id of your consumer and a base_uri. You should use this base_uri to format your subsequent requests for this consumer.

Using the base_uri, you then POST another request to the subscription endpoint, which subscribes the consumer you just created to one or more topics.
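
Putting those steps together, a minimal sketch of the consume flow might look like this (the group and consumer names are hypothetical, and error handling is omitted for brevity):

import json

import requests

REST_PROXY_URL = "http://localhost:8082"


def consume(topic_name):
    """Sketch: consume JSON records from a topic via REST Proxy."""
    headers = {"Content-Type": "application/vnd.kafka.v2+json"}

    # 1. Create a named consumer in a consumer group (names are hypothetical)
    resp = requests.post(
        f"{REST_PROXY_URL}/consumers/my_group",
        data=json.dumps(
            {"name": "my_consumer", "format": "json", "auto.offset.reset": "earliest"}
        ),
        headers=headers,
    )
    resp.raise_for_status()
    base_uri = resp.json()["base_uri"]

    # 2. Subscribe the consumer instance to one or more topics
    resp = requests.post(
        f"{base_uri}/subscription",
        data=json.dumps({"topics": [topic_name]}),
        headers=headers,
    )
    resp.raise_for_status()

    # 3. Fetch records; the Accept header must match the format chosen above
    resp = requests.get(
        f"{base_uri}/records",
        headers={"Accept": "application/vnd.kafka.json.v2+json"},
    )
    resp.raise_for_status()
    for record in resp.json():
        print(record["value"])

    # 4. Delete the consumer instance when done
    requests.delete(base_uri, headers=headers)


if __name__ == "__main__":
    consume("sk_rest_proxy")

Note that a freshly created consumer may return an empty list on its first fetch while it joins the group, so polling the records endpoint in a loop is the usual pattern.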

As we will see, stream processing frameworks cover more consumption use cases than consuming data through REST Proxy does. I will update this section once I finish the posts on stream processing frameworks.

TODO: Update this

Summary

In this post we learned that Kafka REST Proxy:

  • Is a web server built in Java and Scala that allows any client capable of HTTP to integrate with Kafka
  • Allows production and consumption of Kafka data
  • Allows read-only operations on administrative information and metadata