Process a stream of data using Faust

5 minute read

Stream processing with Faust

In an earlier post I covered the fundamentals of stream processing applications; it's time to apply that knowledge in Python. Here, we are going to learn how to use the Python stream processing library Faust to rapidly create powerful stream processing applications. Specifically, I will focus on serialization and deserialization of data to and from topics, into and out of our faust-stream-processing-app.

Structure of a Faust App:

  • Every Faust application starts with an App instance, which represents the top-level Faust application.
  • The application must be assigned a topic to subscribe to.
  • An output Table or stream receives the output of the processing.
  • An asynchronous function decorated with an agent is registered as a callback, invoked by the application when data is received (see the sketch below).
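Putting these pieces together, a minimal Faust app might look like the following sketch (the app name, broker URL, and topic name are assumptions for illustration; an output Table is omitted for brevity):

import faust

# The App instantiates the top-level Faust application.
app = faust.App("hello-faust", broker="kafka://localhost:9092")

# The topic this application subscribes to.
greetings_topic = app.topic("com.example.greetings", value_type=str)

# The agent decorator registers this asynchronous function as a
# callback, invoked as data arrives on the topic.
@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)

if __name__ == "__main__":
    app.main()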

Serialization and Deserialization

Faust provides model types that plug into standard Python classes to make serialization and deserialization of topic data a simple exercise. Managing data schemas is an important part of working with streaming data, and thankfully Faust makes it easy. Here we will learn how to use Faust to pack and unpack data with very few lines of code.

Deserialization: Topic deserialization in Faust is simple and painless thanks to the API exposed through topic creation. In the example below, we simply provide a key type and a value type: the key type is primitive because it's a string, and the value type is complex because it is Purchase. These simple annotations let Faust automatically unpack the data from our Kafka streams into objects for our processor functions. Complex models must inherit from faust.Record; this ensures that Faust can correctly handle the data. Because our Purchase type inherits from faust.Record, we can use it as the key or the value type for a Faust topic, as shown below.
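Here is a sketch of that setup (the app name and broker URL are assumptions for illustration):

import faust

app = faust.App("purchase-stream", broker="kafka://localhost:9092")

# Complex models inherit from faust.Record.
class Purchase(faust.Record, validation=True, serializer='json'):
    username: str
    currency: str
    amount: int

# key_type is primitive (str); value_type is our complex Purchase model.
purchases_topic = app.topic(
    "com.kafka.producers.purchases",
    key_type=str,
    value_type=Purchase,
)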

In addition, we have specified validation=True. This tells Faust to verify that we received the correct type for each of the fields we described. In the example above, it verifies that username and currency are str and that amount is int; if somebody were to set username to an int, for example, that record would fail validation. The serializer='json' option tells Faust what data format to expect by default when reading from the Kafka topic. In this case, we specified that we expect to see JSON. Unfortunately, Faust does not support Avro.
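Continuing with the Purchase model above, a quick sketch of validation in action (the exact exception type may vary by Faust version, hence the broad handling):

# A correctly typed record constructs without complaint.
ok = Purchase(username="michaelrice", currency="HTG", amount=187797)

try:
    # username should be a str; with validation=True Faust should
    # reject this record at construction time.
    bad = Purchase(username=42, currency="HTG", amount=187797)
except Exception as exc:
    print(f"validation failed: {exc}")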

Serialization: One of the cool features of Faust is that a simple faust.Record definition handles both deserialization and serialization. What does this mean? Earlier, when we defined a Faust record for purchases, we were using it to deserialize data. We could take the same exact Purchase model and write data out to an output stream; in other words, serializing requires no changes to the model at all. The Faust library code manages the mapping between the fields we defined and the serialized representation, and vice versa.
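For example, we could reuse the same model to publish to an output topic. This sketch continues from the code above; the output topic name, the agent, and the switch to the binary|json codec chain are illustrative assumptions:

# The same model shape, now with two chained codecs.
class Purchase(faust.Record, validation=True, serializer='binary|json'):
    username: str
    currency: str
    amount: int

out_topic = app.topic(
    "com.kafka.producers.purchases.enhanced",
    key_type=str,
    value_type=Purchase,
)

@app.agent(purchases_topic)
async def forward_purchases(purchases):
    async for purchase in purchases:
        # Sending the model to the output topic serializes it
        # automatically; no extra code is needed.
        await out_topic.send(key=purchase.username, value=purchase)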

Notice the binary|json serializer in the example above. This means two codecs are applied when sending data to and from Kafka: when serializing, Faust encodes our model as JSON and then Base64-encodes it. Likewise, when deserializing, the Faust model runs this process in reverse: it Base64-decodes the data and then unpacks the JSON into our model. As you can see, the Faust Record API is simple yet powerful when it comes to serialization and deserialization.

Key Points:

  • Serialization in Faust leverages the same faust.Record that we saw in the deserialization section. Faust simply runs the deserialization steps in reverse to serialize data for the output stream.
  • Multiple serialization codecs may be specified for a given model
    • e.g., serializer="binary|json". This means: when serializing, encode to JSON, then Base64-encode the data.
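A quick way to see the codec chain at work is to round-trip a record by hand. dumps() and loads() are part of the Faust model API; note that the encoded payload may also carry a small amount of Faust metadata alongside our fields:

p = Purchase(username="shelby06", currency="TRY", amount=131264)

raw = p.dumps()                 # encode to JSON, then Base64-encode
restored = Purchase.loads(raw)  # Base64-decode, then JSON-decode

assert restored.amount == 131264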

Example of Stream Processing

As we have seen, Faust leverages the Record type to do both serialization and deserialization. We will now see an example of how to read data (deserialize) from a topic, process it using Faust, and store it (serialize) into another topic.

We start by ingesting Purchase data into the topic com.kafka.producers.purchases using this script: synch_producer.py. The producer script inserts 100 Purchase messages into the topic. The kafka-console-consumer output below confirms that all 100 messages landed in the topic.

root@19103899f8ed:/# kafka-console-consumer --bootstrap-server kafka0:9092 --topic com.kafka.producers.purchases --from-beginning
{"username": "michaelrice", "currency": "HTG", "amount": 187797}
{"username": "shelby06", "currency": "TRY", "amount": 131264}
{"username": "laurasmith", "currency": "SBD", "amount": 134460}
...
{"username": "christian49", "currency": "DKK", "amount": 17773}
{"username": "megantownsend", "currency": "PGK", "amount": 175666}
{"username": "stewartthomas", "currency": "LKR", "amount": 182153}

^CProcessed a total of 100 messages
root@19103899f8ed:/#
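The actual synch_producer.py is not reproduced here, but a producer along those lines could be sketched with the confluent-kafka and Faker libraries (both libraries, and the value ranges, are assumptions for illustration):

import json

from confluent_kafka import Producer
from faker import Faker

faker = Faker()
producer = Producer({"bootstrap.servers": "localhost:9092"})

# Insert 100 randomly generated Purchase messages into the topic.
for _ in range(100):
    purchase = {
        "username": faker.user_name(),
        "currency": faker.currency_code(),
        "amount": faker.random_int(min=100, max=200000),
    }
    producer.produce(
        "com.kafka.producers.purchases",
        value=json.dumps(purchase),
    )

producer.flush()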

Deserialize topic data into our model

Example showing how to deserialize: Let us first see how to read data from the topic, store it in our model object, and print the contents of the model back to the console. In other words, we are deserializing the topic data. In this case we are just writing the output to the console, but we could apply any processing we wish at this point. Here's faust-stream-example.py.
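A minimal sketch of what that script looks like (the logging details of the original may differ):

import json

import faust

app = faust.App("faust_stream_processor", broker="kafka://localhost:9092")

class Purchase(faust.Record, validation=True, serializer='json'):
    username: str
    currency: str
    amount: int

purchases_topic = app.topic(
    "com.kafka.producers.purchases",
    key_type=str,
    value_type=Purchase,
)

@app.agent(purchases_topic)
async def print_purchases(purchases):
    async for purchase in purchases:
        # Each message arrives already deserialized into a Purchase model.
        print(json.dumps(purchase.asdict(), indent=2))

if __name__ == "__main__":
    app.main()

Running it as a worker prints each deserialized record: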

(venv) (etl) shravan-producers$ python faust-stream-example.py worker
┌ƒaµS† v1.9.0─┬────────────────────────────────────────────────────────────────────────────────┐
│ id          │ faust_stream_processor                                                         │
│ transport   │ [URL('kafka://localhost:9092')]                                                │
│ store       │ memory:                                                                        │
│ web         │ http://localhost:6066/                                                         │
│ log         │ -stderr- (warn)                                                                │
│ pid         │ 86208                                                                          │
│ hostname    │ shravan-mbp.local                                                              │
│ platform    │ CPython 3.7.3 (Darwin x86_64)                                                  │
│        +    │ Cython (Clang 4.0.1 (tags/RELEASE_401/final))                                  │
│ drivers     │                                                                                │
│   transport │ aiokafka=1.1.3                                                                 │
│   web       │ aiohttp=3.6.2                                                                  │
│ datadir     │ /Users/shravan/projects/kafka-project/producers/faust_stream_processor-data   │
│ appdir      │ /Users/shravan/projects/kafka-project/producers/faust_stream_processor-data/v1│
└─────────────┴────────────────────────────────────────────────────────────────────────────────┘
starting➢ 😊
[2020-01-18 10:10:13,107] [86208] [WARNING] {
  "username": "hubervickie",
  "currency": "CUP",
  "amount": 31679
}
[2020-01-18 10:10:13,108] [86208] [WARNING] {
  "username": "stephanie06",
  "currency": "BBD",
  "amount": 140318
}
[2020-01-18 10:10:13,108] [86208] [WARNING] {
  "username": "michaelrice",
  "currency": "HTG",
  "amount": 187797
}
[2020-01-18 10:10:13,109] [86208] [WARNING] {
  "username": "sarah77",
  "currency": "SZL",
  "amount": 87159
}

Deserialize topic data into model -> process model data -> serialize data to a new topic

Example showing how to deserialize -> process -> serialize a stream of data: Now that we have some data in our topic com.kafka.producers.purchases, it is time to read it into our Purchase model object, convert the amount to a float, and store the result in a destination topic: com.kafka.producers.purchases.enhanced. Shown below are the contents of the source and destination topics as data flows through our stream processing app, faust-stream-processor-example.py. Note that the id faust_stream_processor2 is the application ID that we assign when defining our Faust application.
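A sketch of what faust-stream-processor-example.py could look like (EnhancedPurchase is an assumed model name, and the original script may differ in its details):

import faust

app = faust.App("faust_stream_processor2", broker="kafka://localhost:9092")

class Purchase(faust.Record, validation=True, serializer='json'):
    username: str
    currency: str
    amount: int

class EnhancedPurchase(faust.Record, validation=True, serializer='json'):
    username: str
    currency: str
    amount: float

purchases_topic = app.topic(
    "com.kafka.producers.purchases",
    key_type=str,
    value_type=Purchase,
)

enhanced_topic = app.topic(
    "com.kafka.producers.purchases.enhanced",
    key_type=str,
    value_type=EnhancedPurchase,
)

@app.agent(purchases_topic)
async def enhance_purchases(purchases):
    async for purchase in purchases:
        # Deserialize -> process (convert int amount to float) -> serialize.
        enhanced = EnhancedPurchase(
            username=purchase.username,
            currency=purchase.currency,
            amount=float(purchase.amount),
        )
        await enhanced_topic.send(key=purchase.username, value=enhanced)

if __name__ == "__main__":
    app.main()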

Input topic: com.kafka.producers.purchases (screenshot: faust-stream-input-topic)

Stream processing app: faust-stream-processor-example.py (screenshot: faust-stream-processing-app)

Output topic: com.kafka.producers.purchases.enhanced (screenshot: faust-stream-processor)

Conclusion

Faust's handling of records is one of the most fun parts of using this framework. As you have seen, you can define a model once and use it for serialization, deserialization, and type validation. The faust.Record type provides a powerful abstraction that makes packing and unpacking streaming data remarkably simple.

Useful Links:

  • Faust documentation: https://faust.readthedocs.io/
  • Faust source code: https://github.com/robinhood/faust