Managing Kafka Topics
Kafka offers a number of configuration options for topics. By default, topics can be created automatically, but that is not a good practice to rely on. To get the best performance out of your Kafka cluster and its topics, always create your topics manually with carefully considered options. One easy approach in your producer applications is to write code that checks whether the desired topic exists; if it doesn't, the producer can configure and create it. Alternatively, you can use Bash scripts or infrastructure provisioning tools like Terraform to create your topics.
Goal: In this post, I will cover the ways in which we can create, delete, and modify topics in Kafka using three methods:
- Using the AdminClient provided by confluent_kafka
- Using the kafka-topics and kafka-configs CLI tools
- Using Kafka REST Proxy to get cluster metadata
Each has its own merits, which I will briefly discuss. I often use a combination of these while I am building my applications or troubleshooting an issue. This post is meant to give an overview of each method; I have provided some links to learn more about the things I haven't covered here.
Method 1: Use confluent_kafka, Confluent's Python client for Apache Kafka
Check if a topic already exists:
All we need here is the AdminClient, which takes the Kafka broker URL. The Kafka AdminClient lets you create, view, alter, and delete topics and resources.
from confluent_kafka.admin import AdminClient

BROKER_URL = "PLAINTEXT://localhost:9092"

def topic_exists(client, topic_name):
    """Checks if the given topic exists"""
    topic_metadata = client.list_topics(timeout=5)
    for t in topic_metadata.topics.values():
        if t.topic == topic_name:
            return True
    return False

def list_all_topics(client):
    """Lists all the available topics"""
    topic_metadata = client.list_topics(timeout=5)
    for t in topic_metadata.topics.values():
        print(t.topic)

def main():
    """Lists all topics and checks whether the given topic exists"""
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    # list all available topics
    list_all_topics(client)

    # check if a given topic already exists
    topic_name = "sk_rest_proxy"
    if topic_exists(client, topic_name):
        print(f"{topic_name} already exists")
    else:
        print(f"{topic_name} doesn't exist")

if __name__ == "__main__":
    main()
(venv) (etl) shravan-producers$ python list_topics.py
_schemas
connect-config
sk_rest_proxy1
_confluent-ksql-ksql_service_docker_command_topic
connect-status
my-first-faust-app-__assignor-__leader
connect-offset
sk_rest_proxy
__consumer_offsets
__confluent.support.metrics
sk_rest_proxy already exists
(venv) (etl) shravan-producers$
Create and configure a new topic:
The main things to provide when creating a new topic are:
- topic_name
- num_partitions
- replication_factor
In addition, we can set several topic data management settings when creating the topic itself. The following are some key settings to consider when creating your topics.
Topic Data Management key points:
- Data retention determines how long Kafka stores data in a topic.
- When data expires it is deleted from the topic.
- Retention policies may be time based. Once data reaches a certain age it is deleted.
- Retention policies may be size based. Once a topic reaches a certain size, the oldest data is deleted.
- Retention policies may be both time- and size-based. Once either condition is reached, the oldest data is deleted.
- Alternatively, topics can be compacted, in which case there is no size or time limit for data in the topic.
- Compacted topics use the message key to identify messages uniquely. If a duplicate key is found, the latest value for that key is kept, and the old message is deleted (a minimal sketch of a compacted-topic configuration follows this list).
- Kafka topics can use compression algorithms to store data. This can reduce network overhead and save space on brokers. Supported compression algorithms include: lz4, zstd, snappy, and gzip.
- Kafka topics should store data for one type of event, not multiple types of events. Keeping multiple event types in one topic makes the topic hard to use for downstream consumers.
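As a quick illustration of the compaction and compression points above, here is a minimal sketch of a NewTopic definition for a compacted, lz4-compressed topic. The topic name and exact settings are hypothetical placeholders, not part of the original example; the full create-topic workflow with AdminClient follows below.

from confluent_kafka.admin import NewTopic

# Hypothetical compacted topic: the broker keeps only the latest message per key
# and stores the data lz4-compressed.
compacted_topic = NewTopic(
    topic="sk_compacted_example",            # placeholder name
    num_partitions=1,
    replication_factor=1,
    config={
        "cleanup.policy": "compact",         # compact instead of delete
        "compression.type": "lz4",
        "min.cleanable.dirty.ratio": "0.5",  # broker default; controls how eagerly the log is compacted
    },
)
# The object is then passed to AdminClient.create_topics([compacted_topic]),
# exactly as in the example below.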
Using AdminClient:
from confluent_kafka.admin import AdminClient, NewTopic

BROKER_URL = "PLAINTEXT://localhost:9092"

def topic_exists(client, topic_name):
    """Checks if the given topic exists"""
    topic_metadata = client.list_topics(timeout=5)
    return topic_name in set(t.topic for t in iter(topic_metadata.topics.values()))

def create_topic(client, topic_name):
    """Creates the topic with the given topic name"""
    futures = client.create_topics(
        [
            NewTopic(
                topic=topic_name,
                num_partitions=10,
                replication_factor=1,
                config={
                    "cleanup.policy": "delete",
                    "compression.type": "lz4",
                    "delete.retention.ms": "2000",
                    "file.delete.delay.ms": "2000",
                },
            )
        ]
    )

    for topic, future in futures.items():
        try:
            future.result()
            print("topic created")
        except Exception as e:
            print(f"failed to create topic {topic_name}: {e}")

def main():
    """Checks for topic and creates the topic if it does not exist"""
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    topic_name = "sk_admin_client"
    exists = topic_exists(client, topic_name)
    print(f"Topic {topic_name} exists: {exists}")

    if exists is False:
        create_topic(client, topic_name)
    else:
        print("Topic already exists")

if __name__ == "__main__":
    main()
Output:
(venv) (etl) shravan-producers$ python create_topics.py
Topic sk_admin_client exists: False
topic created
Verify that the topic was indeed created:
(venv) (etl) shravan-producers$ python list_topics.py
sk_admin_client
_schemas
connect-config
sk_rest_proxy1
_confluent-ksql-ksql_service_docker_command_topic
connect-status
my-first-faust-app-__assignor-__leader
connect-offset
sk_rest_proxy
__consumer_offsets
my-topic
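Delete a topic:
Since the goal of this post also covers deleting topics, here is a minimal sketch using AdminClient.delete_topics. The topic name simply reuses the one created above; note that the broker must have delete.topic.enable set to true (the default in recent Kafka versions) for the deletion to take effect.

from confluent_kafka.admin import AdminClient

BROKER_URL = "PLAINTEXT://localhost:9092"

def delete_topic(client, topic_name):
    """Deletes the given topic and waits for the operation to finish"""
    futures = client.delete_topics([topic_name], operation_timeout=30)
    for topic, future in futures.items():
        try:
            future.result()
            print(f"topic {topic} deleted")
        except Exception as e:
            print(f"failed to delete topic {topic}: {e}")

if __name__ == "__main__":
    client = AdminClient({"bootstrap.servers": BROKER_URL})
    delete_topic(client, "sk_admin_client")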
Method 2: Using Kafka CLI tools
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more --config options.
This example creates a topic named my-topic with a custom max message size and flush rate:
Create topics using the kafka-topics CLI
root@e77c276dd7af:/# kafka-topics --bootstrap-server kafka0:9092 --create --topic my-topic --partitions 1 \
> --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
root@e77c276dd7af:/# kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
_confluent-ksql-ksql_service_docker_command_topic
_schemas
connect-config
connect-offset
connect-status
my-first-faust-app-__assignor-__leader
my-topic
sk_rest_proxy
sk_rest_proxy1
root@e77c276dd7af:/#
Check topic configurations using the kafka-configs CLI
For the above topic here are the configurations:
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=64000,flush.messages=1
root@e77c276dd7af:/#
Another example:
root@e3ca30ea1358:/home/workspace# kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name sk_admin_client --describe
Configs for topic 'sk_admin_client' are delete.retention.ms=2000,compression.type=lz4,cleanup.policy=delete,file.delete.delay.ms=2000
root@e3ca30ea1358:/home/workspace#
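The same lookup can also be done programmatically. Below is a minimal sketch using AdminClient.describe_configs and ConfigResource from confluent_kafka; the broker URL reuses the value from the Python examples above (adjust it to your environment), and this is an illustrative sketch rather than part of the original CLI workflow.

from confluent_kafka.admin import AdminClient, ConfigResource

BROKER_URL = "PLAINTEXT://localhost:9092"

client = AdminClient({"bootstrap.servers": BROKER_URL})

# Ask the broker for the effective configuration of a single topic.
resource = ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
futures = client.describe_configs([resource])

for res, future in futures.items():
    configs = future.result()  # dict: config name -> ConfigEntry
    for entry in configs.values():
        print(f"{entry.name} = {entry.value}")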
Modify topic configuration using the kafka-configs CLI
Overrides can also be changed or set later using kafka-configs --alter. This example updates the max message size max.message.bytes from 64000 to 128000.
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=64000,flush.messages=1
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic \
> --alter --add-config max.message.bytes=128000
Completed Updating config for entity: topic 'my-topic'.
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name my-topic --describe
Configs for topic 'my-topic' are max.message.bytes=128000,flush.messages=1
root@e77c276dd7af:/#
Delete all messages from a Kafka topic
One way to delete all the messages from a Kafka topic is to set retention.ms=1000 (1 second). This removes all messages from the topic that are older than 1 second. Then, revert the configuration back to its default value.
kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --alter --add-config retention.ms=1000
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are retention.ms=1000
root@e77c276dd7af:/# kafka-console-consumer --bootstrap-server kafka0:9092 --topic com.kafka.producers.purchases --from-beginning
^CProcessed a total of 0 message
To remove an override you can do:
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are retention.ms=1000
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --alter --delete-config retention.ms
Completed Updating config for entity: topic 'com.kafka.producers.purchases'.
root@e77c276dd7af:/# kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name com.kafka.producers.purchases --describe
Configs for topic 'com.kafka.producers.purchases' are
Method 3: Use Kafka REST Proxy
Fetching Cluster Metadata from REST Proxy: With Kafka REST Proxy, you cannot create topics, but you can retrieve topic metadata.
I find this useful when I want to check the default values for my topics/brokers/partitions. It is not as immediately obvious how to obtain the defaults using the Kafka-provided CLI tools.
The relevant REST Proxy API endpoints are linked in the comments of the code below.
import json

import requests

REST_PROXY_URL = "http://localhost:8082"

def get_topics():
    """Gets topics from REST Proxy"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics
    resp = requests.get(f"{REST_PROXY_URL}/topics")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get topics {json.dumps(resp.json(), indent=2)}")
        return []

    print("Fetched topics from Kafka:")
    print(json.dumps(resp.json(), indent=2))
    return resp.json()

def get_topic(topic_name):
    """Gets specific details on a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get topic {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched topics from Kafka:")
    print(json.dumps(resp.json(), indent=2))

def get_brokers():
    """Gets broker information"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--brokers
    resp = requests.get(f"{REST_PROXY_URL}/brokers")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get brokers {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched brokers from Kafka:")
    print(json.dumps(resp.json(), indent=2))

def get_partitions(topic_name):
    """Prints partition information for a topic"""
    # https://docs.confluent.io/current/kafka-rest/api.html#get--topics-(string-topic_name)-partitions
    resp = requests.get(f"{REST_PROXY_URL}/topics/{topic_name}/partitions")

    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"Failed to get partitions {json.dumps(resp.json(), indent=2)}")
        return

    print("Fetched partitions from Kafka:")
    print(json.dumps(resp.json(), indent=2))

if __name__ == "__main__":
    topics = get_topics()
    get_topic(topics[0])
    get_brokers()
    get_partitions(topics[-1])
(venv) (etl) shravan-producers$ python rest-proxy-metadata.py
Fetched topics from Kafka:
[
"__confluent.support.metrics",
"_confluent-ksql-ksql_service_docker_command_topic",
"_schemas",
"connect-config",
"connect-offset",
"connect-status",
"my-first-faust-app-__assignor-__leader",
"my-topic",
"sk_admin_client",
"sk_rest_proxy",
"sk_rest_proxy1"
]
Fetched topics from Kafka:
{
"name": "__confluent.support.metrics",
"configs": {
"message.downconversion.enable": "true",
"file.delete.delay.ms": "60000",
"segment.ms": "604800000",
"min.compaction.lag.ms": "0",
"retention.bytes": "-1",
"segment.index.bytes": "10485760",
"cleanup.policy": "delete",
"follower.replication.throttled.replicas": "",
"message.timestamp.difference.max.ms": "9223372036854775807",
"segment.jitter.ms": "0",
"preallocate": "false",
"message.timestamp.type": "CreateTime",
"message.format.version": "2.2-IV1",
"segment.bytes": "1073741824",
"unclean.leader.election.enable": "false",
"max.message.bytes": "1000012",
"retention.ms": "31536000000",
"flush.ms": "9223372036854775807",
"delete.retention.ms": "86400000",
"leader.replication.throttled.replicas": "",
"min.insync.replicas": "1",
"flush.messages": "9223372036854775807",
"compression.type": "producer",
"index.interval.bytes": "4096",
"min.cleanable.dirty.ratio": "0.5"
},
"partitions": [
{
"partition": 0,
"leader": 0,
"replicas": [
{
"broker": 0,
"leader": true,
"in_sync": true
}
]
}
]
}
Fetched brokers from Kafka:
{
"brokers": [
0
]
}
Fetched partitions from Kafka:
[
{
"partition": 0,
"leader": 0,
"replicas": [
{
"broker": 0,
"leader": true,
"in_sync": true
}
]
}
]
(venv) (etl) shravan-producers$
Appendix: commonly used CLI help
kafka-topics CLI
root@e77c276dd7af:/# kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to. In case of providing this, a
direct Zookeeper connection won't be
required.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--config <String: name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs.It is
supported only in combination with --
create if --bootstrap-server option
is used.
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). Not supported with
the --bootstrap-server option.
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--exclude-internal exclude internal topics when running
list or describe command. The
internal topics will be listed by
default
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
Not supported with the --bootstrap-
server option.
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist. Not
supported with the --bootstrap-
server option.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option. Put topic name in double
quotes and use the '\' prefix to
escape regular expression symbols; e.
g. "test\.topic".
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <String: hosts> DEPRECATED, The connection string for
the zookeeper connection in the form
host:port. Multiple hosts can be
given to allow fail-over.
root@e77c276dd7af:/#
kafka-configs CLI
root@e77c276dd7af:/# kafka-configs --help
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
log.message.timestamp.type
ssl.client.auth
log.retention.ms
sasl.login.refresh.window.jitter
sasl.kerberos.ticket.renew.window.
factor
log.preallocate
log.index.size.max.bytes
sasl.login.refresh.window.factor
ssl.truststore.type
ssl.keymanager.algorithm
log.cleaner.io.buffer.load.factor
sasl.login.refresh.min.period.seconds
ssl.key.password
background.threads
log.retention.bytes
ssl.trustmanager.algorithm
log.segment.bytes
max.connections.per.ip.overrides
log.cleaner.delete.retention.ms
log.segment.delete.delay.ms
min.insync.replicas
ssl.keystore.location
ssl.cipher.suites
log.roll.jitter.ms
log.cleaner.backoff.ms
sasl.jaas.config
principal.builder.class
log.flush.interval.ms
log.cleaner.dedupe.buffer.size
log.flush.interval.messages
advertised.listeners
num.io.threads
listener.security.protocol.map
log.message.downconversion.enable
sasl.enabled.mechanisms
sasl.login.refresh.buffer.seconds
ssl.truststore.password
listeners
metric.reporters
ssl.protocol
sasl.kerberos.ticket.renew.jitter
ssl.keystore.password
sasl.mechanism.inter.broker.protocol
log.cleanup.policy
sasl.kerberos.principal.to.local.rules
sasl.kerberos.min.time.before.relogin
num.recovery.threads.per.data.dir
log.cleaner.io.max.bytes.per.second
log.roll.ms
ssl.endpoint.identification.algorithm
unclean.leader.election.enable
message.max.bytes
log.cleaner.threads
log.cleaner.io.buffer.size
max.connections.per.ip
sasl.kerberos.service.name
ssl.provider
follower.replication.throttled.rate
log.index.interval.bytes
log.cleaner.min.compaction.lag.ms
log.message.timestamp.difference.max.
ms
ssl.enabled.protocols
log.cleaner.min.cleanable.ratio
replica.alter.log.dirs.io.max.bytes.
per.second
ssl.keystore.type
ssl.secure.random.implementation
ssl.truststore.location
sasl.kerberos.kinit.cmd
leader.replication.throttled.rate
num.network.threads
compression.type
num.replica.fetchers
For entity-type 'users':
request_percentage
producer_byte_rate
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
For entity-type 'clients':
request_percentage
producer_byte_rate
consumer_byte_rate
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--alter Alter the configuration for the entity.
--bootstrap-server <String: server to The Kafka server to connect to. This
connect to> is required for describing and
altering broker configs.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers (applies to
corresponding entity type in command
line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker id)
--entity-type <String> Type of entity
(topics/clients/users/brokers)
--force Suppress console prompts
--help Print usage information.
--zookeeper <String: urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
root@e77c276dd7af:/#