
This is the first article in a three-part series about existing solutions for manipulating Kafka stream data with SQL, the well-known query language widely used with relational databases.

Apache Kafka

Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. Originally created to handle real-time data feeds at LinkedIn in 2011, Kafka quickly evolved from a messaging queue to a full-fledged event streaming platform capable of handling over 1 million messages per second, or trillions of messages per day.

OpenShift offers many options to install an Apache Kafka cluster, including various operators, but in this article, we'll focus on the creation of managed Apache Kafka services using Red Hat OpenShift Streams for Apache Kafka, which also provides an option to try out a free instance that we will use for the purposes of our exercise.

ksqlDB

ksqlDB is a real-time event-streaming database built on top of Apache Kafka. It combines powerful stream processing with a relational database model using SQL syntax. It can connect multiple Apache Kafka instances at the same time, providing an alternative to manipulate the data streams with almost no code.

ksqlDB Standalone is an open source project that's licensed under the Confluent Community License.

The main use cases for adopting ksqlDB are the following:

  • Materialized caches
  • Streaming ETL pipelines
  • Event-driven microservices

ksqlDB also provides commands to manage connectors based on the Kafka Connect technology to easily integrate the messaging system with external sources or sinks, like relational or NoSQL databases, key-value stores, search indexes, and file systems among others.

In terms of management interface, ksqlDB provides both a command-line interface (CLI) to run SQL-like commands and a REST API to run the same operations from an HTTP client.
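As a rough illustration of the REST option, the following Java sketch builds the HTTP request for a single statement using only the JDK's built-in HTTP client. It assumes a ksqlDB server reachable at http://localhost:8088 and the /ksql endpoint with the application/vnd.ksql.v1+json content type; the class and method names are hypothetical and are not part of the demo repository.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class KsqlRestSketch {

    // Assumption: a ksqlDB server listening locally on port 8088.
    static final String SERVER = "http://localhost:8088";

    // Wraps one statement in the JSON envelope used by the /ksql endpoint.
    // Only backslashes and double quotes are escaped; this is not a full JSON encoder.
    static String buildBody(String statement) {
        String escaped = statement.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"ksql\": \"" + escaped + "\", \"streamsProperties\": {}}";
    }

    // Builds (but does not send) the POST request for a statement.
    static HttpRequest buildRequest(String statement) {
        return HttpRequest.newBuilder(URI.create(SERVER + "/ksql"))
                .header("Content-Type", "application/vnd.ksql.v1+json")
                .header("Accept", "application/vnd.ksql.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(statement)))
                .build();
    }

    // Sends the statement and returns the raw JSON response; needs a running server.
    static String execute(String statement) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(statement), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Prints the request that would be sent, without contacting any server.
        System.out.println(buildRequest("SHOW TOPICS;").uri());
        System.out.println(buildBody("SHOW TOPICS;"));
    }
}
```

Calling KsqlRestSketch.execute("SHOW TOPICS;") against a running server should return the same information as the CLI command, encoded as JSON.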

Introducing the sample application

This article shows how to install, configure, and operate an application made of the following components:

  • An instance of the Red Hat OpenShift Streams for Apache Kafka managed service called kafka-orders
  • Three topics to model: customers, items and orders
  • A ksqlDB server connected to the kafka-orders instance and its CLI console
  • A SQL script to populate topics with sample SQL INSERTs
  • An aggregated table, AGGREGATED_ORDERS, whose updates are pushed to external subscribers
  • A Quarkus receiver application subscribed to receive and print the messages from the new stream
  • neworder.sh, a bash script to inject new orders

Figure 1 shows the architecture of the sample demo application.

Figure 1: The sample demo application is made of a managed Apache Kafka instance, the ksqlDB server and CLI, a Quarkus application, and a script to generate random orders.

Install and run the demo

To run the demo in this article, you need the following tools:

  • rhoas CLI
  • Maven 3.x
  • Java SDK 11+
  • bash terminal session

You also need accounts on the following systems:

To run the exercise, clone the repository of the source code and define an environment variable to easily reference the root folder:

git clone git@github.com:dmartinol/ksqldb-on-rhoas.git
cd ksqldb-on-rhoas
export DEMO_HOME=$PWD

Create the Kafka instance

You need to request a login token from the OpenShift Cluster Manager API Token page. Then replace the TOKEN parameter with that token and log in to the rhoas CLI:

rhoas login --api-gateway https://api.openshift.com -t "<TOKEN>"

The first step is to create a managed Kafka instance called kafka-orders, along with the required topics items, customers, and orders. The same script then sets up the access rules that grant a service account called kafka-orders-quarkus access to the topics whose names start with ksql-topic:

./createKafka.sh

Once the creation completes (it might take up to five minutes), you can check the configuration by looking at the content of the following files:

  • kafka-orders.json: the configuration of the Kafka host
  • kafka-orders-quarkus.json: the configuration of the service account

Alternatively, you can review the same details in the Kafka Instances management console, together with other useful information displayed in the Topics and Access tabs, as shown in Figure 2.

Figure 2: Select Kafka Instances from the Streams for Apache Kafka menu and then the kafka-orders instance to display all the details of the newly created Kafka instance.

The checkKafkaStatus.sh script can also print the same details to the terminal:

cd $DEMO_HOME
./checkKafkaStatus.sh

Install ksqlDB

We will run ksqlDB from source, so we need to clone the GitHub repository and build a stable release, such as the v7.2.2 tag:

git clone https://github.com/confluentinc/ksql.git
cd ksql
export KSQL_HOME=${PWD}
git checkout v7.2.2
mvn clean install

Other installation options are described in the ksqlDB documentation page.

Configure the ksqlDB service account

ksqlDB must be allowed to connect and manage the Kafka resources, so we create a specific service account, ksqldb-sa, and configure the access rules to let ksqlDB access and manage the required resources:

cd $DEMO_HOME/ksqldb
./createSaForKsql.sh

A complete description of the required access rules is provided in the Required ACLs section of the ksqlDB documentation.

Run ksqlDB server and CLI

The next step is to launch the ksqlDB server on the default port 8088 and connect it to the Kafka instance created before:

cd $DEMO_HOME/ksqldb
./startKsql.sh

In a different terminal, we can start the ksqlDB CLI and populate the topics with the sample data contained in the init.sql file:

> cd $DEMO_HOME/ksqldb
> ./startKsqlCli.sh
...
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.2.2, Server v7.2.2 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> RUN SCRIPT init.sql;

From the ksqlDB CLI, we can use SQL commands such as SHOW TOPICS, SHOW STREAMS, DESCRIBE ORDERS, and SELECT * FROM ORDERS to validate the content of ksqlDB:

ksql> SHOW TOPICS;

 Kafka Topic                      | Partitions | Partition Replicas 
--------------------------------------------------------------------
 customers                        | 1          | 1                  
 items                            | 1          | 1                  
 ksql-service-ksql_processing_log | 1          | 1                  
 orders                           | 1          | 1                  
--------------------------------------------------------------------

ksql> SHOW STREAMS;

 Stream Name         | Kafka Topic                      | Key Format | Value Format | Windowed 
-----------------------------------------------------------------------------------------------
 KSQL_PROCESSING_LOG | ksql-service-ksql_processing_log | KAFKA      | JSON         | false    
 ORDERS              | orders                           | KAFKA      | JSON         | false    
-----------------------------------------------------------------------------------------------

ksql> DESCRIBE ORDERS;

Name                 : ORDERS
 Field      | Type            
------------------------------
 ITEMID     | VARCHAR(STRING) 
 CUSTOMERID | VARCHAR(STRING) 
 PRICE      | DOUBLE          
------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

ksql> SELECT * FROM ORDERS;

+----------------------------------------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
|ITEMID                                                                |CUSTOMERID                                                            |PRICE                                                                 |
+----------------------------------------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
|A1                                                                    |C1                                                                    |5.8                                                                   |
|A1                                                                    |C2                                                                    |6.0                                                                   |
|A1                                                                    |C3                                                                    |5.0                                                                   |
|A2                                                                    |C1                                                                    |123.3                                                                 |
|A2                                                                    |C1                                                                    |120.3                                                                 |
Query Completed
Query terminated

For a list of available SQL commands, you can read the SQL quick reference page.

Creating a merged stream and an aggregated table

Using the ksqlDB CLI, we create a merged stream called FULL_ORDERS that joins each stored order with its customer and item, producing one message per order. On top of the FULL_ORDERS stream, we then build the aggregated table AGGREGATED_ORDERS to calculate the number of orders and their total cost for each customer:

ksql> CREATE STREAM FULL_ORDERS AS
  SELECT customers.customerId, customers.name, items.itemId, items.item, orders.price
  FROM orders
  JOIN customers  ON orders.customerId = customers.customerId
  JOIN items  ON items.itemId = orders.itemId 
  EMIT CHANGES;

ksql> CREATE TABLE AGGREGATED_ORDERS AS
  SELECT CUSTOMERS_CUSTOMERID, count(*) as noOfItems, sum(price) as totalPrice
  FROM FULL_ORDERS
  GROUP BY CUSTOMERS_CUSTOMERID
  EMIT CHANGES;

The ksqlDB documentation describes the difference between streams and tables: the former are immutable collections that represent a series of historical facts, while the latter are mutable collections that change over time, defining the current state of any entity identified by its key.
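The distinction can be pictured with a small plain-Java analogy, which involves neither Kafka nor ksqlDB: the stream is an append-only list of facts, and the table is a map holding the latest aggregate for each key. The class and field names below are hypothetical, and the data mirrors the five sample orders listed earlier.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StreamVsTableSketch {

    // One immutable fact, comparable to a row of the FULL_ORDERS stream.
    static final class Order {
        final String customerId;
        final double price;

        Order(String customerId, double price) {
            this.customerId = customerId;
            this.price = price;
        }
    }

    // Current state for one key, comparable to a row of the AGGREGATED_ORDERS table.
    static final class Totals {
        long noOfItems;
        double totalPrice;
    }

    // The "stream": every order is appended and never modified.
    final List<Order> stream = new ArrayList<>();
    // The "table": one mutable entry per customer, updated in place.
    final Map<String, Totals> table = new LinkedHashMap<>();

    void append(Order order) {
        stream.add(order);
        Totals totals = table.computeIfAbsent(order.customerId, key -> new Totals());
        totals.noOfItems++;
        totals.totalPrice += order.price;
    }

    public static void main(String[] args) {
        StreamVsTableSketch sketch = new StreamVsTableSketch();
        sketch.append(new Order("C1", 5.8));
        sketch.append(new Order("C2", 6.0));
        sketch.append(new Order("C3", 5.0));
        sketch.append(new Order("C1", 123.3));
        sketch.append(new Order("C1", 120.3));

        System.out.println("facts in the stream: " + sketch.stream.size());
        sketch.table.forEach((customer, totals) ->
                System.out.println(customer + ": " + totals.noOfItems + " orders, total " + totals.totalPrice));
    }
}
```

The list keeps growing with every order, while the map stays at one entry per customer. The backing topic of a ksqlDB table exposes that second view as a changelog of updates.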

Running the SHOW TOPICS and SHOW QUERIES commands shows the effects of the above statements on the ksqlDB database:

ksql> SHOW TOPICS;

 Kafka Topic                      | Partitions | Partition Replicas 
--------------------------------------------------------------------
 customers                        | 1          | 1                  
 items                            | 1          | 1                  
 ksql-service-ksql_processing_log | 1          | 1                  
 ksql-topic-AGGREGATED_ORDERS     | 1          | 1                  
 ksql-topic-FULL_ORDERS           | 1          | 1                  
 orders                           | 1          | 1                  
--------------------------------------------------------------------

ksql> SHOW QUERIES;

 Query ID                 | ...
-------------------------------
 CSAS_FULL_ORDERS_7       | ... 
 CTAS_AGGREGATED_ORDERS_9 | ...
-------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

Note that all the newly created topics are prefixed by ksql-topic-, which is the configuration that we provided in the startKsql.sh script.

The Quarkus receiver application

In the receiver folder of the demo source code, we developed a Quarkus application that receives messages from the topic backing the newly created AGGREGATED_ORDERS table and prints their content to the console. The @Incoming annotation names the channel that is bound to the Kafka topic used as the data source:

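import javax.enterprise.context.ApplicationScoped; // jakarta.enterprise.* on Quarkus 3 and later

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class AggregatedOrderReceiver {
    private static final Logger log = Logger.getLogger(AggregatedOrderReceiver.class);

    // Consumes records from the channel configured in application.properties.
    @Incoming("get-aggregated-order")
    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
    public void process(ConsumerRecord<String, AggregatedOrder> record) {
        // The customer ID is the record key, so copy it into the value before logging.
        record.value().customerId = record.key();
        log.infof("AggregatedOrderReceiver received %s", record.value());
    }
}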

The topic configuration is specified in the application.properties file:

# Configure the Kafka source (we read from it)
mp.messaging.incoming.get-aggregated-order.topic=ksql-topic-AGGREGATED_ORDERS
mp.messaging.incoming.get-aggregated-order.group.id=${KAFKA_CONSUMER_GROUP}
mp.messaging.incoming.get-aggregated-order.value.deserializer=org.acme.kafka.AggregatedOrderDeserializer
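The AggregatedOrderDeserializer referenced above is not listed in this article. The sketch below is a stdlib-only approximation of what such a deserializer has to do, under two assumptions: the table value is a flat JSON object with the fields NOOFITEMS and TOTALPRICE (ksqlDB upper-cases unquoted column names), and the customer ID travels in the record key, which is why the receiver copies record.key() into the value. The class name is hypothetical, and a real Quarkus application would more likely rely on a JSON library than on regular expressions.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AggregatedOrderSketch {
    String customerId;   // not part of the value; filled from the record key
    long noOfItems;
    double totalPrice;

    // Assumed field names, taken from the columns of the AGGREGATED_ORDERS table.
    private static final Pattern COUNT =
            Pattern.compile("\"NOOFITEMS\"\\s*:\\s*(\\d+)");
    private static final Pattern TOTAL =
            Pattern.compile("\"TOTALPRICE\"\\s*:\\s*(-?\\d+(?:\\.\\d+)?(?:[eE][-+]?\\d+)?)");

    // Maps the raw bytes of a record value to an object, leaving customerId unset.
    static AggregatedOrderSketch fromValue(byte[] value) {
        String json = new String(value, StandardCharsets.UTF_8);
        AggregatedOrderSketch order = new AggregatedOrderSketch();
        order.noOfItems = Long.parseLong(find(COUNT, json));
        order.totalPrice = Double.parseDouble(find(TOTAL, json));
        return order;
    }

    private static String find(Pattern pattern, String json) {
        Matcher matcher = pattern.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Missing field in: " + json);
        }
        return matcher.group(1);
    }

    @Override
    public String toString() {
        return "AggregatedOrder[customerId=" + customerId
                + ", noOfItems=" + noOfItems + ", totalPrice=" + totalPrice + "]";
    }

    public static void main(String[] args) {
        byte[] value = "{\"NOOFITEMS\":3,\"TOTALPRICE\":249.4}".getBytes(StandardCharsets.UTF_8);
        AggregatedOrderSketch order = fromValue(value);
        order.customerId = "C1"; // in the receiver this comes from record.key()
        System.out.println(order);
    }
}
```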

Launch the Quarkus receiver application to connect to the Kafka cluster and receive updates on the ksql-topic-AGGREGATED_ORDERS topic:

cd $DEMO_HOME/receiver
./startReceiver.sh

Testing the integration

To validate the integration, we will run a push query to receive updates from the AGGREGATED_ORDERS table:

ksql> SELECT * FROM AGGREGATED_ORDERS EMIT CHANGES;
+-----------------------+-----------------+----------------+
|CUSTOMERS_CUSTOMERID   |NOOFITEMS        |TOTALPRICE      |
+-----------------------+-----------------+----------------+

Press CTRL-C to interrupt

The EMIT CHANGES clause makes this a push query: it produces a continuously updating stream of the latest results and never completes on its own. You can interrupt it with CTRL-C in the CLI, or terminate it explicitly with the TERMINATE command from a new ksqlDB CLI session:

ksql> SHOW QUERIES;
ksql> TERMINATE <QUERY ID>;

You can read more about the difference between push and pull queries on the ksqlDB documentation page.

At the same time, we use the neworder.sh script available in the ksqldb folder to push some new random messages into the orders topic:

cd $DEMO_HOME/ksqldb
./neworder.sh -o 10
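How neworder.sh builds its messages is not shown in this article. Purely as an illustration, the following Java sketch generates random ksqlDB INSERT statements for the orders stream. It assumes the column names reported by DESCRIBE ORDERS and the item and customer IDs that appear in the sample data; the class name, the price range, and the use of INSERT statements are all hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;

final class RandomOrdersSketch {

    // Assumption: IDs taken from the sample rows shown by SELECT * FROM ORDERS.
    static final String[] ITEMS = {"A1", "A2"};
    static final String[] CUSTOMERS = {"C1", "C2", "C3"};

    // Returns `count` INSERT statements with random item, customer, and price.
    static List<String> randomInserts(int count, Random random) {
        List<String> statements = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String item = ITEMS[random.nextInt(ITEMS.length)];
            String customer = CUSTOMERS[random.nextInt(CUSTOMERS.length)];
            double price = 1 + random.nextInt(20000) / 100.0; // 1.00 to 200.99
            // Locale.ROOT keeps the decimal point independent of the system locale.
            statements.add(String.format(Locale.ROOT,
                    "INSERT INTO orders (itemId, customerId, price) VALUES ('%s', '%s', %.2f);",
                    item, customer, price));
        }
        return statements;
    }

    public static void main(String[] args) {
        // Ten statements, matching the -o 10 option used above.
        randomInserts(10, new Random()).forEach(System.out::println);
    }
}
```

Statements like these could be pasted into the ksqlDB CLI or submitted through the REST API; each one adds a record to the orders topic and therefore triggers an update of the aggregated table.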

The animation in Figure 3 shows how the messages created in the neworders tab are displayed both in the ksqlDB CLI running the SELECT statement (left side of the receivers tab) and in the Quarkus receiver application (right side of the receivers tab):

Figure 3: The SELECT query and the Quarkus receiver application receive new messages from the aggregated topic as soon as new orders are injected with the neworder.sh script.

Tear down the managed resources

To clear the setup, we can delete all the resources created for the purpose of the demo:

cd $DEMO_HOME/ksqldb
./deleteAll.sh
cd ..
./deleteAll.sh

ksqlDB to process streams of Kafka messages

By using ksqlDB and the familiar SQL command syntax, we processed the messages in an Apache Kafka cluster and generated new aggregated messages using standard SQL aggregation functions. We injected new messages on the topics and watched the live streams of events coming in real time from a push query in the ksqlDB CLI and from a message stream in a Quarkus application.

No code was needed to process the streams, apart from a few SQL commands. The procedures presented in the demo are available on GitHub.

The rhoas CLI was used to interact with the Red Hat OpenShift Application and Data Services and hide the complexity of creating the Kafka instance and defining the access rules for both the Quarkus application and the ksqlDB server.

Last updated: October 5, 2023