Stream processing with ksqlDB in OpenShift Streams for Apache Kafka

December 20, 2022
Daniele Martinoli Sandip Gahlot
Related topics:
Kafka
Related products:
Red Hat OpenShift

    This is the first article in a three-part series about existing solutions for manipulating Kafka stream data with SQL, the well-known query language widely used with relational databases.

    Apache Kafka

    Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. Originally created to handle real-time data feeds at LinkedIn in 2011, Kafka quickly evolved from a messaging queue to a full-fledged event streaming platform capable of handling over 1 million messages per second, or trillions of messages per day.

    OpenShift offers many options to install an Apache Kafka cluster, including various operators, but in this article, we'll focus on the creation of managed Apache Kafka services using Red Hat OpenShift Streams for Apache Kafka, which also provides an option to try out a free instance that we will use for the purposes of our exercise.

    ksqlDB

    ksqlDB is a real-time event-streaming database built on top of Apache Kafka. It combines powerful stream processing with a relational database model using SQL syntax. It can connect multiple Apache Kafka instances at the same time, providing an alternative to manipulate the data streams with almost no code.

    ksqlDB Standalone is an open source project that's licensed under the Confluent Community License.

    The main use cases for adopting ksqlDB are the following:

    • Materialized caches
    • Streaming ETL pipelines
    • Event-driven microservices

    ksqlDB also provides commands to manage connectors based on the Kafka Connect technology to easily integrate the messaging system with external sources or sinks, like relational or NoSQL databases, key-value stores, search indexes, and file systems among others.

    In terms of management interface, ksqlDB provides both a command-line interface (CLI) to run SQL-like commands and a REST API to run the same operations from an HTTP client.
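
    As a minimal sketch of the REST side, the functions below post a statement to the server's /ksql endpoint with curl. They assume the server listens on http://localhost:8088 (the default port used later in this article); the function names and the KSQL_URL variable are hypothetical, and the payload helper does no JSON escaping.

```shell
# Build the JSON body expected by the ksqlDB /ksql endpoint.
# Assumption: the statement contains no double quotes or backslashes,
# because this sketch does not escape JSON.
ksql_payload() {
    printf '{"ksql":"%s","streamsProperties":{}}' "$1"
}

# POST a statement to a ksqlDB server.
# KSQL_URL is a hypothetical override; the default matches port 8088.
ksql_rest() {
    curl -s -X POST "${KSQL_URL:-http://localhost:8088}/ksql" \
        -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
        -d "$(ksql_payload "$1")"
}

# Usage, once the server is running:
#   ksql_rest 'SHOW STREAMS;'
```

    The same statements typed at the ksql> prompt can be sent this way, which is convenient for scripts and health checks.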

    Introducing the sample application

    This article shows how to install, configure, and operate an application made of the following components:

    • An instance of the Red Hat OpenShift Streams for Apache Kafka managed service called kafka-orders
    • Three topics to model customers, items, and orders
    • A ksqlDB server connected to the kafka-orders instance and its CLI console
    • A SQL script to populate topics with sample SQL INSERTs
    • An aggregated stream aggregated-orders defined with a push query sending messages to the external subscribers
    • A Quarkus receiver application subscribed to receive and print the messages from the new stream
    • neworder.sh, a bash script to inject new orders

    Figure 1 shows the architecture of the sample demo application.

    Figure 1: The sample demo application is made of a managed Apache Kafka instance, the ksqlDB server and CLI, a Quarkus application, and a script to generate random orders.

    Install and run the demo

    To run the demo in this article, you need the following tools:

    • rhoas CLI
    • Maven 3.x
    • Java SDK 11+
    • bash terminal session
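
    Before starting, it can help to confirm that the tools are on the PATH. The following is a minimal sketch: the check_tools function is hypothetical, and it assumes Maven and the Java SDK are exposed as the mvn and java commands on your system.

```shell
# Report every command from the argument list that is not on the PATH.
# Returns 0 when all tools are found and 1 otherwise.
check_tools() {
    missing=0
    for tool in "$@"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            echo "missing: $tool" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Usage for this demo (command names are assumptions, see above):
#   check_tools rhoas mvn java bash
```

    The check only confirms that the commands exist; it does not verify the Maven 3.x or Java 11+ version requirements.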

    You also need accounts on the following systems:

    • Red Hat OpenShift Application and Data Services like https://sandbox.redhat.com

    To run the exercise, clone the repository of the source code and define an environment variable to easily reference the root folder:

    git clone git@github.com:dmartinol/ksqldb-on-rhoas.git
    cd ksqldb-on-rhoas
    export DEMO_HOME=$PWD

    Create the Kafka instance

    Request a login token from the OpenShift Cluster Manager API Token page, replace the <TOKEN> placeholder below with that token, and log in with the rhoas CLI:

    rhoas login --api-gateway https://api.openshift.com -t "<TOKEN>"
    

    The first step is to create a managed Kafka instance called kafka-orders, along with the required topics items, customers, and orders, and then set up the access rules that grant a service account called kafka-orders-quarkus access to topics whose names start with ksql-topic:

    ./createKafka.sh

    Once the creation completes (it might take up to five minutes), you can check the configuration by looking at the content of the following files:

    • kafka-orders.json: the configuration of the Kafka host
    • kafka-orders-quarkus.json: the configuration of the service account

    Alternatively, you can review the same details in the Kafka Instances management console, together with other useful information displayed in the Topics and Access tabs, as shown in Figure 2.

    Figure 2. Select Kafka Instances from the Streams for Apache Kafka menu and then the kafka-orders instance to display all the details of the newly created Kafka instance.

    The checkKafkaStatus.sh script also provides an option to print the same details on the terminal console.

    cd $DEMO_HOME
    ./checkKafkaStatus.sh

    Install ksqlDB

    We will run the ksqlDB tool from the source code, so we need to clone the GitHub repository and build one of the stable branches, like v7.2.2:

    git clone https://github.com/confluentinc/ksql.git
    cd ksql
    export KSQL_HOME=${PWD}
    git checkout v7.2.2
    mvn clean install

    Other installation options are described in the ksqlDB documentation page.

    Configure the ksqlDB service account

    ksqlDB must be allowed to connect and manage the Kafka resources, so we create a specific service account, ksqldb-sa, and configure the access rules to let ksqlDB access and manage the required resources:

    cd $DEMO_HOME/ksqldb
    ./createSaForKsql.sh

    A complete description of the required access rules is provided in the Required ACLs section of the ksqlDB documentation.

    Run ksqlDB server and CLI

    The next step is to launch the ksqlDB server on the default port 8088 and connect it to the Kafka instance created before:

    cd $DEMO_HOME/ksqldb
    ./startKsql.sh

    On a different terminal, we can start the ksqlDB CLI and populate the topics with sample data contained in the file init.sql:

    > cd $DEMO_HOME/ksqldb
    > ./startKsqlCli.sh
    ...
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    Copyright 2017-2022 Confluent Inc.
    CLI v7.2.2, Server v7.2.2 located at http://localhost:8088
    Server Status: RUNNING
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    ksql> RUN SCRIPT init.sql;
    

    From the ksqlDB CLI, we can use SQL commands such as SHOW TOPICS, SHOW STREAMS, DESCRIBE ORDERS, and SELECT * FROM ORDERS to validate the content of the ksqlDB database:

    ksql> SHOW TOPICS;
     Kafka Topic                      | Partitions | Partition Replicas 
    --------------------------------------------------------------------
     customers                        | 1          | 1                  
     items                            | 1          | 1                  
     ksql-service-ksql_processing_log | 1          | 1                  
     orders                           | 1          | 1                  
    --------------------------------------------------------------------
    ksql> SHOW STREAMS;
     Stream Name         | Kafka Topic                      | Key Format | Value Format | Windowed 
    -----------------------------------------------------------------------------------------------
     KSQL_PROCESSING_LOG | ksql-service-ksql_processing_log | KAFKA      | JSON         | false    
     ORDERS              | orders                           | KAFKA      | JSON         | false    
    -----------------------------------------------------------------------------------------------
    ksql> DESCRIBE ORDERS;
    Name                 : ORDERS
     Field      | Type            
    ------------------------------
     ITEMID     | VARCHAR(STRING) 
     CUSTOMERID | VARCHAR(STRING) 
     PRICE      | DOUBLE          
    ------------------------------
    For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
    ksql> SELECT * FROM ORDERS;
    +----------------------------------------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
    |ITEMID                                                                |CUSTOMERID                                                            |PRICE                                                                 |
    +----------------------------------------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
    |A1                                                                    |C1                                                                    |5.8                                                                   |
    |A1                                                                    |C2                                                                    |6.0                                                                   |
    |A1                                                                    |C3                                                                    |5.0                                                                   |
    |A2                                                                    |C1                                                                    |123.3                                                                 |
    |A2                                                                    |C1                                                                    |120.3                                                                 |
    Query Completed
    Query terminated
    

    For a list of available SQL commands, you can read the SQL quick reference page.

    Creating a merged stream and an aggregated table

    Using the ksqlDB CLI, we create a merged stream called FULL_ORDERS that joins each stored order with its customer and item, producing one message per order, and an aggregated table called AGGREGATED_ORDERS, built on the FULL_ORDERS stream, that calculates the number of items and the total cost of all the orders for each customer:

    ksql> CREATE STREAM FULL_ORDERS AS
      SELECT customers.customerId, customers.name, items.itemId, items.item, orders.price
      FROM orders
      JOIN customers  ON orders.customerId = customers.customerId
      JOIN items  ON items.itemId = orders.itemId 
      EMIT CHANGES;
    ksql> CREATE TABLE AGGREGATED_ORDERS AS
      SELECT CUSTOMERS_CUSTOMERID, count(*) as noOfItems, sum(price) as totalPrice
      FROM FULL_ORDERS
      GROUP BY CUSTOMERS_CUSTOMERID
      EMIT CHANGES;
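
    To make the GROUP BY concrete, the sketch below applies the same count and sum to the five sample orders printed earlier by SELECT * FROM ORDERS. It runs offline with awk rather than ksqlDB, and it assumes every order matches a customer and an item, so the joins drop nothing. The aggregate_orders name is hypothetical.

```shell
# Sample orders shown earlier by SELECT * FROM ORDERS: itemId customerId price
orders='A1 C1 5.8
A1 C2 6.0
A1 C3 5.0
A2 C1 123.3
A2 C1 120.3'

# Group by customer (column 2), counting rows and summing prices (column 3).
# This mirrors count(*) and sum(price) with GROUP BY CUSTOMERS_CUSTOMERID.
aggregate_orders() {
    LC_ALL=C awk '{ n[$2]++; total[$2] += $3 }
        END { for (c in n) printf "%s %d %.1f\n", c, n[c], total[c] }' | LC_ALL=C sort
}

# Print one line per customer: customerId noOfItems totalPrice
printf '%s\n' "$orders" | aggregate_orders
```

    For these rows, customer C1 ends up with 3 items totaling 249.4, while C2 and C3 have one item each.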

    The ksqlDB documentation describes the difference between streams and tables: the former are immutable collections that represent a series of historical facts, while the latter are mutable collections that change over time, defining the current status of any entity identified by its key.
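
    The distinction can be illustrated offline with a few made-up change events. In this sketch the keys, the city values, and the function names are all hypothetical; the point is only that a stream keeps every event while a table keeps the latest value per key.

```shell
# Hypothetical change events: key and value, oldest first.
events='C1 Rome
C2 Paris
C1 Milan'

# Stream view: every event is kept, in arrival order.
as_stream() {
    cat
}

# Table view: a later event for the same key replaces the earlier one.
as_table() {
    LC_ALL=C awk '{ latest[$1] = $2 }
        END { for (k in latest) print k, latest[k] }' | LC_ALL=C sort
}

# Usage:
#   printf '%s\n' "$events" | as_stream   # three rows, including both C1 events
#   printf '%s\n' "$events" | as_table    # two rows, one per key
```

    Reading the same events as a table leaves only the second C1 event, which is why AGGREGATED_ORDERS holds a single current row per customer.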

    Running the SHOW TOPICS and SHOW QUERIES commands shows the effects of the above statements on the ksqlDB database:

    ksql> SHOW TOPICS;
     Kafka Topic                      | Partitions | Partition Replicas 
    --------------------------------------------------------------------
     customers                        | 1          | 1                  
     items                            | 1          | 1                  
     ksql-service-ksql_processing_log | 1          | 1                  
     ksql-topic-AGGREGATED_ORDERS     | 1          | 1                  
     ksql-topic-FULL_ORDERS           | 1          | 1                  
     orders                           | 1          | 1                  
    --------------------------------------------------------------------
    ksql> SHOW QUERIES;
     Query ID                 | ...
    -------------------------------
     CSAS_FULL_ORDERS_7       | ... 
     CTAS_AGGREGATED_ORDERS_9 | ...
    -------------------------------
    For detailed information on a Query run: EXPLAIN <Query ID>;
    

    Note that all the newly created topics are prefixed by ksql-topic-, which is the configuration that we provided in the startKsql.sh script.
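
    The naming rule visible in the SHOW TOPICS output can be sketched as a small helper: the prefix followed by the upper-cased stream or table name. ksqlDB exposes the ksql.output.topic.name.prefix server property for such a prefix; whether startKsql.sh sets exactly that property is an assumption here, and the ksql_topic_for name is hypothetical.

```shell
# Derive the backing Kafka topic of a stream or table created with
# CREATE ... AS SELECT, assuming the ksql-topic- prefix used in this demo.
ksql_topic_for() {
    printf 'ksql-topic-%s\n' "$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')"
}

# Prints the topic that external consumers subscribe to.
ksql_topic_for aggregated_orders
```

    Note the underscore in the resulting name, ksql-topic-AGGREGATED_ORDERS, which is the value an external consumer must use.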

    The Quarkus receiver application

    In the receiver folder of the demo source code, we developed a Quarkus application that receives messages from the topic backing the newly created AGGREGATED_ORDERS table and prints their content to the console. Using the @Incoming annotation, we can specify the channel that is mapped to the Kafka topic as the data source:

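    // Imports assume Quarkus 2.x (javax namespace); Quarkus 3+ uses jakarta.enterprise.context.
    import javax.enterprise.context.ApplicationScoped;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.jboss.logging.Logger;

    @ApplicationScoped
    public class AggregatedOrderReceiver {
        private static final Logger log = Logger.getLogger(AggregatedOrderReceiver.class);
        // Consume records from the channel mapped to ksql-topic-AGGREGATED_ORDERS.
        @Incoming("get-aggregated-order")
        @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
        public void process(ConsumerRecord<String, AggregatedOrder> record) {
            // The customer ID travels as the record key, so copy it into the value.
            record.value().customerId = record.key();
            log.infof("AggregatedOrderReceiver received %s", record.value());
        }
    }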
    

    The topic configuration is specified in the application.properties file:

    # Configure the Kafka source (we read from it)
    mp.messaging.incoming.get-aggregated-order.topic=ksql-topic-AGGREGATED_ORDERS
    mp.messaging.incoming.get-aggregated-order.group.id=${KAFKA_CONSUMER_GROUP}
    mp.messaging.incoming.get-aggregated-order.value.deserializer=org.acme.kafka.AggregatedOrderDeserializer
    

    Launch the Quarkus receiver application to connect to the Kafka cluster and receive updates on the ksql-topic-AGGREGATED_ORDERS topic:

    cd $DEMO_HOME/receiver
    ./startReceiver.sh

    Testing the integration

    To validate the integration, we will run a push query to receive updates from the AGGREGATED_ORDERS table:

    ksql> SELECT * FROM AGGREGATED_ORDERS EMIT CHANGES;
    +-----------------------+-----------------+----------------+
    |CUSTOMERS_CUSTOMERID   |NOOFITEMS        |TOTALPRICE      |
    +-----------------------+-----------------+----------------+
    Press CTRL-C to interrupt
    

    The EMIT CHANGES option makes this a push query: it produces a continuously updating stream of the latest messages and never ends on its own. Interrupt it with CTRL-C, as the prompt suggests, or terminate it explicitly with the TERMINATE command from a new ksqlDB CLI session:

    ksql> SHOW QUERIES;
    ksql> TERMINATE <QUERY ID>;
    

    You can read more about the difference between push and pull queries on the ksqlDB documentation page.

    At the same time, we use the neworder.sh script available in the ksqldb folder to push some new random messages into the orders topic.

    cd $DEMO_HOME/ksqldb
    ./neworder.sh -o 10
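
    For readers who want to inject orders by hand, the sketch below prints random INSERT statements for the orders stream. It is a hypothetical stand-in, not the repository's neworder.sh, whose implementation is not shown here. It assumes the item IDs A1 and A2 and the customer IDs C1 to C3 seen in the sample data, and the random_orders name is made up.

```shell
# Print N random INSERT statements for the ORDERS stream.
# Hypothetical helper; IDs and price range are assumptions (see above).
random_orders() {
    LC_ALL=C awk -v n="$1" 'BEGIN {
        srand()
        for (i = 0; i < n; i++) {
            item = "A" (1 + int(rand() * 2))       # A1 or A2
            customer = "C" (1 + int(rand() * 3))   # C1, C2 or C3
            price = 1 + rand() * 99                # from 1 up to 100
            # \047 is the octal escape for a single quote
            printf "INSERT INTO orders (itemId, customerId, price) VALUES (\047%s\047, \047%s\047, %.2f);\n", item, customer, price
        }
    }'
}

# Usage: save ten statements to a file for the ksqlDB CLI.
#   random_orders 10 > more-orders.sql
```

    The generated file can then be loaded from the ksqlDB CLI with RUN SCRIPT, in the same way as init.sql.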

    The animation in Figure 3 shows how the messages created in the neworders tab of the screen are displayed in the ksqlDB CLI running the SELECT statement (left screen of receivers tab) and on the Quarkus receiver application on the right screen of the receivers tab:

    Figure 3: The SELECT query and the Quarkus receiver application receive new messages from the aggregated topic as soon as new orders are injected with the neworder.sh script.

    Tear down the managed resources

    To clear the setup, we can delete all the resources created for the purpose of the demo:

    cd $DEMO_HOME/ksqldb
    ./deleteAll.sh
    cd ..
    ./deleteAll.sh

    ksqlDB to process streams of Kafka messages

    By using ksqlDB and the familiar SQL command syntax, we processed the messages in an Apache Kafka cluster and generated new aggregated messages using standard SQL aggregation functions. We injected new messages on the topics and watched the live streams of events coming in real time from a push query in the ksqlDB CLI and from a message stream in a Quarkus application.

    No code was needed to process the streams, apart from a few SQL commands. The procedures presented in the demo are available on GitHub.

    The rhoas CLI was used to interact with the Red Hat OpenShift Application and Data Services and hide the complexity of creating the Kafka instance and defining the access rules for both the Quarkus application and the ksqlDB server.

    Last updated: August 27, 2025
