Filter and enrich a stream of data using Faust

7 minute read

Creating Streams with Faust

The goal of this post is to show how to build a stream processing application that filters and enriches messages: in essence, applying simple transformations to incoming messages and storing the results in a new topic or an intermediate stream.

Faust can be used to create a stream: an infinite series of ordered, immutable events. Creating a stream with Faust is as simple as:

  1. defining a Faust topic
  2. decorating a processing function with @app.agent.

In the example below, we first create a subscription to the topic com.kafka.producers.purchases, then define an async function called purchase_event decorated with @app.agent(purchase_topic), which subscribes it to our source topic. Within this function we have an infinite async iterable, purchase_events, which receives data from the stream as it is produced; the stream is the argument passed to our agent processing function purchase_event(). Once we have a message, we can transform it, print it (as we do here), or apply any other processing we are interested in.

Anatomy of a stream processing faust app:

from dataclasses import asdict, dataclass
import json
import faust

# Source Topic data
# Our model class representing data in the topic
@dataclass
class Purchase(faust.Record):
    username: str
    currency: str
    amount: int

# Every Faust application has an App which instantiates
# the top-level Faust Application
app = faust.App("faust_stream_processor", broker="kafka://localhost:9092")

# The application must be assigned a topic to subscribe to.
purchase_topic = app.topic("com.kafka.producers.purchases", value_type=Purchase)

# An asynchronous function is decorated with an agent which registers
# the function as a callback for the application when data is received.
@app.agent(purchase_topic)
async def purchase_event(purchase_events):
    async for purchase in purchase_events:
        print(json.dumps(asdict(purchase), indent=2))

if __name__ == "__main__":
    app.main()

Producing data to an output stream is as simple as declaring another topic. For example, as shown in an earlier post, we can convert the amount field to a float and send the transformed record to a destination topic: com.kafka.producers.purchases.enhanced.
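A minimal sketch of that pattern, reusing the app, Purchase model, and purchase_topic defined above (the EnhancedPurchase model and the enhance_purchase agent are illustrative names; the earlier post may implement the conversion differently):

# Destination model where amount is a float (illustrative)
class EnhancedPurchase(faust.Record):
    username: str
    currency: str
    amount: float

# Declaring a second topic gives us an output stream to write to
enhanced_purchase_topic = app.topic(
    "com.kafka.producers.purchases.enhanced", value_type=EnhancedPurchase
)

@app.agent(purchase_topic)
async def enhance_purchase(purchase_events):
    async for purchase in purchase_events:
        enhanced = EnhancedPurchase(
            username=purchase.username,
            currency=purchase.currency,
            amount=float(purchase.amount),  # int -> float
        )
        # Forward the transformed record to the destination topic
        await enhanced_purchase_topic.send(value=enhanced)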

Filter a stream of data

Example of a stream processing application that filters data:

In this example, we are going to filter (keep) only purchases with an amount greater than 100,000 in a currency other than USD. In other words, we are only going to look at large purchases in international currencies. To set up this example, I first produced 100 messages to com.kafka.producers.purchases using this synch_producer.py script. Shown below is the output of the first few messages from this topic.

root@19103899f8ed:~# kafka-console-consumer --bootstrap-server kafka0:9092 --topic com.kafka.producers.purchases --from-beginning
{"username": "michaelrice", "currency": "HTG", "amount": 187797}
{"username": "shelby06", "currency": "TRY", "amount": 131264}
{"username": "laurasmith", "currency": "SBD", "amount": 134460}
{"username": "angela32", "currency": "LKR", "amount": 197632}
{"username": "richard73", "currency": "KZT", "amount": 10344}
{"username": "lauren80", "currency": "ARS", "amount": 163854}

We want to filter out (remove) records like this: {"username": "ijones", "currency": "USD", "amount": 191066}, where the currency is USD, and store the filtered stream of data in another topic: com.kafka.producers.purchases.filtered. Note: a message only passes through if it meets both keep conditions (non-USD currency and amount greater than 100,000).

We start our stream processing application, faust-stream-filtering-example.py, which contains the stream application with ID faust_stream_filtering and an infinite async iterable, purchase_events. Notice how we filter this iterable using async for purchase in purchase_events.filter(lambda x: x.currency != 'USD' and x.amount > 100000): and then send the filtered data to the destination topic: com.kafka.producers.purchases.filtered.

[Screenshot: faust-stream-filtering-example.py]
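For reference, here is a minimal sketch of what this filtering application might look like (the agent and topic variable names are illustrative; the topic names and filter condition match the description above):

import faust

class Purchase(faust.Record):
    username: str
    currency: str
    amount: int

app = faust.App("faust_stream_filtering", broker="kafka://localhost:9092")

purchase_topic = app.topic("com.kafka.producers.purchases", value_type=Purchase)
filtered_purchase_topic = app.topic(
    "com.kafka.producers.purchases.filtered", value_type=Purchase
)

@app.agent(purchase_topic)
async def filter_purchases(purchase_events):
    # Keep only non-USD purchases with an amount greater than 100,000
    async for purchase in purchase_events.filter(
        lambda x: x.currency != "USD" and x.amount > 100000
    ):
        await filtered_purchase_topic.send(value=purchase)

if __name__ == "__main__":
    app.main()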

Shown below are the messages inside the destination topic, which now contains 54 messages that met both criteria: currency != 'USD' and amount > 100000.

[Screenshot: messages in com.kafka.producers.purchases.filtered]

Faust Stream Processors and Operations

Once you start building out stream applications, you will typically find yourself performing similar operations over and over. Faust provides a mechanism by which you can define stream processing functions and then chain them together automatically as data arrives in your application.

Once you have defined one of these functions, you can reuse it on any other new or existing streams. The one caveat is that the other streams must use the same model and expect the same output as the function you defined. Defining a processor function is simple: it accepts the input stream model and returns the output stream model. These functions can be synchronous or asynchronous; either way, all processors complete before the output value is generated.

Faust defines a number of general purpose operations for streams:

  • group_by: The group_by operation takes an input stream and redirects it to an intermediate stream, with the group_by key as the new key on that intermediate stream. For example, our purchase stream has values but no key in Kafka; if we group_by currency, currency becomes the key in Kafka. This is also known as re-partitioning (or co-partitioning, if we are going to do a join). In essence, the group_by operation ingests every incoming event from a source topic and emits it to an intermediate topic with the newly specified key.
  • filter: The filter operation uses a boolean function to determine whether a particular record should be kept or discarded. Any records that are kept are written to a new intermediate stream.
  • take: The take operation bundles groups of events before invoking another iteration of the stream. Be careful to specify the within datetime.timedelta argument to this function, otherwise your program may hang. A short sketch of group_by and take follows this list.
  • Faust provides a number of other operations that you may use when working with your streams.
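Below is a minimal sketch of how group_by and take might be used, assuming the Purchase model and source topic from earlier (the app ID, agent names, and the batch size of 10 are illustrative):

from datetime import timedelta

import faust

class Purchase(faust.Record):
    username: str
    currency: str
    amount: int

app = faust.App("faust_stream_operations", broker="kafka://localhost:9092")
purchase_topic = app.topic("com.kafka.producers.purchases", value_type=Purchase)

# group_by: repartition the stream so that currency becomes the Kafka key.
# Faust emits every event to an intermediate repartition topic with the new key.
@app.agent(purchase_topic)
async def purchases_by_currency(purchase_events):
    async for purchase in purchase_events.group_by(Purchase.currency):
        print(f"{purchase.currency}: {purchase.amount}")

# take: bundle up to 10 events per iteration; `within` puts an upper bound on
# how long we wait for a full batch, so the loop does not hang on slow traffic.
@app.agent(purchase_topic)
async def purchase_batches(purchase_events):
    async for batch in purchase_events.take(10, within=timedelta(seconds=30)):
        print(f"received a batch of {len(batch)} purchases")

if __name__ == "__main__":
    app.main()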

Enrich a stream by adding a new field to each message

Example of using general purpose operators on streams:

Now, let’s have a look at how we can apply processors and operations to our streams. In this example, we are going to enrich our incoming stream by adding a fraud score to all incoming purchases. To do this, we define a function add_fraud_score() which, for the sake of demonstration, generates a random number between 0 and 1 using random.random() and uses it to set the fraud_score on these Purchase models. We then pass add_fraud_score() as a processor on our stream of incoming data.

So we have our input stream, purchase_events, and we are going to iterate over it. To apply the processor function, we need to tell the purchase_events stream that we want to use the add_fraud_score() processor before we start to iterate on it. Doing so adds the fraud score to each incoming message. Finally, we write the enriched messages out to a new topic and use kafka-console-consumer to view data from that topic.
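A minimal sketch of what this enrichment app might look like is shown below. The fraud_score field on the Purchase model, the destination topic name com.kafka.producers.purchases.scored, and the agent name are illustrative assumptions, since the post does not list the exact code:

import random

import faust

# Assumption: the Purchase model carries an optional fraud_score field
# that the processor fills in.
class Purchase(faust.Record):
    username: str
    currency: str
    amount: int
    fraud_score: float = None

app = faust.App("faust_stream_enrichment", broker="kafka://localhost:9092")

purchase_topic = app.topic("com.kafka.producers.purchases", value_type=Purchase)
scored_purchase_topic = app.topic(
    "com.kafka.producers.purchases.scored", value_type=Purchase
)

def add_fraud_score(purchase: Purchase) -> Purchase:
    # For demonstration only: assign a random score between 0 and 1
    purchase.fraud_score = random.random()
    return purchase

@app.agent(purchase_topic)
async def enrich_purchases(purchase_events):
    # Register the processor before iterating, so every event that comes
    # off the stream has already passed through add_fraud_score()
    purchase_events.add_processor(add_fraud_score)
    async for purchase in purchase_events:
        await scored_purchase_topic.send(value=purchase)

if __name__ == "__main__":
    app.main()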

Source topic:

root@19103899f8ed:~# kafka-console-consumer --bootstrap-server kafka0:9092 --topic com.kafka.producers.purchases --from-beginning
{"username": "michaelrice", "currency": "HTG", "amount": 187797}
{"username": "shelby06", "currency": "TRY", "amount": 131264}
{"username": "laurasmith", "currency": "SBD", "amount": 134460}
{"username": "angela32", "currency": "LKR", "amount": 197632}
{"username": "richard73", "currency": "KZT", "amount": 10344}
{"username": "lauren80", "currency": "ARS", "amount": 163854}

Start the streaming application: faust-stream-enrichment-example.py

[Screenshot: faust-stream-enrichment-example.py]

The output shows that each message in the stream now has a new fraud_score field:

[Screenshot: enriched messages with fraud_score]

Message Life Cycle and Management

When we reviewed Kafka consumers, we learned about consumers and consumer groups, and we saw that a consumer's offset is how Kafka keeps track of what that consumer has already seen. How does Faust manage that state? Thankfully, Faust handles this for you automatically using aiokafka. Here is how it works: Faust creates a Kafka consumer within the underlying library, which is responsible for subscribing to topics and fetching events. The aiokafka consumer manages the offsets and periodically commits them back to Kafka. The consumer then forwards each message on to the agent subscribed to that topic, where it is processed. When the processing of an event completes, in other words, when that iteration of the async iterable ends, the message is automatically acknowledged.

What is neat about how Faust handles data is that it only subscribes to each topic once: if multiple agents are subscribed to the same topic, the Faust consumer will only fetch the data once. A Faust application may also choose to forward processed messages on to another stream by calling topic.send(<data>) at the end of the processing loop.

Conclusion

We have seen two common use cases: filtering an incoming stream of data and enriching it, then storing the results in a new topic. Although the examples in this post are quite trivial, they pave the way for applying a series of complex transformations that are programmatically similar to the examples provided here.