Building Apache Kafka Streams applications using Red Hat AMQ Streams: Part 2

 

June 18, 2019
Adam Cattermole
Related topics:
Event-Driven, Kubernetes
Related products:
Streams for Apache Kafka


    The Apache Kafka project includes a Streams Domain-Specific Language (DSL) built on top of the lower-level Stream Processor API. This DSL provides developers with simple abstractions for performing data processing operations. However, how one builds a stream processing pipeline in a containerized environment with Kafka isn’t clear. This second article in a two-part series uses the basics from the previous article to build an example application using Red Hat AMQ Streams.

    Now let's create a multi-stage pipeline that operates on real-world data, then consume and visualize the results.

    The system architecture

    In this article, we build a solution that follows the architecture diagram below. It may be worth referring back here for each new component:

    System Architecture

    The dataset and problem

    The data we chose for this example is the New York City taxi journey information from 2013, which was used for the ACM Distributed Event-Based Systems (DEBS) Grand Challenge in 2015. You can find a description of the data source here.

    This example's dataset is provided as a CSV file, with the columns detailed below:

    • medallion: an MD5 hash of the taxi identifier (vehicle bound)
    • hack_license: an MD5 hash of the taxi license identifier
    • pickup_datetime: time when the passenger(s) were picked up
    • dropoff_datetime: time when the passenger(s) were dropped off
    • trip_time_in_secs: duration of the trip in seconds
    • trip_distance: trip distance in miles
    • pickup_longitude: longitude coordinate of the pickup location
    • pickup_latitude: latitude coordinate of the pickup location
    • dropoff_longitude: longitude coordinate of the drop-off location
    • dropoff_latitude: latitude coordinate of the drop-off location
    • payment_type: the payment method (credit card or cash)
    • fare_amount: fare amount in dollars
    • surcharge: surcharge in dollars
    • mta_tax: tax in dollars
    • tip_amount: tip in dollars
    • tolls_amount: bridge and tunnel tolls in dollars
    • total_amount: total amount paid in dollars

    Source: DEBS 2015 Grand Challenge.

    We can explore interesting avenues within this dataset, such as following specific taxis to calculate:

    • The money earned from one taxi throughout the course of a day.
    • The distance from the last drop-off to the next pick-up to find out whether they travel far without a passenger.
    • The average speed of the taxi's trip by using the distance and time; we then use the pick-up and drop-off coordinates to guess the amount of traffic encountered.

    The example

    The processing we chose for this example is relatively straightforward. We calculate the total amount of money (fare_amount + tip_amount) earned within a particular area of the city, based on journeys starting there. This calculation involves splitting the input data into a grid of cells and then summing the total amount of money taken for every journey that originates from each cell. To accomplish this, we have to split up the processing in a way that ensures our output is correct.

    We will build this example step-by-step using what we learned in Part 1.

    Send data into Apache Kafka

    First, we need to make our dataset accessible from the cluster. This task would normally involve connecting to a service that lets us poll live data in real time, but the data we chose is historical, so we have to emulate the real-time behavior.

    Kafka Connect is ideal for this sort of function and is exactly what we will use in the end. However, for now, we avoid the additional complexity and use a Kafka Producer application, as discussed in Part 1. We accomplish this by including the (smaller) data file in the JAR and reading it line by line to send to Kafka. See the TaxiProducerExample for an example of how this works. To start, we set the TOPIC in the deployment configuration so that the data is written to taxi-source-topic.
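
    For illustration, a minimal producer loop along the following lines would do the job. This is a sketch rather than the actual TaxiProducerExample code: the resource name, environment variable names, and delay are assumptions.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class CsvLineProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Connection details are read from environment variables, as in Part 1
            props.put("bootstrap.servers", System.getenv("BOOTSTRAP_SERVERS"));
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            String topic = System.getenv("TOPIC"); // e.g., taxi-source-topic

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                 BufferedReader reader = new BufferedReader(new InputStreamReader(
                         CsvLineProducer.class.getResourceAsStream("/data.csv"), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Each CSV line becomes one record; the pause emulates real-time arrival
                    producer.send(new ProducerRecord<>(topic, line));
                    Thread.sleep(100);
                }
            }
        }
    }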

    Now let's check that the data is streaming to the topic:

    $ oc run kafka-consumer -ti --image=registry.access.redhat.com/amq7/amq-streams-kafka:1.1.0-kafka-2.1.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic taxi-source-topic --from-beginning
    
      07290D3599E7A0D62097A346EFCC1FB5,E7750A37CAB07D0DFF0AF7E3573AC141,2013-01-01 00:00:00,2013-01-01 00:02:00,120,0.44,-73.956528,40.716976,-73.962440,40.715008,CSH,3.50,0.50,0.50,0.00,0.00,4.50
      22D70BF00EEB0ADC83BA8177BB861991,3FF2709163DE7036FCAA4E5A3324E4BF,2013-01-01 00:02:00,2013-01-01 00:02:00,0,0.00,0.000000,0.000000,0.000000,0.000000,CSH,27.00,0.00,0.50,0.00,0.00,27.50
      0EC22AAF491A8BD91F279350C2B010FD,778C92B26AE78A9EBDF96B49C67E4007,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.71,-73.973145,40.752827,-73.965897,40.760445,CSH,4.00,0.50,0.50,0.00,0.00,5.00
      ...
    

    Create the Kafka Streams operations

    Now that we have a topic with the String data, we can start creating our application logic. First, let's set up the Kafka Streams application's configuration options. We do this in a separate config class—see TripConvertConfig—that uses the same method of reading from environment variables described in Part 1. Note that we use this same method of providing configuration for each new application we build.

    We can now generate the configuration options:

    TripConvertConfig config = TripConvertConfig.fromMap(System.getenv());
    Properties props = TripConvertConfig.createProperties(config);
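
    As a rough sketch, a createProperties method of this kind typically sets the application ID, the bootstrap servers, and the default SerDes. The getter names below are assumptions about the config class, not its actual API:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public static Properties createProperties(TripConvertConfig config) {
        Properties props = new Properties();
        // Names the Streams application; the consumer group and state store names derive from it
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        // Default SerDes for topics carrying plain String key/value pairs;
        // custom types such as Trip override these per operation
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return props;
    }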
    

    As described in the basic example, the actions we perform will be to read from one topic, perform some kind of operation on the data, and write out to another topic.

    Let's create the source stream in the same method we saw before:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream(config.getSourceTopic());
    

    The data we receive is currently in a format that is not easy to use. Our long CSV data is represented as a String, and we do not have access to the individual fields.

    To perform operations on the data, we need to convert the incoming String events into a type we know. For this purpose, we created a Plain Old Java Object (POJO) representing the Trip data type, and an enum, TripFields, representing each data column. The function constructTripFromString takes each line of CSV data and converts it into a Trip. This function is implemented in the TripConvertApp class.
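
    As a rough sketch of what that conversion involves (the real Trip class holds every column and lives in the example repository; the trimmed-down fields and column indices below simply follow the table above):

    // Trimmed-down Trip POJO holding only the fields used later in this article
    public class Trip {
        private final double pickupLatitude;
        private final double pickupLongitude;
        private final double fareAmount;
        private final double tipAmount;

        public Trip(double pickupLatitude, double pickupLongitude,
                    double fareAmount, double tipAmount) {
            this.pickupLatitude = pickupLatitude;
            this.pickupLongitude = pickupLongitude;
            this.fareAmount = fareAmount;
            this.tipAmount = tipAmount;
        }

        public double getPickupLatitude()  { return pickupLatitude; }
        public double getPickupLongitude() { return pickupLongitude; }
        public double getFareAmount()      { return fareAmount; }
        public double getTipAmount()       { return tipAmount; }

        // Parses one CSV line into a Trip, using the column order of the dataset
        public static Trip constructTripFromString(String line) {
            String[] fields = line.split(",");
            return new Trip(
                    Double.parseDouble(fields[7]),   // pickup_latitude
                    Double.parseDouble(fields[6]),   // pickup_longitude
                    Double.parseDouble(fields[11]),  // fare_amount
                    Double.parseDouble(fields[14])); // tip_amount
        }
    }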

    The Kafka Streams DSL makes it easy to perform this function for every new record we receive:

    KStream<String, Trip> mapped = source
                    .map((key, value) -> new KeyValue<>(key, constructTripFromString(value)));
    

    We could now write the mapped stream out to the sink topic. However, the serialization/deserialization (SerDes) for our value field has changed from the Serdes.String() that we set in the TripConvertConfig class. Because our Trip type is custom-built for our application, we must create our own SerDes implementation.

    This is where the JsonObjectSerde class comes into play. We use this class to convert our custom objects to and from JSON, letting the Vert.x JsonObject class do the heavy lifting. A few annotations are required on the object's constructor, as can be seen in the Location class.
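
    Conceptually, such a SerDes wraps Kafka's Serializer and Deserializer interfaces around the Vert.x JSON mapping. The following is a minimal sketch of the idea, not the exact JsonObjectSerde implementation from the example:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    import io.vertx.core.json.JsonObject;

    // JSON-backed Serde for an arbitrary POJO type T
    public class PojoJsonSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {
        private final Class<T> type;

        public PojoJsonSerde(Class<T> type) {
            this.type = type;
        }

        @Override
        public byte[] serialize(String topic, T data) {
            // Vert.x maps the POJO to a JsonObject, which is encoded as UTF-8 bytes
            return data == null ? null
                    : JsonObject.mapFrom(data).encode().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public T deserialize(String topic, byte[] bytes) {
            return bytes == null ? null
                    : new JsonObject(new String(bytes, StandardCharsets.UTF_8)).mapTo(type);
        }

        @Override public Serializer<T> serializer() { return this; }
        @Override public Deserializer<T> deserializer() { return this; }
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public void close() { }
    }

    Because JsonObject.mapTo() relies on Jackson under the hood, the constructor annotations mentioned above are typically Jackson's @JsonCreator and @JsonProperty, which tell the mapper how to rebuild the object from its JSON fields.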

    We are now ready to output to our sinkTopic, as follows:

    final JsonObjectSerde<Trip> tripSerde = new JsonObjectSerde<>(Trip.class);
    mapped.to(config.getSinkTopic(), Produced.with(Serdes.String(), tripSerde));
    

    Generate application-specific information

    The intention of our application is to calculate the total amount of money received from all journeys originating in any particular cell. We must therefore use the journey origin's latitude and longitude to determine which cell it belongs to. We use the logic laid out in the DEBS Grand Challenge for defining the grid specifics. See the figure below for an example:

    Taxi Journey Grid

    We must set the grid's origin (blue point), which represents the center of grid cell (1,1), and a size in meters for every cell in the grid. Next, we convert the cell size into a latitude and longitude distance, dy and dx respectively, and calculate the grid's top left position (red point). For any new arrival point, we can easily count how many dy and dx away the coordinates are, and therefore in the example above (yellow point), we can determine that the journey originates from cell (3,4).

    The additional application logic in the Cell class and TripConvertApp performs this calculation, and we set the new record's key as the Cell type. To write to the sinkTopic, we need a new SerDes, created in an identical fashion to the one we made before.
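
    A compact sketch of that calculation is shown below. The real Cell class in the example may differ in naming and rounding details, but the idea is the same:

    // Maps a pick-up coordinate onto a grid cell, following the grid described above
    public final class Cell {
        private final int x;
        private final int y;

        public Cell(int x, int y) {
            this.x = x;
            this.y = y;
        }

        public int getX() { return x; }
        public int getY() { return y; }

        // origin is the centre of cell (1,1); dy and dx are the latitude/longitude spans of one cell
        public static Cell fromCoordinates(double latitude, double longitude,
                                           double originLatitude, double originLongitude,
                                           double dy, double dx) {
            // Shift the origin up and left by half a cell to get the grid's top-left corner (red point),
            // then count how many whole cells away the pick-up point lies
            double topLeftLatitude = originLatitude + dy / 2;
            double topLeftLongitude = originLongitude - dx / 2;
            int cellX = (int) Math.floor((longitude - topLeftLongitude) / dx) + 1;
            int cellY = (int) Math.floor((topLeftLatitude - latitude) / dy) + 1;
            return new Cell(cellX, cellY);
        }
    }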

    As we chose the default partitioning strategy, records are partitioned based on their key, so this change ensures that every Trip corresponding to a particular pick-up Cell is sent to the same partition. When we perform processing downstream, the same processing node receives all records for the same pick-up cell, ensuring the operations' correctness and reproducibility.

    Aggregate the data

    We have now converted all of the incoming data to a type of <Cell, Trip>, and can perform an aggregation operation. Our intention is to calculate the sum of the fare_amount + tip_amount for every journey originating from one pick-up cell, across a set time period.

    Because our data is historical, the time window that we use should be in relation to the original time that the events occurred, rather than the time that the event entered the Kafka system. To do this, we need to provide a method of extracting this information from each record: a class that implements TimestampExtractor. The Trip fields already contain this information for both pick-up and drop-off times, and so the implementation is straightforward—see the implementation in TripTimestampExtractor for details.
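
    A sketch of such an extractor is shown below; it assumes the Trip value exposes its pick-up time as epoch milliseconds (the accessor name is an assumption):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Uses the pick-up time embedded in each Trip as the event time for windowing
    public class TripTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            Object value = record.value();
            if (value instanceof Trip) {
                // Assumption: Trip exposes pickup_datetime converted to epoch milliseconds
                return ((Trip) value).getPickupTimeMillis();
            }
            // Fall back to the record's own timestamp for anything unexpected
            return record.timestamp();
        }
    }

    The extractor is wired in through the default.timestamp.extractor property in the Streams configuration.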

    Even though the topic we read from is already partitioned by cell, there are many more cells than partitions, so each of our replicas will process the data for more than one cell. To ensure that the windowing and aggregation are performed on a cell-by-cell basis, we call the groupByKey() function first, followed by a subsequent windowing operation. As seen below, the window size is easily changeable, but for now, we opted for a window of 15 minutes.

    The data can now be aggregated to generate the output metric we want. Doing this is as simple as providing an accumulator value and the operation to perform for each record. The output is of type KTable, where each key represents one particular window and the value is the output of our aggregation operation. We use the toStream() function to convert that output back to a Kafka stream so that it can be written to the sink topic.

    KStream<Cell, Trip> source = builder.stream(config.getSourceTopic(), Consumed.with(cellSerde, tripSerde));
    KStream<Windowed<Cell>, Double> windowed = source
            .groupByKey(Serialized.with(cellSerde, tripSerde))
            .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
            .aggregate(
                    () -> (double) 0,
                    (key, value, profit) -> profit + value.getFareAmount() + value.getTipAmount(),
                    Materialized.<Cell, Double, WindowStore<Bytes, byte[]>>as("profit-store")
                            .withValueSerde(Serdes.Double()))
            .toStream();
    

    As we do not need to know which window the values belong to, we reset the cell as the record's key and round the value to two decimal places.

    KStream<Cell, Double> rounded = windowed
                    .map((window, profit) -> new KeyValue<>(window.key(), (double) Math.round(profit*100)/100));
    

    Finally, the data can now be written to the output topic using the same method as defined before:

    rounded.to(config.getSinkTopic(), Produced.with(cellSerde, Serdes.Double()));
    

    Consume and visualize the data

    We now have the windowed, cell-based metric being output to the final topic, so the last step is to consume and visualize the data. For this step, we use the Vert.x Kafka client to read the data from our topic and stream it to a JavaScript dashboard using the Vert.x EventBus and SockJS (WebSockets). See TripConsumerApp for the implementation.

    This consumer application registers a handler that converts arriving records into a readable JSON format and publishes the output over an outbound EventBus channel. The JavaScript connects to this channel and registers a handler that performs the relevant actions to visualize each incoming message:

    KafkaConsumer<String, Double> consumer = KafkaConsumer.create(vertx, props, String.class, Double.class);
    consumer.handler(record -> {
        JsonObject json = new JsonObject();
        json.put("key", record.key());
        json.put("value", record.value());
        vertx.eventBus().publish("dashboard", json);
    });
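
    For completeness, the EventBus typically reaches the browser through a SockJS bridge registered on the Vert.x web Router (SockJSHandler and BridgeOptions from the vertx-web module). A minimal sketch of that wiring, with the port and route as assumptions:

    // Inside the same verticle that creates the KafkaConsumer above
    Router router = Router.router(vertx);
    // Allow only the "dashboard" address to flow from the server out to the browser
    BridgeOptions options = new BridgeOptions()
            .addOutboundPermitted(new PermittedOptions().setAddress("dashboard"));
    router.route("/eventbus/*").handler(SockJSHandler.create(vertx).bridge(options));
    vertx.createHttpServer().requestHandler(router).listen(8080);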
    

    We log the raw metric information in a window so it can be seen, and use a geographical mapping library (Leaflet) to draw the original cells, modifying the opacity based on the metric's value:

    Trip Consumer App Dashboard

    By modifying the starting latitude and longitude—or the cell size—in both index.html and TripConvertApp, you can change the grid you are working with. You can also adjust the aggregate function's logic to calculate alternative metrics from the data:

    Taxi Dashboard Focus

    Create a Kafka connector

    Up until now, the producer we are using has been sufficient, even though the JAR (and image) is larger due to the bundled data file. However, if we wanted to process the full 12GB dataset, this would not be an ideal solution.

    The example connector we built relies on hosting the file on an FTP server, but existing connectors are available for several different file stores. We picked an FTP server because it allows our connector to easily read a file external to the cluster. For convenience, we use the Python library pyftpdlib to host the file, with the username and password set to amqstreams. However, hosting the file on any publicly accessible FTP server is sufficient.

    A Kafka connector consists of the connector itself and one or more tasks (also known as workers) that perform the data retrieval through calls to the poll() function. The connector passes its configuration to the tasks, and several tasks can be started, up to the tasks.max parameter. For this purpose, we created an FTPConnection class, which provides the functions we require from the Apache Commons Net FTPClient. On each call to poll(), we retrieve the next line from the file and publish this record to the topic provided in the configuration.
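
    As an illustration of the task side, a simplified poll() implementation might look like the sketch below. This is not the actual TaxiSourceConnector code: the FTPConnection helper methods and the lack of offset tracking are assumptions.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Each call to poll() reads the next CSV line from the FTP server and hands it to Connect
    public class TaxiSourceTask extends SourceTask {
        private FTPConnection connection;  // thin wrapper around the Apache Commons Net FTPClient
        private String topic;

        @Override
        public void start(Map<String, String> props) {
            topic = props.get("connect.ftp.topic");
            connection = FTPConnection.connect(props);  // hypothetical helper
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            String line = connection.readLine();  // next line of the remote file, or null if none yet
            if (line == null) {
                Thread.sleep(1000);  // nothing new; back off briefly before the next poll
                return Collections.emptyList();
            }
            // No source partition/offset bookkeeping in this sketch
            return Collections.singletonList(new SourceRecord(
                    Collections.emptyMap(), Collections.emptyMap(),
                    topic, Schema.STRING_SCHEMA, line));
        }

        @Override
        public void stop() {
            connection.close();
        }

        @Override
        public String version() {
            return "1.0-SNAPSHOT";
        }
    }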

    We now need to add our connector plugin to the existing amq-streams-kafka-connect Docker image, which is done by adding the JAR to the plugins folder, as described in the Red Hat AMQ Streams documentation. We can then deploy the Kafka Connect cluster using the instructions from the default KafkaConnect example, but adding the spec.image field to our kafka-connect.yaml and pointing it at the image containing our plugin.

    Kafka Connect is exposed as a RESTful resource, so to check which connector plugins are present, we can run the following GET request:

    $ oc exec -c kafka -i my-cluster-kafka-0 -- curl -s -X GET \
        http://my-connect-cluster-connect-api:8083/connector-plugins
    
      [{"class":"io.strimzi.TaxiSourceConnector","type":"source","version":"1.0-SNAPSHOT"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.1.0"}]
    

    Similarly, to create a new connector we can POST the JSON configuration, as shown in the example below. This new connector instance establishes an FTP connection to the server and streams the data to the taxi-source-topic. For this process to work, the following configuration options must be set correctly:

    • connect.ftp.address – FTP connection URL host:port.
    • connect.ftp.filepath – Path to file on remote FTP server from root.

    If needed, add this optional configuration:

    • connect.ftp.attempts – Maximum number of attempts to retrieve a valid FTP connection (default: 3).
    • connect.ftp.backoff.ms – Back-off time in milliseconds between connection attempts (default: 10000ms).
    $ oc exec -c kafka -i my-cluster-kafka-0 -- curl -s -X POST \
        -H "Accept:application/json" \
        -H "Content-Type:application/json" \
        http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
    
    {
        "name": "taxi-connector",
        "config": {
            "connector.class": "io.strimzi.TaxiSourceConnector",
            "connect.ftp.address": "<ip-address>",
            "connect.ftp.user": "amqstreams",
            "connect.ftp.password": "amqstreams",
            "connect.ftp.filepath": "sorteddata.csv",
            "connect.ftp.topic": "taxi-source-topic",
            "tasks.max": "1",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter"
        }
    }
    EOF
    
      {"name":"taxi-connector","config":{"connector.class":"io.strimzi.TaxiSourceConnector","connect.ftp.address":"<ip-address>","connect.ftp.user":"amqstreams","connect.ftp.password":"amqstreams","connect.ftp.filepath":"sorteddata.csv","connect.ftp.topic":"taxi-source-topic","tasks.max":"1","value.converter":"org.apache.kafka.connect.storage.StringConverter","name":"taxi-connector"},"tasks":[],"type":null}
    

    We can GET the currently deployed connectors:

    $ oc exec -c kafka -i my-cluster-kafka-0 -- curl -s -X GET \
        http://my-connect-cluster-connect-api:8083/connectors
    
      ["taxi-connector"]
    

    We can check whether the data is streaming to the topic in the same way as we did in the section Send data into Apache Kafka. For debugging information, see the logs for my-connect-cluster-connect. To stop the connector, we can delete it with the following command:

    $ oc exec -c kafka -i my-cluster-kafka-0 -- curl -s -X DELETE \
        http://my-connect-cluster-connect-api:8083/connectors/taxi-connector
    

    That's it. We managed to create a more complex Red Hat AMQ Streams application where we source historical real-world data with a Kafka Connector, stream it through multiple processing points, and sink it to a visualization.

    Read more

    Building Apache Kafka Streams applications using Red Hat AMQ Streams: Part 1

    Last updated: March 28, 2023
