In typical data warehousing systems, data is first accumulated and then processed. With newer technologies, however, it is now possible to process data as it arrives; we call this real-time data processing. In real-time processing, data streams through pipelines, moving from one system to another. Data is generated from static sources (like databases) or real-time systems (like transactional applications), then filtered, transformed, and finally stored in a database or pushed to other systems for further processing. Those systems can then repeat the same cycle: filter, transform, store, or push to other systems.
In this article, we will build a Quarkus application that streams and processes data in real-time using Kafka Streams. As we go through the example, you will learn how to apply Kafka concepts such as joins, windows, processors, state stores, punctuators, and interactive queries. By the end of the article, you will have the architecture for a realistic data streaming pipeline in Quarkus.
The traditional messaging system
As developers, we are tasked with updating a message-processing system that was originally built using a relational database and a traditional message broker. Here's the data flow for the messaging system:
- Data from two different systems arrives in two different messaging queues. Each record in one queue has a corresponding record in the other queue. Each record has a unique key.
- When a data record arrives in one of the message queues, the system uses the record's unique key to determine whether the database already has an entry for that record. If it does not find a record with that unique key, the system inserts the record into the database for processing.
- If the same data record arrives in the second queue within a few seconds, the application triggers the same logic. It checks whether a record with the same key is present in the database. If the record is present, the application retrieves the data and processes the two data objects.
- If the data record doesn't arrive in the second queue within 50 seconds after arriving in the first queue, then another application processes the record in the database.
As you might imagine, this scenario worked well before the advent of data streaming, but it does not work so well today.
The data streaming pipeline
Our task is to build a new message system that executes data streaming operations with Kafka. This type of application is capable of processing data in real-time, and it eliminates the need to maintain a database for unprocessed records. Figure 1 illustrates the data flow for the new application:
In the next sections, we'll go through the process of building a data streaming pipeline with Kafka Streams in Quarkus. You can get the complete source code from the article's GitHub repository. Before we start coding the architecture, let's discuss joins and windows in Kafka Streams.
Joins and windows in Kafka Streams
Kafka allows you to join records that arrive on two different topics. You are probably familiar with the concept of joins in a relational database, where the data is static and available in two tables. In Kafka, joins work differently because the data is always streaming.
We'll look at the types of joins in a moment, but the first thing to note is that joins happen for data collected over a duration of time. Kafka calls this type of collection windowing. Various types of windows are available in Kafka. For our example, we will use a tumbling window.
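To make windowing a bit more concrete, here is a minimal sketch (not part of the example application) of a tumbling-window aggregation. It assumes a `StreamsBuilder` named `builder` and the String serdes configured later in this article; the topic name `some-input-topic` is just a placeholder:

```java
// Count records per key over 5-second tumbling windows (fixed size, no overlap).
// "some-input-topic" is a placeholder topic name for illustration only.
KStream<String, String> source = builder.stream("some-input-topic");

source.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
      .count()
      .toStream()
      .foreach((windowedKey, count) ->
          System.out.println("key=" + windowedKey.key()
              + " window start=" + windowedKey.window().startTime()
              + " count=" + count));
```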
Inner joins
Now, let's consider how an inner join works. Assume that two separate data streams arrive in two different Kafka topics, which we will call the left and right topics. A record arriving in one topic has a related record (with the same key but a different value) that arrives in the other topic after a brief delay. As shown in Figure 2, we create a Kafka stream for each of the topics.
The inner join on the left and right streams creates a new data stream. When it finds a matching record (with the same key) on both the left and right streams, Kafka emits a new record at time t2 in the new stream. Because the B record did not arrive on the right stream within the specified time window, Kafka Streams won't emit a new record for B.
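For comparison with the outer join we use later, here is a minimal sketch of what the inner join would look like, assuming left and right `KStream`s like the `leftSource` and `rightSource` we create in Step 1; the output topic name is a placeholder:

```java
// Inner join: a joined record is emitted only when records with the same key
// arrive on both streams within the 5-second window.
KStream<String, String> innerJoined = leftSource.join(
        rightSource,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
        JoinWindows.of(Duration.ofSeconds(5)));

innerJoined.to("inner-join-output-topic");   // placeholder topic name
```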
Outer joins
Next, let's look at how an outer join works. Figure 3 shows the data flow for the outer join in our example:
If we don't use the "group by" clause when we join two streams in Kafka Streams, then the join operation will emit three records. Streams in Kafka do not wait for the entire window; instead, they start emitting records whenever the condition for an outer join is true. So, when record A on the left stream arrives at time t1, the join operation immediately emits a new record. At time t2, the `outerjoin` Kafka stream receives data from the right stream. The join operation immediately emits another record with the values from both the left and right records.

You would see different outputs if you used the `groupBy` and `reduce` functions on these Kafka streams. In that case, the streams would wait for the window to complete the duration, perform the join, and then emit the data, as previously shown in Figure 3.
Understanding how inner and outer joins work in Kafka Streams helps us find the best way to implement the data flow that we want. In this case, it is clear that we need to perform an outer join. This type of join allows us to retrieve records that appear in both the left and right topics, as well as records that appear in only one of them.
With that background out of the way, let's begin building our Kafka-based data streaming pipeline.
Note: We can use Quarkus extensions for Spring Web and Spring DI (dependency injection) to code in the Spring Boot style using Spring-based annotations.
Step 1: Perform the outer join
To perform the outer join, we first create a class called `KafkaStreaming`, then add the function `startStreamStreamOuterJoin()`:
```java
@RestController
public class KafkaStreaming {
    private KafkaStreams streamsOuterJoin;
    private final String LEFT_STREAM_TOPIC = "left-stream-topic";
    private final String RIGHT_STREAM_TOPIC = "right-stream-topic";
    private final String OUTER_JOIN_STREAM_OUT_TOPIC = "stream-stream-outerjoin";
    private final String PROCESSED_STREAM_OUT_TOPIC = "processed-topic";
    private final String KAFKA_APP_ID = "outerjoin";
    private final String KAFKA_SERVER_NAME = "localhost:9092";

    @RequestMapping("/startstream/")
    public void startStreamStreamOuterJoin() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, KAFKA_APP_ID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_NAME);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> leftSource = builder.stream(LEFT_STREAM_TOPIC);
        KStream<String, String> rightSource = builder.stream(RIGHT_STREAM_TOPIC);

        // TODO 1 - Add state store

        // do the outer join
        // change the value to be a mix of both streams value
        // have a moving window of 5 seconds
        // output the last value received for a specific key during the window
        // push the data to OUTER_JOIN_STREAM_OUT_TOPIC topic
        leftSource.outerJoin(rightSource,
                        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                        JoinWindows.of(Duration.ofSeconds(5)))
                  .groupByKey()
                  .reduce(((key, lastValue) -> lastValue))
                  .toStream()
                  .to(OUTER_JOIN_STREAM_OUT_TOPIC);

        // build the streams topology
        final Topology topology = builder.build();

        // TODO - 2: Add processor code later

        streamsOuterJoin = new KafkaStreams(topology, props);
        streamsOuterJoin.start();
    }
}
```
When we do a join, we create a new value that combines the data in the left and right topics. If a record with a given key is missing from the left or right topic, then the new value will have the string `null` as the value for the missing side. Also, the Kafka Streams `reduce` function returns the last aggregated value for each key.
Note: The `TODO 1 - Add state store` and `TODO - 2: Add processor code later` comments are placeholders for code that we will add in the upcoming sections.
The data flow so far
Figure 4 illustrates the following data flow:
- When a record with key A and value V1 comes into the left stream at time t1, Kafka Streams applies an outer join operation. At this point, the application creates a new record with key A and the value left=V1, right=null.
- When a record with key A and value V2 arrives in the right topic, Kafka Streams again applies an outer join operation. This creates a new record with key A and the value left=V1, right=V2.
- When the `reduce` function is evaluated at the end of the duration window, the Kafka Streams API emits the last value that was computed, per the unique record key. In this case, it emits a record with key A and a value of left=V1, right=V2 into the new stream.
- The new stream pushes the record to the `outerjoin` topic.
Figure 4: The data streaming pipeline so far.
Next, we will add the state store and processor code.
Step 2: Add the Kafka Streams processor
We need to process the records that are being pushed to the `outerjoin` topic by the outer join operation. Kafka Streams provides a Processor API that we can use to write custom logic for record processing. To start, we define a custom processor, `DataProcessor`, and add it to the streams topology in the `KafkaStreaming` class:
```java
public class DataProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("null")) {
            // TODO 3 - let's process later
        } else {
            processRecord(key, value);

            // forward the processed data to the processed-topic topic
            context.forward(key, value);
        }
        context.commit();
    }

    @Override
    public void close() {
    }

    private void processRecord(String key, String value) {
        // your own custom logic. I just print
        System.out.println("==== Record Processed ==== key: " + key + " and value: " + value);
    }
}
```
The record is processed, and if the value does not contain a `null` string, it is forwarded to the sink topic (that is, the `processed-topic` topic). In the following addition to the `KafkaStreaming` class, we wire the topology to define the source topic (i.e., the `outerjoin` topic), add the processor, and finally add a sink (i.e., the `processed-topic` topic). Once that's done, we can add this piece of code to the `TODO - 2: Add processor code later` section of the `KafkaStreaming` class:
```java
// add another stream that reads data from OUTER_JOIN_STREAM_OUT_TOPIC topic
topology.addSource("Source", OUTER_JOIN_STREAM_OUT_TOPIC);

// add a processor to the stream so that each record is processed
topology.addProcessor("StateProcessor",
        new ProcessorSupplier<String, String>() {
            public Processor<String, String> get() {
                return new DataProcessor();
            }
        },
        "Source");

topology.addSink("Sink", PROCESSED_STREAM_OUT_TOPIC, "StateProcessor");
```
Note that all we do is define the source topic (the `outerjoin` topic), add an instance of our custom processor class, and then add the sink topic (the `processed-topic` topic). The `context.forward()` method in the custom processor sends the record to the sink topic.
Figure 5 shows the architecture that we have built so far.
Step 3: Add the punctuator and StateStore
If you looked closely at the `DataProcessor` class, you probably noticed that we are only processing records that have both of the required (left-stream and right-stream) values. We also need to process records that have just one of the values, but we want to introduce a delay before processing these records. In some cases, the other value will arrive in a later time window, and we don't want to process the records prematurely.
State store
In order to delay processing, we need to hold incoming records in a store of some kind, rather than an external database. Kafka Streams lets us store data in a state store. We can use this type of store to hold recently received input records, track rolling aggregates, de-duplicate input records, and more.
Punctuators
Once we start holding records that have a missing value from either topic in a state store, we can use punctuators to process them. For example, we can register a `Punctuator` through the processor's `context.schedule()` method, and Kafka Streams will then call its `punctuate()` method on the schedule we set.
Add the state store
The following code adds a state store. Place it where you see the `TODO 1 - Add state store` comment in the `KafkaStreaming` class:
```java
// build the state store that will eventually store all unprocessed items
Map<String, String> changelogConfig = new HashMap<>();

StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STORE_NAME),
        Serdes.String(),
        Serdes.String())
        .withLoggingEnabled(changelogConfig);

.....

// add the state store in the topology builder
topology.addStateStore(stateStore, "StateProcessor");
```
We have defined a state store that stores the key and value as a string. We've also enabled logging, which is useful if the application dies and restarts. In that case, the state store won't lose data.
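If you want to tune the changelog topic that backs the store, you can pass standard Kafka topic configurations through the map handed to `withLoggingEnabled()`. The following sketch shows the idea; the values are illustrative, not recommendations:

```java
// Optional: tune the changelog topic behind the state store.
// Keys are standard Kafka topic configs; the values here are examples only.
changelogConfig.put("retention.ms", "172800000");        // keep changelog records for ~2 days
changelogConfig.put("cleanup.policy", "compact,delete"); // compact, and also delete old segments
```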
We'll modify the processor's `process()` method to put records with a missing value from either topic in the state store for later processing. Place the following code where you see the `TODO 3 - let's process later` comment in the `DataProcessor` class:
```java
if (value.contains("null")) {
    if (kvStore.get(key) != null) {
        // this means that the other value arrived first
        // you have both the values now and can process the combined record
        String newvalue = value.concat(" ").concat(kvStore.get(key));
        processRecord(key, newvalue);

        // remove the entry from the state store (the record that arrived first is no longer pending)
        kvStore.delete(key);
        context.forward(key, newvalue);
    } else {
        // add to state store as either left or right data is missing
        System.out.println("Incomplete value: " + value + " detected. Putting into statestore for later processing");
        kvStore.put(key, value);
    }
}
```
Add the punctuator
Next, we add the punctuator to the custom processor we've just created. For this, we update the `DataProcessor`'s `init()` method to the following:
```java
private KeyValueStore<String, String> kvStore;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    kvStore = (KeyValueStore) context.getStateStore(STORE_NAME);

    // schedule a punctuate() call every 50 seconds based on wall-clock time
    this.context.schedule(Duration.ofSeconds(50), PunctuationType.WALL_CLOCK_TIME,
        new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                System.out.println("Scheduled punctuator called at " + timestamp);

                KeyValueIterator<String, String> iter = kvStore.all();
                while (iter.hasNext()) {
                    KeyValue<String, String> entry = iter.next();
                    System.out.println("  Processed key: " + entry.key + " and value: " + entry.value + " and sending to processed-topic topic");
                    context.forward(entry.key, entry.value.toString());

                    // writing null removes the entry from the state store
                    kvStore.put(entry.key, null);
                }
                iter.close();

                // commit the current processing progress
                context.commit();
            }
        });
}
```
We've set the punctuate logic to be invoked every 50 seconds. The code retrieves entries in the state store and processes them. The `forward()` function then sends the processed record to the `processed-topic` topic. Lastly, we delete the record from the state store.
Figure 6 shows the complete data streaming architecture:
Interactive queries
We are finished with the basic data streaming pipeline, but what if we wanted to be able to query the state store? In this case, we could use interactive queries in the Kafka Streams API to make the application queryable. See the article's GitHub repository for more about interactive queries in Kafka Streams.
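As a rough sketch of the idea, an interactive query against our state store could look like the following. It assumes a `STORE_NAME` constant that matches the store registered in the topology, the endpoint path is made up for this illustration, and the exact `KafkaStreams#store()` signature varies between Kafka Streams versions:

```java
// Query the local state store for a key's pending (unprocessed) value.
@RequestMapping("/store/{key}")
public String getPendingRecord(@PathVariable("key") String key) {
    ReadOnlyKeyValueStore<String, String> store = streamsOuterJoin.store(
            StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
    String value = store.get(key);
    return value != null ? value : "No pending record for key " + key;
}
```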
Summary
You can use the streaming pipeline that we developed in this article to do any of the following:
- Process records in real-time.
- Store data without depending on a database or cache.
- Build a modern, event-driven architecture.
I hope the example application and instructions help you build and process your own data streaming pipelines. You can get the source code for the example application from this article's GitHub repository.