
Stream processing with ksqlDB in OpenShift Streams for Apache Kafka

December 20, 2022
Daniele Martinoli, Sandip Gahlot
Related topics: Kafka
Related products: Red Hat OpenShift


    This is the first article in a three-part series about existing solutions for manipulating Kafka stream data with SQL, the well-known query language widely used with relational databases.

    Apache Kafka

    Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. Originally created to handle real-time data feeds at LinkedIn in 2011, Kafka quickly evolved from a messaging queue to a full-fledged event streaming platform capable of handling over 1 million messages per second, or trillions of messages per day.

    OpenShift offers many options for installing an Apache Kafka cluster, including various operators. In this article, however, we'll focus on creating a managed Apache Kafka service with Red Hat OpenShift Streams for Apache Kafka, which also offers a free trial instance that we will use for the purposes of our exercise.

    ksqlDB

    ksqlDB is a real-time event-streaming database built on top of Apache Kafka. It combines powerful stream processing with a relational database model using SQL syntax. It can connect to multiple Apache Kafka instances at the same time, providing a way to manipulate data streams with almost no code.
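
    As a first taste of the syntax, a stream can be declared directly on top of an existing Kafka topic and then queried with plain SQL. The following is a minimal sketch; the readings topic and its columns are hypothetical:

    ksql> CREATE STREAM readings (sensorId VARCHAR, temperature DOUBLE)
      WITH (KAFKA_TOPIC='readings', VALUE_FORMAT='JSON');

    ksql> -- push query: emit every reading above a threshold as it arrives
    ksql> SELECT sensorId, temperature FROM readings WHERE temperature > 25 EMIT CHANGES;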

    ksqlDB Standalone is an open source project that's licensed under the Confluent Community License.

    The main use cases for adopting ksqlDB are the following:

    • Materialized caches
    • Streaming ETL pipelines (sketched just after this list)
    • Event-driven microservices
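
    As an illustration of a streaming ETL pipeline, a persistent query can continuously filter and reshape the events of one topic into a new one. This is a hedged sketch: the orders_stream source is hypothetical and would first be declared over an existing topic:

    ksql> -- persistent query: continuously derive a new stream (and backing topic)
    ksql> CREATE STREAM big_orders AS
      SELECT customerId, price
      FROM orders_stream
      WHERE price > 100
      EMIT CHANGES;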

    ksqlDB also provides commands to manage connectors based on the Kafka Connect technology, making it easy to integrate the messaging system with external sources or sinks, such as relational or NoSQL databases, key-value stores, search indexes, and file systems.
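
    For example, a source connector can be created entirely from SQL. The following sketch assumes a ksqlDB server backed by an embedded or external Kafka Connect cluster with the Confluent JDBC connector plugin installed; the connection settings are illustrative:

    ksql> CREATE SOURCE CONNECTOR customers_jdbc WITH (
      'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
      'connection.url'           = 'jdbc:postgresql://localhost:5432/shop',
      'mode'                     = 'incrementing',
      'incrementing.column.name' = 'id',
      'topic.prefix'             = 'jdbc-'
    );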

    As for the management interface, ksqlDB provides both a command-line interface (CLI) to run SQL-like commands and a REST API to run the same operations from any HTTP client.

    Introducing the sample application

    This article shows how to install, configure, and operate an application made of the following components:

    • An instance of the Red Hat OpenShift Streams for Apache Kafka managed service called kafka-orders
    • Three topics modeling customers, items, and orders
    • A ksqlDB server connected to the kafka-orders instance, plus its CLI console
    • A SQL script to populate the topics with sample SQL INSERTs
    • An aggregated stream, aggregated-orders, defined with a push query that sends messages to external subscribers
    • A Quarkus receiver application that receives and prints the messages from the new stream
    • neworder.sh, a bash script to inject new orders

    Figure 1 shows the architecture of the sample demo application.

    Figure 1: The sample demo application is made of a managed Apache Kafka instance, the ksqlDB server and CLI, a Quarkus application, and a script to generate random orders.

    Install and run the demo

    To run the demo in this article, you need the following tools:

    • rhoas CLI
    • Maven 3.x
    • Java SDK 11+
    • A bash terminal session

    You also need accounts on the following systems:

    • Red Hat OpenShift Application and Data Services (https://console.redhat.com)

    To run the exercise, clone the repository of the source code and define an environment variable to easily reference the root folder:

    git clone git@github.com:dmartinol/ksqldb-on-rhoas.git
    cd ksqldb-on-rhoas
    export DEMO_HOME=$PWD

    Create the Kafka instance

    You need to request a login token by visiting the OpenShift Cluster Manager API Token page, then replace the TOKEN parameter with the given token and log in to the rhoas CLI:

    rhoas login --api-gateway https://api.openshift.com -t "<TOKEN>"
    

    The first step is to create a managed Kafka instance called kafka-orders, along with the required topics (items, customers, and orders), and then set up the access rules that grant a service account called kafka-orders-quarkus access to topics whose names start with ksql-topic:

    ./createKafka.sh

    Once the creation completes (it might take up to five minutes), you can check the configuration by looking at the content of the following files:

    • kafka-orders.json: the configuration of the Kafka host
    • kafka-orders-quarkus.json: the configuration of the service account

    Alternatively, you can review the same details in the Kafka Instances management console, together with other useful information displayed in the Topics and Access tabs, as shown in Figure 2.

    Figure 2. Select Kafka Instances from the Streams for Apache Kafka menu and then the kafka-orders instance to display all the details of the newly created Kafka instance.

    The checkKafkaStatus.sh script also provides an option to print the same details to the terminal console:

    cd $DEMO_HOME
    ./checkKafkaStatus.sh

    Install ksqlDB

    We will run the ksqlDB tool from the source code, so we need to clone the GitHub repository and build one of the stable releases, such as the v7.2.2 tag:

    git clone https://github.com/confluentinc/ksql.git
    cd ksql
    export KSQL_HOME=${PWD}
    git checkout v7.2.2
    mvn clean install

    Other installation options are described on the ksqlDB documentation page.

    Configure the ksqlDB service account

    ksqlDB must be allowed to connect to the Kafka instance and manage its resources, so we create a dedicated service account, ksqldb-sa, and configure the access rules that let ksqlDB access and manage the required resources:

    cd $DEMO_HOME/ksqldb
    ./createSaForKsql.sh

    A complete description of the required access rules is provided in the Required ACLs section of the ksqlDB documentation.

    Run ksqlDB server and CLI

    The next step is to launch the ksqlDB server on the default port 8088 and connect it to the Kafka instance created earlier:

    cd $DEMO_HOME/ksqldb
    ./startKsql.sh

    On a different terminal, we can start the ksqlDB CLI and populate the topics with sample data contained in the file init.sql:

    > cd $DEMO_HOME/ksqldb
    > ./startKsqlCli.sh
    ...
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    CLI v7.2.2, Server v7.2.2 located at http://localhost:8088
    Server Status: RUNNING
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    
    ksql> RUN SCRIPT init.sql;
    

    From the ksqlDB CLI, we can use a few SQL commands to validate the content of the ksqlDB database, such as SHOW TOPICS, SHOW STREAMS, DESCRIBE ORDERS, and SELECT * FROM ORDERS:

    ksql> SHOW TOPICS;
    
     Kafka Topic                      | Partitions | Partition Replicas 
    --------------------------------------------------------------------
     customers                        | 1          | 1                  
     items                            | 1          | 1                  
     ksql-service-ksql_processing_log | 1          | 1                  
     orders                           | 1          | 1                  
    --------------------------------------------------------------------
    
    ksql> SHOW STREAMS;
    
     Stream Name         | Kafka Topic                      | Key Format | Value Format | Windowed 
    -----------------------------------------------------------------------------------------------
     KSQL_PROCESSING_LOG | ksql-service-ksql_processing_log | KAFKA      | JSON         | false    
     ORDERS              | orders                           | KAFKA      | JSON         | false    
    -----------------------------------------------------------------------------------------------
    
    ksql> DESCRIBE ORDERS;
    
    Name                 : ORDERS
     Field      | Type            
    ------------------------------
     ITEMID     | VARCHAR(STRING) 
     CUSTOMERID | VARCHAR(STRING) 
     PRICE      | DOUBLE          
    ------------------------------
    For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
    
    ksql> SELECT * FROM ORDERS;
    
    +-------+-----------+------+
    |ITEMID |CUSTOMERID |PRICE |
    +-------+-----------+------+
    |A1     |C1         |5.8   |
    |A1     |C2         |6.0   |
    |A1     |C3         |5.0   |
    |A2     |C1         |123.3 |
    |A2     |C1         |120.3 |
    Query Completed
    Query terminated
    

    For a list of available SQL commands, you can read the SQL quick reference page.

    Creating a merged stream and an aggregated table

    Using the ksqlDB CLI, we create a merged stream called FULL_ORDERS that contains one enriched message for each of the stored orders, plus an aggregated table called AGGREGATED_ORDERS, built on the FULL_ORDERS stream, that calculates the total cost of all the orders for each customer:

    ksql> CREATE STREAM FULL_ORDERS AS
      SELECT customers.customerId, customers.name, items.itemId, items.item, orders.price
      FROM orders
      JOIN customers  ON orders.customerId = customers.customerId
      JOIN items  ON items.itemId = orders.itemId 
      EMIT CHANGES;
    
    ksql> CREATE TABLE AGGREGATED_ORDERS AS
      SELECT CUSTOMERS_CUSTOMERID, count(*) as noOfItems, sum(price) as totalPrice
      FROM FULL_ORDERS
      GROUP BY CUSTOMERS_CUSTOMERID
      EMIT CHANGES;

    The ksqlDB documentation describes the difference between streams and tables: the former are immutable collections that represent a series of historical facts, while the latter are mutable collections that change over time, holding the current state of each entity identified by its key.
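
    The distinction shows up directly in our demo. Querying the FULL_ORDERS stream replays each immutable order event, while querying the AGGREGATED_ORDERS table emits the updated per-customer totals whenever they change (a sketch; both queries use the objects created above):

    ksql> -- stream semantics: one row per historical order event
    ksql> SELECT * FROM FULL_ORDERS EMIT CHANGES;

    ksql> -- table semantics: the latest aggregate per customer key
    ksql> SELECT * FROM AGGREGATED_ORDERS EMIT CHANGES;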

    Running the SHOW TOPICS and SHOW QUERIES commands shows the effects of the above statements on the ksqlDB database:

    ksql> SHOW TOPICS;
    
     Kafka Topic                      | Partitions | Partition Replicas 
    --------------------------------------------------------------------
     customers                        | 1          | 1                  
     items                            | 1          | 1                  
     ksql-service-ksql_processing_log | 1          | 1                  
     ksql-topic-AGGREGATED_ORDERS     | 1          | 1                  
     ksql-topic-FULL_ORDERS           | 1          | 1                  
     orders                           | 1          | 1                  
    --------------------------------------------------------------------
    
    ksql> SHOW QUERIES;
    
     Query ID                 | ...                                                                                                                                                                                                                                                                                                                                                                                                                  
    -------------------------------
     CSAS_FULL_ORDERS_7       | ... 
     CTAS_AGGREGATED_ORDERS_9 | ...
    -------------------------------
    For detailed information on a Query run: EXPLAIN <Query ID>;
    

    Note that all the newly created topics are prefixed with ksql-topic-, per the configuration that we provided in the startKsql.sh script (presumably via the ksql.output.topic.name.prefix server property).

    The Quarkus receiver application

    In the receiver folder of the demo source code, we developed a Quarkus application that receives messages from the newly created AGGREGATED_ORDERS changelog topic and prints their content to the console. The @Incoming annotation binds the method to a named channel whose Kafka topic is configured as the data source:

    // Imports for CDI, MicroProfile Reactive Messaging, the Kafka client, and JBoss logging
    // (javax.* packages match the Quarkus 2.x era of this article; Quarkus 3+ uses jakarta.*)
    import javax.enterprise.context.ApplicationScoped;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.jboss.logging.Logger;

    @ApplicationScoped
    public class AggregatedOrderReceiver {
        private Logger log = Logger.getLogger(AggregatedOrderReceiver.class);

        // Consume records from the channel mapped to the ksql-topic-AGGREGATED_ORDERS topic;
        // acknowledge each record only after it has been processed
        @Incoming("get-aggregated-order")
        @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
        public void process(ConsumerRecord<String, AggregatedOrder> record) {
            // The aggregation key (the customer ID) travels in the record key
            record.value().customerId = record.key();
            log.infof("AggregatedOrderReceiver received %s", record.value());
        }
    }
    

    The topic configuration is specified in the application.properties file:

    # Configure the Kafka source (we read from it)
    mp.messaging.incoming.get-aggregated-order.topic=ksql-topic-AGGREGATED_ORDERS
    mp.messaging.incoming.get-aggregated-order.group.id=${KAFKA_CONSUMER_GROUP}
    mp.messaging.incoming.get-aggregated-order.value.deserializer=org.acme.kafka.AggregatedOrderDeserializer
    

    Launch the Quarkus receiver application to connect to the Kafka cluster and receive updates on the ksql-topic-AGGREGATED_ORDERS topic:

    cd $DEMO_HOME/receiver
    ./startReceiver.sh

    Testing the integration

    To validate the integration, we will run a push query to receive updates from the AGGREGATED_ORDERS table:

    ksql> SELECT * FROM AGGREGATED_ORDERS EMIT CHANGES;
    +-----------------------+-----------------+----------------+
    |CUSTOMERS_CUSTOMERID   |NOOFITEMS        |TOTALPRICE      |
    +-----------------------+-----------------+----------------+
    
    Press CTRL-C to interrupt
    

    The query includes the EMIT CHANGES option, which makes it a push query that produces a continuously updating stream of the latest messages. Such a never-ending query must be explicitly terminated using the TERMINATE command from a new ksqlDB CLI session:

    ksql> SHOW QUERIES;
    ksql> TERMINATE <QUERY ID>;
    

    You can read more about the difference between push and pull queries on the ksqlDB documentation page.
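
    In short, a pull query fetches the current state for a key and returns immediately, while a push query subscribes to future changes. Both can be tried against the table built above (a sketch; pull queries require the predicate on the table's key column, here CUSTOMERS_CUSTOMERID):

    ksql> -- pull query: return the current totals for one customer, then terminate
    ksql> SELECT * FROM AGGREGATED_ORDERS WHERE CUSTOMERS_CUSTOMERID = 'C1';

    ksql> -- push query: keep emitting every change until explicitly interrupted
    ksql> SELECT * FROM AGGREGATED_ORDERS EMIT CHANGES;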

    At the same time, we use the neworder.sh script available in the ksqldb folder to push some new random messages into the orders topic:

    cd $DEMO_HOME/ksqldb
    ./neworder.sh -o 10

    The animation in Figure 3 shows how the messages created in the neworder.sh tab of the screen are displayed both in the ksqlDB CLI running the SELECT statement (left side of the receivers tab) and in the Quarkus receiver application (right side of the receivers tab):

    Figure 3: The SELECT query and the Quarkus receiver application receive new messages from the aggregated topic as soon as new orders are injected with the neworder.sh script.

    Tear down the managed resources

    To tear down the setup, we can delete all the resources created for the purposes of the demo:

    cd $DEMO_HOME/ksqldb
    ./deleteAll.sh
    cd ..
    ./deleteAll.sh

    Using ksqlDB to process streams of Kafka messages

    By using ksqlDB and the familiar SQL command syntax, we processed the messages in an Apache Kafka cluster and generated new aggregated messages using standard SQL aggregation functions. We injected new messages on the topics and watched the live streams of events coming in real time from a push query in the ksqlDB CLI and from a message stream in a Quarkus application.

    No code was needed to process the streams, apart from a few SQL commands. The procedures presented in the demo are available on GitHub.

    The rhoas CLI was used to interact with the Red Hat OpenShift Application and Data Services and to hide the complexity of creating the Kafka instance and defining the access rules for both the Quarkus application and the ksqlDB server.

    Last updated: October 5, 2023
