Kafka: Data Schemas, Apache Avro and Schema Registry

17 minute read

Role of data schemas, Apache Avro and Schema Registry in Kafka

In this post we will learn how data schemas help make consumers and producers more resilient to change. We’ll focus on Apache Avro and see how it fits into the Kafka ecosystem through tools like Schema Registry.

Why are data schemas a critical part of a real-world stream processing application?

Commonly, people send payloads to a streaming data store like Kafka using either string or JSON formats. As data flows through our stream processing applications, it’s critical for both consumers and producers to understand what data is expected.

What are Data Schemas?

  • Data schemas help us define:
    • The shape of the data
    • The names of fields
    • The expected types of values
    • Whether certain data fields are optional or required.
  • Data schemas provide expectations for applications so that they can properly ingest or produce data that matches that specification
  • Data schemas are used for communication between software
  • Data schemas can help us create more efficient representations with compression
  • Data schemas help systems develop independently of each other
  • Data schemas are critical in data systems and applications today
    • gRPC in Kubernetes
    • Apache Avro in the Hadoop Ecosystem

Where are Data Schemas used? Look at the following CREATE TABLE statement in Postgres. We are essentially telling the database what data to expect, what shape it will take, and what types of values are accepted for each column. This is a very common example of schema usage.

CREATE TABLE store_location(
    id INTEGER,
    name VARCHAR(80),
    city VARCHAR(40),
    latitude NUMERIC(10),
    longitude NUMERIC(10)
);

Outside of SQL databases, if you’ve ever worked with tools such as Hadoop, Hive, or Presto, you’ve likely seen Apache Avro used to describe data. For example, Presto can talk to many other data sources: some are traditional databases like Postgres, and others are cloud-based object stores like Amazon S3. For Presto to allow users to run queries across these very different data sources, it has to have some idea of what the data looks like. Data engineers can write Avro schemas to tell Presto what to expect when it fetches data from sources like Amazon S3. If the data is stored in Avro format, Presto will know how to load the data and what data types to expect.

As you can see, Hive talks to a data store that holds not only the data but also an Avro schema to go along with it. When data is returned to Hive, it receives the schema along with the data and can then transform that data into a usable payload.

Example scenario to understand the need for data schemas

Pretend that we are working together on a data engineering project. On this project, we are consuming data from a production web service through Kafka, and that web service has no formally defined schema. Shown below is a system that was released with no schema. All goes well at first: the producer creates data and sends it through Kafka, and the consumer consumes it. But suddenly, we start seeing errors in our consumer application logs. These errors show that our Kafka consumer is failing to deserialize the received data into our application. What happened?

[Figure: data-schemas]

We message our partners on the Web Server team to see if anything has changed. After looking into their code, they say that, yes, they restructured their data model because of a requirements change. Now there’s nothing for us to do but drop everything and fix our code to handle this change. This is a costly endeavor. So what could we have done differently?

We could have worked with our colleagues on the Web Server team and asked them to define a schema; then we would have had a more formalized agreement as to what the data should look like. This schema would have been updated when the data changed, helping our application stay resilient to the update.

Some schema definition languages can even indicate backward, forward, and fully compatible changes. In the above figure, we can see that the update that broke our consumer occurred at message 3002.

Streaming applications are highly dependent on data schemas:

Schemas are widely used in data streaming applications to codify the data being produced and received. Because of the real-time nature of streaming applications, and the fact that these systems consume data from other applications that may not even be aware the streaming application exists, schemas are absolutely critical for maintaining a functioning streaming application.

Schemas allow data producers to evolve largely independently of the consumers - namely, streaming applications. In many organizations, the data engineers building a streaming application may never interact with the developers building the producer applications. Application developers may not be aware of, or interested in, the needs of the data engineering streaming applications. Given this dynamic, both data producers and the consuming streaming applications must be able to evolve independently without failure.

Schemas provide the tool that stream processing applications need to evolve independently of their upstream applications. Schemas can help shield data consumers from unexpected changes in the system. Some schema systems even provide detailed information about data compatibility to help applications understand what they can or cannot do with a given piece of data as it arrives.

Introduction to Avro

Apache Avro is a widely used data schema system in the data engineering space, and especially in the Apache Kafka ecosystem. In this section, we’ll review key concepts as they relate to Avro and Stream Processing.

Up until now, we have made use of JSON as our schema format. While JSON gets us part of the way there in terms of defining a schema, it has significant drawbacks:

  • First, JSON doesn’t tell us the type of each field.
  • Second, it doesn’t tell us whether or not a key should always be present.

This is where Apache Avro comes in.

[Figure: avro-binary]

Apache Avro is a data serialization system that uses a binary data format. Meaning, when data in your application is shared in the Avro format, it is encoded into a compact binary representation before it travels over the network. This binary format improves speed over the network and can help reduce storage overhead. Avro’s binary format includes not just your application data in the schema format you define, but also the schema definition itself.

When clients receive data from your application in Avro format, they not only receive the actual data, but they also get the Avro instructions or the schema for how to deserialize the data from binary into their own application model representation.
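
As a quick illustration of the schema traveling with the data, here is a small sketch using fastavro (the library used later in this post). The purchase record is made up for this example; the point is that after serializing to the Avro container format, a reader can recover both the records and the writer's schema from the bytes alone.

import io
from fastavro import parse_schema, reader, writer

# A hypothetical schema, purely for illustration
schema = parse_schema(
    {
        "type": "record",
        "name": "purchase",
        "fields": [
            {"name": "username", "type": "string"},
            {"name": "amount", "type": "int"},
        ],
    }
)

# Serialize: the Avro container format stores the schema alongside the data
buffer = io.BytesIO()
writer(buffer, schema, [{"username": "kafka_fan", "amount": 100}])

# Deserialize: the reader recovers the embedded schema and the records
buffer.seek(0)
avro_reader = reader(buffer)
print(avro_reader.writer_schema)  # the schema shipped with the data
for record in avro_reader:
    print(record)                 # {'username': 'kafka_fan', 'amount': 100}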

How are Avro Schemas defined?

Every Avro schema definition has a handful of required fields (not to be confused with the fields field). All Avro records have fields that define the expected data keys and types. The first required field is the name field - this identifies the name of the schema. You can also optionally specify a namespace for your record; this isn’t required, but it is good practice. Next, all schemas must have a type field. Shown below is an example of a record. The top-most entry in our Avro JSON is always of type record. A record is a collection of fields, each of which may or may not have a value.

[Figure: avro-record]

Key points:

  • Apache Avro records are defined in JSON.
  • Avro records include a required name, such as “user”
  • Avro records must include a type defined as record
  • Avro records may optionally include a namespace, such as “com.udacity”
  • Avro records are required to include an array of fields that define the names of the expected fields and their associated type. Such as "fields": [{"name": "age", "type": "int"}]
  • Avro can support optional fields by specifying the field type as a union of null and some other type, such as "fields": [{"name": "age", "type": ["null", "int"]}] - see the example after this list
  • Avro records are made up of complex and primitive types
    • Complex types are other records, arrays, maps, and others
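
Putting these points together, a complete (hypothetical) user record schema might look like the sketch below; fastavro's parse_schema, which is introduced in the next section, is used here only to confirm the definition is valid.

from fastavro import parse_schema

# A hypothetical "user" record illustrating the points above: a required name,
# a type of "record", an optional namespace, and a fields array in which "age"
# is optional because its type is a union with null.
user_schema = parse_schema(
    {
        "type": "record",
        "name": "user",
        "namespace": "com.udacity",
        "fields": [
            {"name": "username", "type": "string"},
            {"name": "age", "type": ["null", "int"], "default": None},
        ],
    }
)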

Defining Avro schemas using fastavro

Here we will take a look at how to define an Avro schema and then send data to Kafka in that format. The schema is defined as part of the ClickEvent class and has the following form:

# Define an Avro Schema for the ClickEvent object
schema = parse_schema(
    {
        "type": "record",
        "name": "click_event",
        "namespace": "com.udacity.lesson3.exercise2",
        "fields": [
            {"name": "email", "type": "string"},
            {"name": "timestamp", "type": "string"},
            {"name": "uri", "type": "string"},
            {"name": "number", "type": "int"},
        ],
    }
)

The parse_schema function comes from the fastavro module and returns a parsed Avro schema. It is not strictly necessary to call parse_schema, but doing so and saving the parsed schema for later use makes future operations faster, since the schema does not need to be reparsed.

A simple, self-contained example (the weather schema and records here are placeholders for illustration):

from fastavro import parse_schema, writer

# Placeholder schema and records, for illustration only
original_schema = {
    "type": "record", "name": "weather",
    "fields": [{"name": "station", "type": "string"}, {"name": "temp", "type": "int"}],
}
records = [{"station": "011990-99999", "temp": 22}]

parsed_schema = parse_schema(original_schema)
with open("weather.avro", "wb") as out:
    writer(out, parsed_schema, records)

Apache Avro - Summary: Here we learned how to use Apache Avro as a Data Schema:

  • Avro has primitive types, such as int, string, and float
  • Avro has complex types, such as record, map, and array
  • Avro data is sent alongside the schema definition, so that downstream consumers can make use of it
  • Avro is used widely in data engineering and the Kafka ecosystem

Apache Avro and Kafka

As the Kafka development team began to tackle the problem of schema evolution between producers and consumers in the ecosystem, they knew they needed to identify a schema technology to work with. Apache Avro was identified early in the development of Kafka, and its prevalence and tooling have grown ever since. Although Avro is not required to use Kafka - you can in fact use any other schema format you like - Avro is used extensively in the Kafka ecosystem, and using it will drastically improve your experience.

When using Avro with Apache Kafka, the producer must define an Avro schema for messages they would like to produce to Kafka. Many client libraries have built-in support for Avro. That means that all you have to do is define your schema, and pass it along to special Avro produce functions. In this case, the library will simply handle encoding the data for you.

[Figure: avro-schemas]

When you use the Confluent Kafka Python library’s special Avro consumer, it will automatically unpack the Avro data it receives from Kafka, using the Avro schema that was packaged alongside it. This functionality is extremely convenient, and a perfect example of why it is beneficial to use Avro when you work with Kafka. The ecosystem and its libraries supply you with significant benefits for very little effort. While it does take a little more effort to define your schema in Avro versus JSON or plain text, you will be grateful to have spent that effort when your application tolerates downstream data changes more gracefully.
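
As a rough sketch (not the exact course code), producing with the Confluent Python client's Avro support looks something like the following. The topic name and the two-field schema are made up for illustration, and note that the library's Avro helpers rely on Schema Registry, which is introduced in the next section.

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Assumed addresses for a local setup
BROKER_URL = "PLAINTEXT://localhost:9092"
SCHEMA_REGISTRY_URL = "http://localhost:8081"

# A hypothetical two-field value schema
value_schema = avro.loads(
    """{"type": "record", "name": "purchase",
        "fields": [{"name": "username", "type": "string"},
                   {"name": "amount", "type": "int"}]}"""
)

producer = AvroProducer(
    {"bootstrap.servers": BROKER_URL, "schema.registry.url": SCHEMA_REGISTRY_URL},
    default_value_schema=value_schema,
)
# The library Avro-encodes the dict according to the schema before sending it
producer.produce(topic="com.example.purchases", value={"username": "kafka_fan", "amount": 100})
producer.flush()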

Schema Registry

Up until now, we have manually sent schemas alongside every message we sent through Kafka. Here, we will learn how we can use a tool built at Confluent, called Schema Registry, to streamline this process and better manage our schemas.

Confluent Schema Registry is an open-source tool that provides centralized Avro Schema storage.

Understand the need for a Schema Registry

Sending our Avro schema definition alongside every message introduces additional network and storage overhead in our producer and consumer applications. Not only that, but it adds work for the consumer and producer to correctly serialize and deserialize the Avro data. Schema Registry is a tool built by Confluent and deployed alongside Apache Kafka that can help reduce some of the overhead involved with using Avro. If Schema Registry is in use in your cluster, you no longer need to send schemas alongside your payloads to Kafka. Instead, your Kafka client can be configured to send the schema to Schema Registry over HTTP.

Schema Registry assigns the schema a version number and then stores it in a private topic. Until the schema definition changes again, the producer never needs to send the schema to either Schema Registry or the Kafka brokers. When consumers then consume data from the Kafka topic, the Kafka client library automatically retrieves the Avro schema for the message from Schema Registry.

[Figure: schema-registry]

Schema Registry can serve historical schemas as well, so all data stored in the Kafka topic can be deserialized by clients. Once a client has retrieved a specific version of a schema, it does not need to retrieve it from Schema Registry again. To emphasize the point: when using Schema Registry, consumers and producers only fetch or register a schema when they don’t already have it. Once they fetch a schema version, it is never fetched again. This can dramatically decrease networking overhead for high-throughput applications. It is worth mentioning that Schema Registry does not support deletes by default. You can enable this behavior, but most Schema Registry deployments don’t have it enabled.

Similar to Avro, the Confluent Kafka Python client library has native support for Schema Registry. If your client library has no native support, you can still use Schema Registry’s HTTP REST API. Lastly, Schema Registry can be used by any application that wants to efficiently store and retrieve data across multiple schema versions.

Architecture of Schema Registry: First, Schema Registry is another Java process. It does not use a traditional database to store state; instead, it utilizes Kafka itself, storing data in an internal schemas topic. Schema Registry simply exposes an HTTP web server with a REST API for managing your schemas. Whenever you use a client library that integrates with Schema Registry, it is using the Schema Registry REST API under the hood. You can run more than one Schema Registry node at a time to form a Schema Registry cluster. Finally, similar to how Kafka itself uses ZooKeeper, Schema Registry also uses ZooKeeper to choose a single web-server leader in a given cluster of Schema Registry nodes.
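
Since Schema Registry is just a web server with a REST API, you can also talk to it directly. Here is a small sketch using the requests library (an assumption here, not part of the course code), pointed at a registry on localhost:8081; the subject name follows the default <topic>-value naming convention and is assumed to already exist.

import json
import requests

SCHEMA_REGISTRY_URL = "http://localhost:8081"

# List all registered subjects (by default, one per topic key/value schema)
print(requests.get(f"{SCHEMA_REGISTRY_URL}/subjects").json())

# Fetch the latest registered schema version for one subject
subject = "com.udacity.lesson3.solution4.clicks-value"
latest = requests.get(f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions/latest").json()
print(latest["version"], json.loads(latest["schema"]))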

Below is a complete example (schema_registry_example.py) that produces and consumes ClickEvent data using AvroProducer, AvroConsumer, and Schema Registry:

import asyncio
from dataclasses import asdict, dataclass, field
import json
import random

from confluent_kafka import avro, Consumer, Producer
from confluent_kafka.avro import AvroConsumer, AvroProducer, CachedSchemaRegistryClient
from faker import Faker


faker = Faker()

SCHEMA_REGISTRY_URL = "http://localhost:8081"
BROKER_URL = "PLAINTEXT://localhost:9092"


@dataclass
class ClickAttribute:
    element: str = field(default_factory=lambda: random.choice(["div", "a", "button"]))
    content: str = field(default_factory=faker.bs)

    @classmethod
    def attributes(cls):
        return {faker.uri_page(): ClickAttribute() for _ in range(random.randint(1, 5))}


@dataclass
class ClickEvent:
    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)
    number: int = field(default_factory=lambda: random.randint(0, 999))
    attributes: dict = field(default_factory=ClickAttribute.attributes)

    #
    # Load the schema using the Confluent avro loader
    # See: https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/avro/load.py#L23
    #
    schema = avro.loads(
        """{
        "type": "record",
        "name": "click_event",
        "namespace": "com.udacity.lesson3.solution4",
        "fields": [
            {"name": "email", "type": "string"},
            {"name": "timestamp", "type": "string"},
            {"name": "uri", "type": "string"},
            {"name": "number", "type": "int"},
            {
                "name": "attributes",
                "type": {
                    "type": "map",
                    "values": {
                        "type": "record",
                        "name": "attribute",
                        "fields": [
                            {"name": "element", "type": "string"},
                            {"name": "content", "type": "string"}
                        ]
                    }
                }
            }
        ]
    }"""
    )


async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    #
    #       Create a CachedSchemaRegistryClient. Use SCHEMA_REGISTRY_URL.
    # See: https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/avro/cached_schema_registry_client.py#L47
    #
    schema_registry = CachedSchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

    #
    # Create an AvroProducer and point it at the Schema Registry client.
    # See: https://docs.confluent.io/current/clients/confluent-kafka-python/index.html?highlight=loads#confluent_kafka.avro.AvroProducer
    #
    p = AvroProducer({"bootstrap.servers": BROKER_URL}, schema_registry=schema_registry)
    while True:
        #
        # Produce with the AvroProducer, specifying the value schema.
        # The ClickEvent dataclass is serialized with `asdict(ClickEvent())`.
        # See: https://docs.confluent.io/current/clients/confluent-kafka-python/index.html?highlight=loads#confluent_kafka.avro.AvroProducer
        #
        p.produce(
            topic=topic_name, value=asdict(ClickEvent()), value_schema=ClickEvent.schema
        )
        await asyncio.sleep(1.0)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    #
    #     Create a CachedSchemaRegistryClient
    #
    schema_registry = CachedSchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

    #
    # Create an AvroConsumer configured with the Schema Registry client
    #
    c = AvroConsumer(
        {"bootstrap.servers": BROKER_URL, "group.id": "0"},
        schema_registry=schema_registry,
    )
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            try:
                print(message.value())
            except KeyError as e:
                print(f"Failed to unpack message {e}")
        await asyncio.sleep(1.0)


def main():
    """Checks for topic and creates the topic if it does not exist"""
    try:
        asyncio.run(produce_consume("com.udacity.lesson3.solution4.clicks"))
    except KeyboardInterrupt as e:
        print("shutting down")


async def produce_consume(topic_name):
    """Runs the Producer and Consumer tasks"""
    t1 = asyncio.create_task(produce(topic_name))
    t2 = asyncio.create_task(consume(topic_name))
    await t1
    await t2


if __name__ == "__main__":
    main()

Output:

root@4c851fe877c5:/home/workspace# python schema_registry_example.py
no message received by consumer
no message received by consumer
no message received by consumer
{'email': 'wagnercarol@johnson-singh.com', 'timestamp': '1993-09-08T01:04:51', 'uri': 'https://www.wright.com/', 'number': 209, 'attributes': {'terms': {'element': 'a', 'content': 'scale dot-com portals'}, 'register': {'element': 'div', 'content': 'embrace visionary networks'}, 'main': {'element': 'button', 'content': 'syndicate B2B users'}, 'faq': {'element': 'button', 'content': 'e-enable bleeding-edge deliverables'}, 'privacy': {'element': 'div', 'content': 're-contextualize integrated technologies'}}}
{'email': 'nmcgee@gmail.com', 'timestamp': '1999-09-09T11:37:11', 'uri': 'http://www.gomez.net/search/', 'number': 949, 'attributes': {'login': {'element': 'div', 'content': 'engage granular content'}, 'register': {'element': 'div', 'content': 'unleash dot-com mindshare'}, 'terms': {'element': 'button', 'content': 'transition front-end e-services'}, 'home': {'element': 'button', 'content': 'architect holistic bandwidth'}}}
{'email': 'gmcdonald@yahoo.com', 'timestamp': '2004-12-19T08:43:47', 'uri': 'http://www.kirby-lopez.com/categories/main/terms.html', 'number': 691, 'attributes': {'terms': {'element': 'div', 'content': 'engineer cutting-edge bandwidth'}, 'search': {'element': 'a', 'content': 'optimize seamless info-mediaries'}}}
{'email': 'cody55@cruz.com', 'timestamp': '1974-08-10T12:15:50', 'uri': 'http://www.fowler.com/blog/blog/tags/privacy.php', 'number': 97, 'attributes': {'search': {'element': 'a', 'content': 'evolve cutting-edge models'}, 'index': {'element': 'div', 'content': 'leverage web-enabled portals'}, 'register': {'element': 'button', 'content': 'streamline seamless synergies'}}}
{'email': 'edwardmoore@yahoo.com', 'timestamp': '2002-10-20T10:44:57', 'uri': 'http://glenn-smith.info/', 'number': 109, 'attributes': {'home': {'element': 'div', 'content': 'streamline bricks-and-clicks platforms'}, 'homepage': {'element': 'a', 'content': 'visualize frictionless solutions'}}}
^Cshutting down
root@4c851fe877c5:/home/workspace#

Schema Registry - Summary

  • Provides an HTTP REST API for managing Avro schemas
  • Many Kafka clients natively support Schema Registry interactions for you
  • Reduces network overhead, allowing producers and consumers to register schemas one time
  • Simplifies using Avro, reducing the barrier to entry for developers
  • Uses a Kafka topic to store state
  • Deployed as one or more web servers, with one leader
  • Uses ZooKeeper to manage elections

Schema Evolution and Compatibility

Schemas change over time with new requirements. Here, we will see how Avro and Schema Registry can aid in the process of Schema Evolution.

Data in software systems rarely remains static. As requirements change, so too will our schemas. This process of schema change is known as Schema Evolution. Understanding how schemas evolve and how that evolution will impact our software is an important part of managing streaming data. Schema Registry provides explicit compatibility types to help downstream consumers understand whether new data is compatible as the schema evolves.

Schema Evolution is the process of changing the schema of a given dataset. Modifying, adding, or removing a field are all forms of schema evolution. In practical terms, this means a Kafka producer has modified the shape of the data as well as the data schema it intends to send. For example, adding a field or removing a field are both types of schema evolution. Even making a field optional, or changing the type or meaning of an existing field, is considered a form of schema evolution.

In practice, evolving a schema simply means updating the Avro definition and resubmitting the schema to Schema Registry with some compatibility information.

When we send an updated schema to Schema Registry, we also send along a compatibility setting. This compatibility setting indicates to Schema Registry what downstream consumers can expect when they start to consume the new data. What’s great about this is that as downstream consumers fetch data from Kafka, the client code also fetches the updated schema from Schema Registry.
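
For example, each subject’s compatibility setting can be inspected or changed through Schema Registry’s config endpoints. Here is a small sketch with the requests library, assuming a registry at localhost:8081; the subject name follows the default <topic>-value convention and is an assumption here.

import requests

SCHEMA_REGISTRY_URL = "http://localhost:8081"
subject = "com.udacity.lesson3.solution4.clicks-value"

# Read the cluster-wide default compatibility level
print(requests.get(f"{SCHEMA_REGISTRY_URL}/config").json())

# Require that future versions of this subject be fully compatible
# (valid values include BACKWARD, FORWARD, FULL, and NONE)
requests.put(f"{SCHEMA_REGISTRY_URL}/config/{subject}", json={"compatibility": "FULL"})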

Glossary of key terms

  • Data Schema - Defines the shape of a particular kind of data. Specifically, data schemas define the expected fields, their names, and value types for those fields. Data schemas may also indicate whether fields are required or optional.

  • Apache Avro - A data serialization framework which includes facilities for defining and communicating data schemas. Avro is widely used in the Kafka ecosystem and data engineering generally.

  • Record (Avro) - A single encoded record in the defined Avro format

  • Primitive Type (Avro) - In Avro, a primitive type is a type which requires no additional specification. They are: null, boolean, int, long, float, double, bytes, string.

  • Complex Type (Avro) - In Avro, a complex type models data structures which may involve nesting or other advanced functionality: records, enums, maps, arrays, unions, fixed.

  • Schema Evolution - The process of modifying an existing schema with new, deleted, or modified fields.

  • Schema Compatibility - Determines whether or not two given versions of a schema are usable by a given client.

  • Backward Compatibility - means that consumer code developed against the most recent version of an Avro schema can consume data produced using the prior version of the schema without modification (see the example after this glossary).

  • Forward Compatibility - means that consumer code developed against the previous version of an Avro schema can consume data produced using the newest version of the schema without modification.

  • Full Compatibility - means that consumers developed against the latest schema can consume data using the previous schema, and that consumers developed against the previous schema can consume data from the latest schema as well. In other words, full compatibility means that a schema change is both forward and backward compatible.

  • None Compatibility - disables compatibility checking by Schema Registry.
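
To make these compatibility types concrete, here is a sketch of a backward-compatible evolution of the click_event schema used earlier in this post. Version 2 adds a hypothetical session_id field with a default value, so consumers built against version 2 can still read data written with version 1; the missing field is simply filled in from the default.

# Version 1: the click_event schema used earlier in this post (attributes omitted)
click_event_v1 = {
    "type": "record",
    "name": "click_event",
    "fields": [
        {"name": "email", "type": "string"},
        {"name": "timestamp", "type": "string"},
        {"name": "uri", "type": "string"},
        {"name": "number", "type": "int"},
    ],
}

# Version 2 adds a hypothetical "session_id" field with a default.
# Because the new field has a default, a consumer using v2 can still decode
# records written with v1 -- a backward-compatible change.
click_event_v2 = {
    "type": "record",
    "name": "click_event",
    "fields": click_event_v1["fields"] + [
        {"name": "session_id", "type": ["null", "string"], "default": None},
    ],
}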