Build a data streaming pipeline using Kafka Streams and Quarkus

September 28, 2020
Kapil Shukla
Related topics:
Event-Driven, Java, Quarkus, Spring Boot
Related products:
Red Hat build of Quarkus

    In typical data warehousing systems, data is first accumulated and then processed. With the advent of new technologies, however, it is now possible to process data as it arrives; we call this real-time data processing. In real-time processing, data streams through pipelines, moving from one system to another. Data is generated from static sources (like databases) or real-time systems (like transactional applications), then filtered, transformed, and finally stored in a database or pushed to other systems for further processing. Those systems can then follow the same cycle: filter, transform, store, or push to other systems.

    In this article, we will build a Quarkus application that streams and processes data in real-time using Kafka Streams. As we go through the example, you will learn how to apply Kafka concepts such as joins, windows, processors, state stores, punctuators, and interactive queries. By the end of the article, you will have the architecture for a realistic data streaming pipeline in Quarkus.

    The traditional messaging system

    As developers, we are tasked with updating a message-processing system that was originally built using a relational database and a traditional message broker. Here's the data flow for the messaging system:

    1. Data from two different systems arrives in two different messaging queues. Each record in one queue has a corresponding record in the other queue. Each record has a unique key.
    2. When a data record arrives in one of the message queues, the system uses the record's unique key to determine whether the database already has an entry for that record. If it does not find a record with that unique key, the system inserts the record into the database for processing.
    3. If the same data record arrives in the second queue within a few seconds, the application triggers the same logic. It checks whether a record with the same key is present in the database. If the record is present, the application retrieves the data and processes the two data objects.
    4. If the data record doesn't arrive in the second queue within 50 seconds after arriving in the first queue, then another application processes the record in the database.
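
    The sketch below is one way to picture the lookup-or-insert logic in steps 2 and 3. It is a hypothetical simplification (an in-memory map stands in for the relational database, and all names are invented for illustration), not code from the original system:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class LegacyQueueListener {
        // stands in for the relational database, keyed by the record's unique key
        private final Map<String, String> database = new ConcurrentHashMap<>();

        // called for every record that arrives on either message queue
        public void onMessage(String key, String value) {
            String existing = database.remove(key);
            if (existing == null) {
                // no matching record yet: store this one and wait for its counterpart
                database.put(key, value);
            } else {
                // the counterpart arrived earlier: process the two records together
                System.out.println("Processing pair: " + existing + " / " + value);
            }
        }
    }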

    As you might imagine, this scenario worked well before the advent of data streaming, but it does not work so well today.

    The data streaming pipeline

    Our task is to build a new message system that executes data streaming operations with Kafka. This type of application is capable of processing data in real-time, and it eliminates the need to maintain a database for unprocessed records. Figure 1 illustrates the data flow for the new application:

    A flow diagram of the data-streaming pipeline's architecture.
    Figure 1: Architecture of the data streaming pipeline.

    In the next sections, we'll go through the process of building a data streaming pipeline with Kafka Streams in Quarkus. You can get the complete source code from the article's GitHub repository. Before we start coding the architecture, let's discuss joins and windows in Kafka Streams.

    Joins and windows in Kafka Streams

    Kafka allows you to join records that arrive on two different topics. You are probably familiar with the concept of joins in a relational database, where the data is static and available in two tables. In Kafka, joins work differently because the data is always streaming.

    We'll look at the types of joins in a moment, but the first thing to note is that joins happen for data collected over a duration of time. Kafka calls this type of collection windowing. Various types of windows are available in Kafka. For our example, we will use a tumbling window.
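
    For reference, the two window types that appear in this article can be declared as follows. This is a minimal sketch assuming the Kafka Streams 2.x API used throughout the article, not part of the example application:

    // a 5-second tumbling window, used with groupByKey()/windowedBy() for aggregations;
    // tumbling windows have a fixed size and do not overlap
    TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(5));

    // a 5-second join window, used when joining two streams (see Step 1 below)
    JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));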

    Inner joins

    Now, let's consider how an inner join works. Assume that two separate data streams arrive in two different Kafka topics, which we will call the left and right topics. A record arriving in one topic has a corresponding record (with the same key but a different value) that arrives in the other topic after a brief delay. As shown in Figure 2, we create a Kafka stream for each of the topics.

    A diagram of an inner join for two topics.
    Figure 2: Diagram of an inner join.

    The inner join on the left and right streams creates a new data stream. When it finds a matching record (with the same key) on both the left and right streams, Kafka emits a new record at time t2 in the new stream. Because the B record did not arrive on the right stream within the specified time window, Kafka Streams won't emit a new record for B.
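
    For comparison with the outer join we build in Step 1, an inner join over the same two streams could be written as the minimal sketch below. The output topic name is a placeholder, and the imports and StreamsBuilder are assumed to be the same as in the KafkaStreaming class shown later:

    KStream<String, String> left = builder.stream("left-stream-topic");
    KStream<String, String> right = builder.stream("right-stream-topic");

    // emit a joined record only when both streams see the same key within the 5-second window;
    // a record like B that never gets a match on the right stream produces no output
    left.join(right,
              (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
              JoinWindows.of(Duration.ofSeconds(5)))
        .to("inner-join-output-topic");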

    Outer joins

    Next, let's look at how an outer join works. Figure 3 shows the data flow for the outer join in our example:

    A diagram of an outer join.
    Figure 3: Diagram of an outer join.

    If we don't use the "group by" clause when we join two streams in Kafka Streams, then the join operation will emit three records. Streams in Kafka do not wait for the entire window; instead, they start emitting records whenever the condition for an outer join is true. So, when Record A on the left stream arrives at time t1, the join operation immediately emits a new record. At time t2, the outerjoin Kafka stream receives data from the right stream. The join operation immediately emits another record with the values from both the left and right records.

    You would see different outputs if you used the groupBy and reduce functions on these Kafka streams. In that case, the streams would wait for the window to complete the duration, perform the join, and then emit the data, as previously shown in Figure 3.
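
    To make that difference concrete, here is a hedged sketch of the two variants, reusing the left and right streams from the previous sketch (the output topic names are placeholders); the second form is the one used in Step 1 below:

    // variant 1: plain outer join; a joined record is emitted as soon as the join condition
    // is satisfied, so intermediate results such as "left=V1, right=null" reach the output topic
    left.outerJoin(right,
                   (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                   JoinWindows.of(Duration.ofSeconds(5)))
        .to("outerjoin-without-groupby");

    // variant 2 (used in Step 1): groupByKey()/reduce() keeps only the last value
    // computed for each key before the result reaches the output topic
    left.outerJoin(right,
                   (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                   JoinWindows.of(Duration.ofSeconds(5)))
        .groupByKey()
        .reduce((previousValue, lastValue) -> lastValue)
        .toStream()
        .to("outerjoin-with-groupby");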

    Understanding how inner and outer joins work in Kafka Streams helps us find the best way to implement the data flow that we want. In this case, it is clear that we need to perform an outer join. This type of join allows us to retrieve records that appear in both the left and right topics, as well as records that appear in only one of them.

    With that background out of the way, let's begin building our Kafka-based data streaming pipeline.

    Note: We can use Quarkus extensions for Spring Web and Spring DI (dependency injection) to code in the Spring Boot style using Spring-based annotations.

    Step 1: Perform the outer join

    To perform the outer join, we first create a class called KafkaStreaming, then add the function startStreamStreamOuterJoin():

    @RestController
    public class KafkaStreaming {
        private KafkaStreams streamsOuterJoin;
        private final String LEFT_STREAM_TOPIC = "left-stream-topic";
        private final String RIGHT_STREAM_TOPIC = "right-stream-topic";
        private final String OUTER_JOIN_STREAM_OUT_TOPIC = "stream-stream-outerjoin";
        private final String PROCESSED_STREAM_OUT_TOPIC = "processed-topic";
    
        private final String KAFKA_APP_ID = "outerjoin";
        private final String KAFKA_SERVER_NAME = "localhost:9092";
    
    
        @RequestMapping("/startstream/")
        public void startStreamStreamOuterJoin() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, KAFKA_APP_ID);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_NAME);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            final StreamsBuilder builder = new StreamsBuilder();
    
            KStream<String, String> leftSource = builder.stream(LEFT_STREAM_TOPIC);
            KStream<String, String> rightSource = builder.stream(RIGHT_STREAM_TOPIC);
    
            // TODO 1 - Add state store
    
            // do the outer join
            // change the value to be a mix of both streams value
            // have a moving window of 5 seconds
            // output the last value received for a specific key during the window
            // push the data to OUTER_JOIN_STREAM_OUT_TOPIC topic
    
            leftSource.outerJoin(rightSource,
                                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                                JoinWindows.of(Duration.ofSeconds(5)))
                            .groupByKey()
                        .reduce((previousValue, lastValue) -> lastValue)
                            .toStream()
                            .to(OUTER_JOIN_STREAM_OUT_TOPIC);
    
            // build the streams topology
            final Topology topology = builder.build();
    
            // TODO - 2: Add processor code later
            streamsOuterJoin = new KafkaStreams(topology, props);
            streamsOuterJoin.start();
        }
    }
    

    When we do a join, we create a new value that combines the data from the left and right topics. If a record with a given key is missing from either the left or the right topic, then the new value contains the string null in place of the missing value. Also, the Kafka Streams reduce function returns the last aggregated value for each key.

    Note: The TODO 1 - Add state store and TODO - 2: Add processor code later comments are placeholders for code that we will add in the upcoming sections.

    The data flow so far

    Figure 4 illustrates the following data flow:

    1. When a record with key A and value V1 comes into the left stream at time t1, Kafka Streams applies an outer join operation. At this point, the application creates a new record with key A and the value left=V1, right=null.
    2. When a record with key A and value V2 arrives in the right topic, Kafka Streams again applies an outer join operation. This creates a new record with key A and the value left=V1, right=V2.
    3. When the reduce function is evaluated at the end of the duration window, the Kafka Streams API emits the last value that was computed, per the unique record key. In this case, it emits a record with key A and a value of left=V1, right=V2 into the new stream.
    4. The new stream pushes the record to the outerjoin topic.
    A diagram of the data streaming pipeline.

    Figure 4: The data streaming pipeline so far.

    Next, we will add the state store and processor code.

    Step 2: Add the Kafka Streams processor

    We need to process the records that are being pushed to the outerjoin topic by the outer join operation. Kafka Streams provides a Processor API that we can use to write custom logic for record processing. To start, we define a custom processor, DataProcessor, and add it to the streams topology in the KafkaStreaming class:

    public class DataProcessor implements Processor<String, String>{
        private ProcessorContext context;
    
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }
    
        @Override
        public void process(String key, String value) {
            if(value.contains("null")) {
                // TODO 3: - let's process later
            } else {
                processRecord(key, value);
    
                //forward the processed data to processed-topic topic
                context.forward(key, value);
            }
            context.commit();
        }
    
        @Override
        public void close() {
        }
    
        private void processRecord (String key, String value) {
            // your own custom logic. I just print
            System.out.println("==== Record Processed ==== key: "+key+" and value: "+value);
    
        }
    }
    

    The record is processed, and if the value does not contain a null string, it is forwarded to the sink topic (that is, the processed-topic topic). In the snippet below, we wire the topology to define the source topic (i.e., the outerjoin topic), add the processor, and finally add a sink (i.e., the processed-topic topic). Once that's done, we can add this piece of code to the TODO - 2: Add processor code later section of the KafkaStreaming class:

    // add another stream that reads data from OUTER_JOIN_STREAM_OUT_TOPIC topic
                topology.addSource("Source", OUTER_JOIN_STREAM_OUT_TOPIC);
    
                // add a processor to the stream so that each record is processed
                topology.addProcessor("StateProcessor",
                                    new ProcessorSupplier<String, String>()
                                            { public Processor<String, String> get() {
                                                return new DataProcessor();
                                            }},
                                    "Source");
    
                topology.addSink("Sink", PROCESSED_STREAM_OUT_TOPIC, "StateProcessor");
    

    Note that all we do is to define the source topic (the outerjoin topic), add an instance of our custom processor class, and then add the sink topic (the processed-topic topic). The context.forward() method in the custom processor sends the record to the sink topic.

    Figure 5 shows the architecture that we have built so far.

    A diagram of the architecture in progress.
    Figure 5: The architecture with the Kafka Streams processor added.

    Step 3: Add the punctuator and StateStore

    If you looked closely at the DataProcessor class, you probably noticed that we are only processing records that have values from both of the required streams (left and right). We also need to process records that have just one of the values, but we want to introduce a delay before processing these records. In some cases, the other value will arrive in a later time window, and we don't want to process the records prematurely.

    State store

    In order to delay processing, we need to hold incoming records in a store of some kind, rather than in an external database. Kafka Streams lets us store data in a state store. We can use this type of store to hold recently received input records, track rolling aggregates, de-duplicate input records, and more.

    Punctuators

    Once we start holding records that have a missing value from either topic in a state store, we can use punctuators to process them. We register a Punctuator with the ProcessorContext.schedule() method and set the interval at which its punctuate() method should be called.

    Add the state store

    The following code adds a state store to the KafkaStreaming class. Place it where you see the TODO 1 - Add state store comment:

            // build the state store that will eventually store all unprocessed items
        Map<String, String> changelogConfig = new HashMap<>();
    
            StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
                                                                        Stores.persistentKeyValueStore(STORE_NAME),
                                                                        Serdes.String(),
                                                                        Serdes.String())
                                                                     .withLoggingEnabled(changelogConfig);
            .....
            .....
            .....
            .....
            // add the state store in the topology builder
            topology.addStateStore(stateStore, "StateProcessor");
    

    We have defined a state store that stores the key and value as a string. We've also enabled logging, which is useful if the application dies and restarts. In that case, the state store won't lose data.
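
    Two details are glossed over above. The STORE_NAME constant is referenced but never declared in the article, and the changelogConfig map is left empty. One plausible way to fill both in is sketched below; the store name and the topic settings are assumptions for illustration (the article's GitHub repository has the actual values):

    // assumed declaration; the article uses STORE_NAME but never shows its value
    private final String STORE_NAME = "unprocessed-records-store";

    // optional settings for the changelog topic that backs the state store
    Map<String, String> changelogConfig = new HashMap<>();
    changelogConfig.put("cleanup.policy", "compact");  // keep only the latest value per key
    changelogConfig.put("min.insync.replicas", "1");   // example durability setting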

    We'll modify the processor's process() method to put records with a missing value from either topic in the state store for later processing. Place the following code where you see the TODO 3: - let's process later comment in the DataProcessor class:

              if(value.contains("null")) {
                if (kvStore.get(key) != null) {
                    // this means that the other value arrived first
                    // you have both the values now and can process the record
                    String newvalue = value.concat(" ").concat(kvStore.get(key));
                    // process the combined record directly; calling process() again
                    // would re-enter the "null" branch before the entry is deleted
                    processRecord(key, newvalue);
    
                    // remove the entry from the statestore (if any left or right record came first as an event)
                    kvStore.delete(key);
                    context.forward(key, newvalue);
                } else {
    
                    // add to state store as either left or right data is missing
                    System.out.println("Incomplete value: "+value+" detected. Putting into statestore for later processing");
                    kvStore.put(key, value);
                }
            }
    

    Add the punctuator

    Next, we add the punctuator to the custom processor we've just created. For this, we update the DataProcessor's init() method to the following:

        private KeyValueStore<String, String> kvStore;
    
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        kvStore = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
    
        // schedule the punctuate() method every 50 seconds, based on wall-clock time
            this.context.schedule(Duration.ofSeconds(50), PunctuationType.WALL_CLOCK_TIME,
                                    new Punctuator(){
                                        @Override
                                        public void punctuate(long timestamp) {
                                            System.out.println("Scheduled punctuator called at "+timestamp);
                                            KeyValueIterator<String, String> iter = kvStore.all();
                                            while (iter.hasNext()) {
                                                KeyValue<String, String> entry = iter.next();
                                                System.out.println("  Processed key: "+entry.key+" and value: "+entry.value+" and sending to processed-topic topic");
                                                context.forward(entry.key, entry.value.toString());
                                                kvStore.put(entry.key, null);
                                            }
                                            iter.close();
                                        // commit the current processing progress
                                        context.commit();
                                        }
                                    }
            );
        }
    

    We've set the punctuate logic to be invoked every 50 seconds. The code retrieves the entries in the state store and processes them. The forward() method then sends each processed record to the processed-topic topic. Lastly, putting a null value for the key removes the record from the state store.

    Figure 6 shows the complete data streaming architecture:

    A diagram of the complete application with the state store and punctuators added.
    Figure 6: The complete data streaming pipeline.

    Interactive queries

    We are finished with the basic data streaming pipeline, but what if we wanted to be able to query the state store? In this case, we could use interactive queries in the Kafka Streams API to make the application queryable. See the article's GitHub repository for more about interactive queries in Kafka Streams.
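
    A minimal sketch of such a query endpoint, assuming the same KafkaStreaming class and the Kafka 2.x KafkaStreams.store(String, QueryableStoreType) call (the endpoint path is an invented example), might look like this:

    @RequestMapping("/store/{key}")
    public String getFromStateStore(@PathVariable("key") String key) {
        // open a read-only view over the state store and look up the given key
        ReadOnlyKeyValueStore<String, String> store =
                streamsOuterJoin.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }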

    Summary

    You can use the streaming pipeline that we developed in this article to do any of the following:

    • Process records in real-time.
    • Store data without depending on a database or cache.
    • Build a modern, event-driven architecture.

    I hope the example application and instructions will help you with building and processing data streaming pipelines. You can get the source code for the example application from this article's GitHub repository.

    Last updated: January 12, 2024
