Managing Kafka Topics

11 minute read

Managing Topics in Kafka

Kafka offers a number of configuration options for topics. By default, topics can be created automatically, but relying on that is not a good practice. To get the best performance out of your Kafka cluster and its topics, always create your topics manually with carefully considered options. One easy pattern for your producer applications is to write code that checks whether the desired topic exists; if it doesn't, the producer can configure and create the topic. Alternatively, you can use Bash scripts or infrastructure provisioning tools like Terraform to create your topics.
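
For the script-based route, a minimal Bash sketch might look like this (the topic name, partition count, and configs are placeholders; --if-not-exists makes the command safe to re-run, though in this Kafka version it requires the --zookeeper connection rather than --bootstrap-server):

```shell
#!/usr/bin/env bash
# Idempotent topic creation: a no-op if the topic already exists.
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists \
  --topic com.example.purchases \
  --partitions 6 \
  --replication-factor 1 \
  --config cleanup.policy=delete \
  --config retention.ms=604800000
```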

Goal: In this post, I will cover how to create, delete, and modify topics in Kafka using three methods:

  1. Using AdminClient provided by confluent_kafka
  2. Using kafka-topics and kafka-configs cli tools
  3. Using Kafka REST Proxy to get cluster metadata.

Each has its own merits, which I will briefly discuss. I often use a combination of these while building my applications or troubleshooting an issue. This post is meant to give an overview of each method; I have provided some links to learn more about the things I haven't covered here.

Method 1: Use confluent_kafka — Confluent’s Python client for Apache Kafka

Check if a topic already exists:

All we need here is the AdminClient, which takes the Kafka broker URL.

Kafka Admin client: create, view, alter, delete topics and resources.

from confluent_kafka.admin import AdminClient

BROKER_URL = "PLAINTEXT://localhost:9092"

def topic_exists(client, topic_name):
    """Checks if the given topic exists"""
    topic_metadata = client.list_topics(timeout=5)
    for t in topic_metadata.topics.values():
        if t.topic == topic_name:
            return True
    return False

def list_all_topics(client):
    """Lists all the available topics"""
    topic_metadata = client.list_topics(timeout=5)
    for t in topic_metadata.topics.values():
        print(t.topic)

def main():
    """Checks for topic and creates the topic if it does not exist"""
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    # list all available topics
    list_all_topics(client)

    # check if a given topic already exists
    topic_name = "sk_rest_proxy"
    if topic_exists(client, topic_name):
        print(f"{topic_name} already exists")
    else:
        print(f"{topic_name} doesn't exist")


if __name__ == "__main__":
    main()

(venv) (etl) shravan-producers$ python list_topics.py
_schemas
connect-config
sk_rest_proxy1
_confluent-ksql-ksql_service_docker_command_topic
connect-status
my-first-faust-app-__assignor-__leader
connect-offset
sk_rest_proxy
__consumer_offsets
__confluent.support.metrics

sk_rest_proxy already exists
(venv) (etl) shravan-producers$

Create and configure a new topic:

The main things to provide when creating a new topic are:

  • topic_name
  • num_partitions
  • replication_factor

In addition to these, we can set several topic data management settings when creating the topic itself. The example below passes a few of the key ones (cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms) via the config dictionary.

Using AdminClient:

from confluent_kafka.admin import AdminClient, NewTopic

BROKER_URL = "PLAINTEXT://localhost:9092"

def topic_exists(client, topic_name):
    """Checks if the given topic exists"""
    topic_metadata = client.list_topics(timeout=5)
    return topic_name in topic_metadata.topics

def create_topic(client, topic_name):
    """Creates the topic with the given topic name"""
    futures = client.create_topics(
        [
            NewTopic(
                topic=topic_name,
                num_partitions=10,
                replication_factor=1,
                config={
                    "cleanup.policy": "delete",
                    "compression.type": "lz4",
                    "delete.retention.ms": "2000",
                    "file.delete.delay.ms": "2000",
                },
            )
        ]
    )

    for topic, future in futures.items():
        try:
            future.result()
            print("topic created")
        except Exception as e:
            print(f"failed to create topic {topic}: {e}")


def main():
    """Checks for topic and creates the topic if it does not exist"""
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    topic_name = "sk_admin_client"
    exists = topic_exists(client, topic_name)
    print(f"Topic {topic_name} exists: {exists}")

    if not exists:
        create_topic(client, topic_name)
    else:
        print("Topic already exists")

if __name__ == "__main__":
    main()

Output:

(venv) (etl) shravan-producers$ python create_topics.py
Topic sk_admin_client exists: False
topic created

Verify that the topic was indeed created:

(venv) (etl) shravan-producers$ python list_topics.py
sk_admin_client
_schemas
connect-config
sk_rest_proxy1
_confluent-ksql-ksql_service_docker_command_topic
connect-status
my-first-faust-app-__assignor-__leader
connect-offset
sk_rest_proxy
__consumer_offsets
my-topic
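
Delete a topic using AdminClient:

The AdminClient can also delete topics. Here is a minimal sketch (assuming the same local broker; passing sk_admin_client, the topic created above, is just an example):

```python
from confluent_kafka.admin import AdminClient

BROKER_URL = "PLAINTEXT://localhost:9092"

def delete_topic(client, topic_name):
    """Deletes the given topic; delete_topics() returns a dict of futures"""
    futures = client.delete_topics([topic_name], operation_timeout=30)
    for topic, future in futures.items():
        try:
            future.result()  # raises on failure
            print(f"topic {topic} deleted")
        except Exception as e:
            print(f"failed to delete topic {topic}: {e}")

if __name__ == "__main__":
    client = AdminClient({"bootstrap.servers": BROKER_URL})
    delete_topic(client, "sk_admin_client")
```

Note that the brokers must have delete.topic.enable=true (the default in recent Kafka versions) for the deletion to take effect.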

Method 2: Using Kafka CLI tools

Configurations pertinent to topics have both a server default and an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by passing one or more --config options.

This example creates a topic named my-topic with a custom max message size and flush rate:

Create topics using kafka-topics cli

root@e77c276dd7af:/# kafka-topics --bootstrap-server kafka0:9092 --create --topic my-topic --partitions 1 \
> --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1


root@e77c276dd7af:/# kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
_confluent-ksql-ksql_service_docker_command_topic
_schemas
connect-config
connect-offset
connect-status
my-first-faust-app-__assignor-__leader
my-topic
sk_rest_proxy
sk_rest_proxy1
root@e77c276dd7af:/#

Check topic configurations using kafka-configs cli

Here are the configurations for the topic created above:

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=64000,flush.messages=1

root@e77c276dd7af:/#

Another example:

root@e3ca30ea1358:/home/workspace# kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name sk_admin_client --describe

Configs for topic 'sk_admin_client' are delete.retention.ms=2000,compression.type=lz4,cleanup.policy=delete,file.delete.delay.ms=2000

root@e3ca30ea1358:/home/workspace#

Modify topic configuration using kafka-configs cli

Overrides can also be changed or set later using the alter configs command. This example updates the max message size max.message.bytes from 64000 to 128000.

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=64000,flush.messages=1

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic \
> --alter --add-config max.message.bytes=128000
Completed Updating config for entity: topic 'my-topic'.

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=128000,flush.messages=1

root@e77c276dd7af:/#
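
Delete a topic using kafka-topics cli

Deleting a whole topic (as opposed to just its messages) is a single command. As a sketch, removing the my-topic topic created earlier would look like this (again, the brokers must have delete.topic.enable=true for the deletion to actually happen):

```shell
# Removes the topic and all of its data.
kafka-topics --bootstrap-server kafka0:9092 --delete --topic my-topic
```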

Delete all messages from a Kafka topic

One way to delete all the messages from a Kafka topic is to temporarily set retention.ms=1000 (1 second). This removes all messages from the topic that are older than 1 second. Once the messages are gone, revert the configuration back to its default value.

kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --alter --add-config retention.ms=1000

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are retention.ms=1000

root@e77c276dd7af:/# kafka-console-consumer --bootstrap-server kafka0:9092 --topic com.kafka.producers.purchases --from-beginning
^CProcessed a total of 0 message

To remove an override you can do:

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics  --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are retention.ms=1000

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics  --entity-name com.kafka.producers.purchases --alter --delete-config retention.ms
Completed Updating config for entity: topic 'com.kafka.producers.purchases'.

root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics  --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are

Method 3: Use Kafka REST Proxy

Fetching Cluster Metadata from REST Proxy: With Kafka REST Proxy, you cannot create topics, but you can retrieve topic metadata.

I find this useful when I want to check the default values for my topics, brokers, and partitions; those defaults are not immediately obvious to obtain using the Kafka-provided CLI tools.

The relevant REST Proxy endpoints are noted in the comments of the script below:

import json
import requests

REST_PROXY_URL = "http://localhost:8082"

def get_topics():
    """Gets topics from REST Proxy"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics
    resp = requests.get(f"{REST_PROXY_URL}/topics")

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        print(f"Failed to get topics: {json.dumps(resp.json(), indent=2)}")
        return []

    print("Fetched topics from Kafka:")
    print(json.dumps(resp.json(), indent=2))
    return resp.json()


def get_topic(topic_name):
    """Gets specific details on a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}")

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        print(f"Failed to get topic: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched topics from Kafka:")
    print(json.dumps(resp.json(), indent=2))


def get_brokers():
    """Gets broker information"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--brokers
    resp = requests.get(f"{REST_PROXY_URL}/brokers")

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        print(f"Failed to get brokers: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched brokers from Kafka:")
    print(json.dumps(resp.json(), indent=2))


def get_partitions(topic_name):
    """Prints partition information for a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)-partitions
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}/partitions")

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        print(f"Failed to get partitions: {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched partitions from Kafka:")
    print(json.dumps(resp.json(), indent=2))


if __name__ == "__main__":
    topics = get_topics()
    get_topic(topics[0])
    get_brokers()
    get_partitions(topics[-1])

Output:
(venv) (etl) shravan-producers$ python rest-proxy-metadata.py
Fetched topics from Kafka:
[
  "__confluent.support.metrics",
  "_confluent-ksql-ksql_service_docker_command_topic",
  "_schemas",
  "connect-config",
  "connect-offset",
  "connect-status",
  "my-first-faust-app-__assignor-__leader",
  "my-topic",
  "sk_admin_client",
  "sk_rest_proxy",
  "sk_rest_proxy1"
]
Fetched topics from Kafka:
{
  "name": "__confluent.support.metrics",
  "configs": {
    "message.downconversion.enable": "true",
    "file.delete.delay.ms": "60000",
    "segment.ms": "604800000",
    "min.compaction.lag.ms": "0",
    "retention.bytes": "-1",
    "segment.index.bytes": "10485760",
    "cleanup.policy": "delete",
    "follower.replication.throttled.replicas": "",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.jitter.ms": "0",
    "preallocate": "false",
    "message.timestamp.type": "CreateTime",
    "message.format.version": "2.2-IV1",
    "segment.bytes": "1073741824",
    "unclean.leader.election.enable": "false",
    "max.message.bytes": "1000012",
    "retention.ms": "31536000000",
    "flush.ms": "9223372036854775807",
    "delete.retention.ms": "86400000",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "flush.messages": "9223372036854775807",
    "compression.type": "producer",
    "index.interval.bytes": "4096",
    "min.cleanable.dirty.ratio": "0.5"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        }
      ]
    }
  ]
}
Fetched brokers from Kafka:
{
  "brokers": [
    0
  ]
}
Fetched partitions from Kafka:
[
  {
    "partition": 0,
    "leader": 0,
    "replicas": [
      {
        "broker": 0,
        "leader": true,
        "in_sync": true
      }
    ]
  }
]
(venv) (etl) shravan-producers$

Appendix: commonly used CLI help output

kafka-topics cli

root@e77c276dd7af:/# kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to. In case of providing this, a
                                           direct Zookeeper connection won't be
                                           required.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                         	cleanup.policy
                                         	compression.type
                                         	delete.retention.ms
                                         	file.delete.delay.ms
                                         	flush.messages
                                         	flush.ms
                                         	follower.replication.throttled.
                                           replicas
                                         	index.interval.bytes
                                         	leader.replication.throttled.replicas
                                         	max.message.bytes
                                         	message.downconversion.enable
                                         	message.format.version
                                         	message.timestamp.difference.max.ms
                                         	message.timestamp.type
                                         	min.cleanable.dirty.ratio
                                         	min.compaction.lag.ms
                                         	min.insync.replicas
                                         	preallocate
                                         	retention.bytes
                                         	retention.ms
                                         	segment.bytes
                                         	segment.index.bytes
                                         	segment.jitter.ms
                                         	segment.ms
                                         	unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs.It is
                                           supported only in combination with --
                                           create if --bootstrap-server option
                                           is used.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
                                           Not supported with the --bootstrap-
                                           server option.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist. Not
                                           supported with the --bootstrap-
                                           server option.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols; e.
                                           g. "test\.topic".
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: hosts>              DEPRECATED, The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple hosts can be
                                           given to allow fail-over.
root@e77c276dd7af:/#

kafka-configs cli

root@e77c276dd7af:/# kafka-configs --help
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option                                 Description
------                                 -----------
--add-config <String>                  Key Value pairs of configs to add.
                                         Square brackets can be used to group
                                         values which contain commas: 'k1=v1,
                                         k2=[v1,v2,v2],k3=v3'. The following
                                         is a list of valid configurations:
                                         For entity-type 'topics':
                                       	cleanup.policy
                                       	compression.type
                                       	delete.retention.ms
                                       	file.delete.delay.ms
                                       	flush.messages
                                       	flush.ms
                                       	follower.replication.throttled.
                                         replicas
                                       	index.interval.bytes
                                       	leader.replication.throttled.replicas
                                       	max.message.bytes
                                       	message.downconversion.enable
                                       	message.format.version
                                       	message.timestamp.difference.max.ms
                                       	message.timestamp.type
                                       	min.cleanable.dirty.ratio
                                       	min.compaction.lag.ms
                                       	min.insync.replicas
                                       	preallocate
                                       	retention.bytes
                                       	retention.ms
                                       	segment.bytes
                                       	segment.index.bytes
                                       	segment.jitter.ms
                                       	segment.ms
                                       	unclean.leader.election.enable
                                       For entity-type 'brokers':
                                       	log.message.timestamp.type
                                       	ssl.client.auth
                                       	log.retention.ms
                                       	sasl.login.refresh.window.jitter
                                       	sasl.kerberos.ticket.renew.window.
                                         factor
                                       	log.preallocate
                                       	log.index.size.max.bytes
                                       	sasl.login.refresh.window.factor
                                       	ssl.truststore.type
                                       	ssl.keymanager.algorithm
                                       	log.cleaner.io.buffer.load.factor
                                       	sasl.login.refresh.min.period.seconds
                                       	ssl.key.password
                                       	background.threads
                                       	log.retention.bytes
                                       	ssl.trustmanager.algorithm
                                       	log.segment.bytes
                                       	max.connections.per.ip.overrides
                                       	log.cleaner.delete.retention.ms
                                       	log.segment.delete.delay.ms
                                       	min.insync.replicas
                                       	ssl.keystore.location
                                       	ssl.cipher.suites
                                       	log.roll.jitter.ms
                                       	log.cleaner.backoff.ms
                                       	sasl.jaas.config
                                       	principal.builder.class
                                       	log.flush.interval.ms
                                       	log.cleaner.dedupe.buffer.size
                                       	log.flush.interval.messages
                                       	advertised.listeners
                                       	num.io.threads
                                       	listener.security.protocol.map
                                       	log.message.downconversion.enable
                                       	sasl.enabled.mechanisms
                                       	sasl.login.refresh.buffer.seconds
                                       	ssl.truststore.password
                                       	listeners
                                       	metric.reporters
                                       	ssl.protocol
                                       	sasl.kerberos.ticket.renew.jitter
                                       	ssl.keystore.password
                                       	sasl.mechanism.inter.broker.protocol
                                       	log.cleanup.policy
                                       	sasl.kerberos.principal.to.local.rules
                                       	sasl.kerberos.min.time.before.relogin
                                       	num.recovery.threads.per.data.dir
                                       	log.cleaner.io.max.bytes.per.second
                                       	log.roll.ms
                                       	ssl.endpoint.identification.algorithm
                                       	unclean.leader.election.enable
                                       	message.max.bytes
                                       	log.cleaner.threads
                                       	log.cleaner.io.buffer.size
                                       	max.connections.per.ip
                                       	sasl.kerberos.service.name
                                       	ssl.provider
                                       	follower.replication.throttled.rate
                                       	log.index.interval.bytes
                                       	log.cleaner.min.compaction.lag.ms
                                       	log.message.timestamp.difference.max.
                                         ms
                                       	ssl.enabled.protocols
                                       	log.cleaner.min.cleanable.ratio
                                       	replica.alter.log.dirs.io.max.bytes.
                                         per.second
                                       	ssl.keystore.type
                                       	ssl.secure.random.implementation
                                       	ssl.truststore.location
                                       	sasl.kerberos.kinit.cmd
                                       	leader.replication.throttled.rate
                                       	num.network.threads
                                       	compression.type
                                       	num.replica.fetchers
                                       For entity-type 'users':
                                       	request_percentage
                                       	producer_byte_rate
                                       	SCRAM-SHA-256
                                       	SCRAM-SHA-512
                                       	consumer_byte_rate
                                       For entity-type 'clients':
                                       	request_percentage
                                       	producer_byte_rate
                                       	consumer_byte_rate
                                       Entity types 'users' and 'clients' may
                                         be specified together to update
                                         config for clients of a specific
                                         user.
--alter                                Alter the configuration for the entity.
--bootstrap-server <String: server to  The Kafka server to connect to. This
  connect to>                            is required for describing and
                                         altering broker configs.
--command-config <String: command      Property file containing configs to be
  config property file>                  passed to Admin Client. This is used
                                         only with --bootstrap-server option
                                         for describing and altering broker
                                         configs.
--delete-config <String>               config keys to remove 'k1,k2'
--describe                             List configs for the given entity.
--entity-default                       Default entity name for
                                         clients/users/brokers (applies to
                                         corresponding entity type in command
                                         line)
--entity-name <String>                 Name of entity (topic name/client
                                         id/user principal name/broker id)
--entity-type <String>                 Type of entity
                                         (topics/clients/users/brokers)
--force                                Suppress console prompts
--help                                 Print usage information.
--zookeeper <String: urls>             REQUIRED: The connection string for
                                         the zookeeper connection in the form
                                         host:port. Multiple URLS can be
                                         given to allow fail-over.
root@e77c276dd7af:/#