Event-based microservices with Red Hat AMQ Streams

 

November 21, 2019
Cheng Kuan Gan
Related topics: Kubernetes
Related products: Streams for Apache Kafka, Red Hat OpenShift, Red Hat OpenShift Container Platform


    As part of its AMQ offerings, Red Hat provides a Kafka-based event streaming solution for both traditional and microservices-based deployments, branded as Red Hat AMQ Streams. The OpenShift deployment option for AMQ Streams is based on Strimzi, an open source project that makes it easy to run Kafka as containers on a Kubernetes platform because most of the deployment prerequisites are automated through the OpenShift Operator Framework.

    In this article, we look at how to deploy Apache Kafka on Red Hat OpenShift 4 and use a set of sample microservice applications to showcase what OpenShift and Kafka make possible together.

    AMQ Streams on OpenShift

    Figure 1: Our deployment environment.

    Figure 1 illustrates the environment we will deploy on OpenShift, which consists of the following microservices:

    • The Account Balance Service provides account balance information and is backed by its own MongoDB database.
    • The Credit Service performs credit transfers between accounts, storing the credit data in the credit Kafka topic.
    • The Event Correlator listens to the credit topic, performs the necessary adjustments to the account balances, and pushes the changes to the Account Balance Service via its REST API. At the same time, it publishes the outcome of this process to the credit-response Kafka topic.
    • MongoDB Kafka Connect listens to the credit-response topic and streams this information into the Credit Response DB.
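    To make the event flow concrete, the messages involved look roughly like the following. The exact field names are assumptions inferred from the request payload and the stored credit response shown later in this article. A message on the credit topic, produced by the Credit Service:

    {"amount": 10.50, "sourceAccount": "20191108-MY-00000001", "targetAccount": "20191108-MY-00000002"}

    The corresponding message on the credit-response topic, produced by the Event Correlator and later sunk into MongoDB, carries the adjusted balances:

    {"sourceAccountId": "20191108-MY-00000001", "targetAccountId": "20191108-MY-00000002", "sourceAccountBalance": 489.5, "targetAccountBalance": 710.5, "creditAmount": 10.5, "transactionDate": 1573200543543}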

    Install AMQ Streams on Red Hat OpenShift

    Installing AMQ Streams on Red Hat OpenShift is pretty straightforward. The only trouble I ran into was configuring MongoDB Kafka Connect, mostly due to a lack of detailed documentation and a bug in Kafka Connect. The full procedure, including those fixes, is documented in the following steps:

    1. Download the YAML installation files from the Red Hat Access website.

    Note: We are installing AMQ Streams as a cluster administrator. AMQ Streams includes several custom resources. By default, permission to create, edit, and delete these resources is limited to OpenShift cluster administrators. If you want to allow non-cluster administrators to manage AMQ Streams resources, you must assign them the Strimzi Administrator role.
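    A minimal sketch of how that role is typically granted, using the ClusterRole shipped in the installation files (the user name here is a placeholder; adjust it to your environment):

    oc apply -f install/strimzi-admin
    oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=developer1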

    2. Deploy the Kafka cluster using the Kafka Operator, which can watch Kafka resources in a single namespace or across multiple namespaces. In our case, we deploy the Operator to watch a single namespace. Once you have downloaded and unzipped the installation files, navigate to the root folder, which contains two folders: examples and install.
    3. Run the following from the command line to make changes to the provided YAML files for our single OpenShift namespace deployment:

    On Linux:

    $ sed -i 's/namespace: .*/namespace: my-kafka-example/' install/cluster-operator/*RoleBinding*.yaml

    On macOS:

    $ sed -i '' 's/namespace: .*/namespace: my-kafka-example/' install/cluster-operator/*RoleBinding*.yaml
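    Optionally, sanity-check the substitution before applying the files (this simply greps the RoleBinding files you just edited):

    grep 'namespace:' install/cluster-operator/*RoleBinding*.yaml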
    4. Run the following command to deploy the Operator once the namespace is changed:
    $ oc apply -f install/cluster-operator -n my-kafka-example

    Note: You can also configure the Kafka Operator to watch for all namespaces. Please refer to the documentation for details.
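    Before continuing, you can confirm that the Cluster Operator rolled out successfully. The deployment name below comes from the files in install/cluster-operator and may vary slightly between AMQ Streams versions:

    oc get deployment strimzi-cluster-operator -n my-kafka-example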

    5. Once the Kafka Operator is deployed, deploy the Kafka cluster. There are two options: ephemeral and persistent. We will deploy a persistent Kafka cluster. Begin by opening the examples/kafka/kafka-persistent.yaml file and changing the Kafka cluster name in the Kafka.metadata.name property as follows (the bootstrap service and CA certificate secret names used later in this article are derived from this cluster name):
    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: kafka-cluster
    # ...
    6. To enable auto-creation of the Kafka topics used by the applications, configure the Topic Operator in the same kafka-persistent.yaml file as follows:
    entityOperator:
        topicOperator:
          watchedNamespace: my-kafka-example
          reconciliationIntervalSeconds: 90
          zookeeperSessionTimeoutSeconds: 20
          topicMetadataMaxAttempts: 6
          image: registry.redhat.io/amq7/amq-streams-operator:1.3.0
    7. Run the following command to deploy the Kafka cluster:
    oc apply -f examples/kafka/kafka-persistent.yaml -n my-kafka-example
    8. Deploy and run the following sample Kafka producer:
    oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic
    9. Deploy and run the following sample Kafka consumer:
    oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
    10. Once all of the pods and resources are ready, verify that the Kafka cluster is working as expected by checking that messages you type into the producer appear in the consumer.
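    As a reference point, you can watch the pods come up before testing with the producer and consumer. Assuming the cluster is named kafka-cluster and deployed in my-kafka-example, expect broker, ZooKeeper, and entity-operator pods alongside the Cluster Operator pod:

    oc get pods -n my-kafka-example -w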

    Deploy sample application dependencies

    With AMQ Streams, the brokers are not directly accessible from outside the namespace where they are deployed, so we will deploy all of our sample applications in the same namespace as the Kafka cluster. This is true of any Kafka brokers deployed as containers and is not a limitation of OpenShift; all external connections are handled and routed by the Kafka bootstrap component.

    Note: If you wish to access the brokers externally, please refer to this article on how to enable this option.
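    As a rough sketch only (based on the route listener type described in the AMQ Streams documentation), external access is enabled by adding an external listener under spec.kafka.listeners in the Kafka resource:

    spec:
      kafka:
        listeners:
          plain: {}
          tls: {}
          external:
            type: route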

    Deploy MongoDB

    This is our Credit Response database. Run the following command to deploy MongoDB using the provided template:
    oc new-app -f https://raw.githubusercontent.com/chengkuangan/creditresponsemongodb/master/mongodb-deployment-template.yaml

    At the time this article was written, the database authentication source was hardcoded (source='admin'), so I received the following error in the Kafka Connect container log when Kafka Connect tried to send data to MongoDB:

    2019-11-07 12:23:07,617 INFO Cluster created with settings {hosts=[creditresponse:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster) [task-thread-mongodb-sink-0]
    2019-11-07 12:23:07,842 INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster) [task-thread-mongodb-sink-0]
    2019-11-07 12:23:07,861 INFO Opened connection [connectionId{localValue:1, serverValue:220}] to creditresponse:27017 (org.mongodb.driver.connection) [cluster-ClusterId{value='5dc40cab2efd9074c7742e33', description='null'}-creditresponse:27017]
    2019-11-07 12:23:07,863 INFO Monitor thread successfully connected to server with description ServerDescription{address=creditresponse:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 10]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=1014605} (org.mongodb.driver.cluster) [cluster-ClusterId{value='5dc40cab2efd9074c7742e33', description='null'}-creditresponse:27017]
    2019-11-07 12:23:07,892 INFO Closed connection [connectionId{localValue:2}] to creditresponse:27017 because there was a socket exception raised by this connection. (org.mongodb.driver.connection) [task-thread-mongodb-sink-0]
    2019-11-07 12:23:07,894 ERROR Error on mongodb operation (com.mongodb.kafka.connect.sink.MongoSinkTask) [task-thread-mongodb-sink-0]
    com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='creditresponse', source='admin', password=<hidden>, mechanismProperties={}}
      at com.mongodb.internal.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:173)

    To work around this problem, oc rsh into the MongoDB pod and create a new creditresponse user account with the following details:

    mongo --port 27017 -u admin -p creditresponse --authenticationDatabase admin
    use admin
    db.runCommand({createRole:"listDatabases",privileges:[{resource:{cluster:true}, actions:["listDatabases"]}],roles:[]})
    db.createUser({
        "user" : "creditresponse",
        "pwd" : "creditresponse",
        "roles" : [
            {
                "role" : "listDatabases",
                "db" : "admin"
            },
            {
                "role" : "readWrite",
                "db" : "creditresponse"
            },
            {
                "role" : "read",
                "db" : "local"
            }
        ]
    })
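    As an optional check, you can confirm the new account works by logging in as the creditresponse user and listing the databases (this is exactly what the listDatabases role created above allows):

    mongo --port 27017 -u creditresponse -p creditresponse --authenticationDatabase admin
    > db.adminCommand({listDatabases: 1})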

    Deploy and configure MongoDB Kafka Connect

    AMQ Streams Kafka Connect ships with only the FileStreamSourceConnector and FileStreamSinkConnector plugins. In order to deploy MongoDB Kafka Connect, we need to build a container image that adds the MongoDB Kafka Connect JAR file to the Red Hat-supported AMQ Streams base image:

    1. Create a Dockerfile with the following content:
    FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
    USER root:root
    COPY ./mongo-plugins/ /opt/kafka/plugins/
    USER kafka:kafka
    2. Download the MongoDB Kafka Connect JAR files from the MongoDB website.
    3. Unzip and copy the JAR file to the mongo-plugins folder.
    4. Make sure you have a valid Red Hat account in order to log in and access registry.redhat.io.
    5. Build the image:
    docker login registry.redhat.io
    
    docker build -t chengkuan/amq-streams-kafka-connect-23:1.3.0 .
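    Because kafka-connect.yaml in the next step references the image as docker.io/chengkuan/amq-streams-kafka-connect-23:1.3.0, push the freshly built image to a registry that your cluster can pull from (replace the repository with your own if you are not reusing this tag):

    docker push chengkuan/amq-streams-kafka-connect-23:1.3.0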
    6. Edit kafka-connect.yaml to set spec.image, spec.bootstrapServers, and spec.tls.trustedCertificates.secretName as shown below. Take note that the port number for spec.bootstrapServers is 9093, which is the default TLS client (clienttls) port:
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: mongodb-connect-cluster
    spec:
      version: 2.3.0
      replicas: 1
      bootstrapServers: kafka-cluster-kafka-bootstrap:9093
      tls:
        trustedCertificates:
          - secretName: kafka-cluster-cluster-ca-cert
            certificate: ca.crt
      image: docker.io/chengkuan/amq-streams-kafka-connect-23:1.3.0
    7. Deploy Kafka Connect:
    oc apply -f examples/kafka-connect/kafka-connect.yaml
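    If you want to confirm that the Connect cluster is up before continuing, a simple check is to look for the Connect pod, whose name is derived from the KafkaConnect resource name:

    oc get pods -n my-kafka-example | grep connect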
    8. Port forward from your local machine to the Kafka Connect API service, because the Kafka Connect pod is not accessible externally:
    oc port-forward service/mongodb-connect-cluster-connect-api 8083:8083
    9. Run the following using curl or a web browser to verify that the MongoDB Connect plugin loaded successfully:
    curl http://localhost:8083/connector-plugins

    You will see that the MongoDB Connect plugin is listed:

    [{"class":"com.mongodb.kafka.connect.MongoSinkConnector","type":"sink","version":"0.2"},{"class":"com.mongodb.kafka.connect.MongoSourceConnector","type":"source","version":"0.2"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.3.0.redhat-00003"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.3.0.redhat-00003"}]

    To configure MongoDB Kafka Connect, download connect-mongodb-sink.json and modify the following accordingly:

    {
        "name": "mongodb-sink",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "tasks.max": 1,
            "topics": "credit-response",
            "connection.uri": "mongodb://creditresponse:creditresponse@creditresponse:27017",
            "database": "creditresponse",
            "collection": "response",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "max.num.retries": 3
        }
    }

    Next, post to the Kafka Connect REST API:

    curl -d @connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

    Note: At the time this was written, I could not POST the content successfully to Kafka Connect using the curl command due to a JSON formatting error. However, I could POST the same content without errors using Postman.

    Finally, run the following curl command to verify the configuration:

    curl http://localhost:8083/connectors/mongodb-sink

    The result:

    {
        "name": "mongodb-sink",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "key.converter.schemas.enable": "false",
            "database": "creditresponse",
            "tasks.max": "1",
            "topics": "credit-response",
            "max.num.retries": "3",
            "connection.uri": "mongodb://creditresponse:creditresponse@creditresponse:27017",
            "value.converter.schemas.enable": "false",
            "name": "mongodb-sink",
            "collection": "response",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter"
        },
        "tasks": [
            {
                "connector": "mongodb-sink",
                "task": 0
            }
        ],
        "type": "sink"
    }
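    You can also confirm that the connector task is actually running; the status endpoint is part of the standard Kafka Connect REST API:

    curl http://localhost:8083/connectors/mongodb-sink/status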

    Deploy the Account Balance Service application

    Run the following command to deploy the pods:

    oc new-app -f https://raw.githubusercontent.com/chengkuangan/accountbalance/master/templates/deployment-templates.yaml

    Deploy the Credit Service application

    Run the following command to deploy the Credit Service. Ensure that KAFKA_BOOTSTRAP_SERVER points to the correct server:

    oc new-app -f https://raw.githubusercontent.com/chengkuangan/creditservice/master/templates/creditservice.json -p KAFKA_BOOTSTRAP_SERVER=kafka-cluster-kafka-bootstrap:9092

    Deploy the Event Correlator Service

    Run the following command to deploy the Event Correlator Service. Ensure that KAFKA_BOOTSTRAP_SERVER points to the correct server:
    oc new-app -f https://raw.githubusercontent.com/chengkuangan/eventcorrelator/master/templates/deployment-templates.yaml -p KAFKA_BOOTSTRAP_SERVER=kafka-cluster-kafka-bootstrap:9092
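    The curl examples in the next section use the route hostnames from my cluster (*.apps.demo.ocp.internal); your hostnames will differ. You can list the routes created by the templates with:

    oc get routes -n my-kafka-example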

    Watch the application in action

    Now, let us use our application. Create account balance records:
    curl -H "Content-Type: application/json" -X POST http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance -d '{"accountId": "20191108-MY-00000001", "balance": 500.00, "lastUpdatedDate": 1563178274158 }'
    curl -H "Content-Type: application/json" -X POST http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance -d '{"accountId": "20191108-MY-00000002", "balance": 700.00, "lastUpdatedDate": 1563178274158 }'

    Query the account balance entries created above:

    curl http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance/all

    The result:

    [
       {
          "_id":"5dc52069a3c1080001ebd539",
          "accountId":"20191108-MY-00000001",
          "balance":500.0,
          "lastUpdatedDate":1563178274158
       },
       {
          "_id":"5dc52076a3c1080001ebd53a",
          "accountId":"20191108-MY-00000002",
          "balance":700.0,
          "lastUpdatedDate":1563178274158
       }
    ]

    Perform a credit transfer:

    curl -H "Content-Type: application/json" -X POST http://creditservice-my-kafka-example.apps.demo.ocp.internal/ws/pg/credits -d '{"amount": 10.50, "sourceAccount": "20191108-MY-00000001", "targetAccount": "20191108-MY-00000002"}'

    Query the balance after the credit transfer:

    curl http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance/all

    The result:

    [
       {
          "_id":"5dc52069a3c1080001ebd539",
          "accountId":"20191108-MY-00000001",
          "balance":489.5,"lastUpdatedDate":1573200543543
       },
       {
          "_id":"5dc52076a3c1080001ebd53a",
          "accountId":"20191108-MY-00000002",
          "balance":710.5,
          "lastUpdatedDate":1573200543543
       }
    ]

    Perform oc rsh into the Credit Response MongoDB pod and use db.response.find() to confirm that the credit response was captured:

    mongo --port 27017 -u admin -p creditresponse --authenticationDatabase admin
    
    >use creditresponse
    switched to db creditresponse
    
    >show collections
    response
    
    >db.response.find()

    Result:

    { 
       "_id" : ObjectId("5dc523f536d41402601d01a4"), 
       "sourceAccountRecordId" : "5dc52069a3c1080001ebd539", 
       "targetAccountRecordId" : "5dc52076a3c1080001ebd53a", 
       "sourceAccountId" : "20191108-MY-00000001", 
       "targetAccountId" : "20191108-MY-00000002", 
       "sourceAccountBalance" : 489.5, 
       "targetAccountBalance" : 710.5, 
       "creditRecordId" : "ykvlkqk2puzc5u", 
       "creditAmount" : 10.5, 
       "transactionDate" : NumberLong("1573200543543") 
    }

    References

    • Red Hat AMQ Streams on OpenShift documentation
    • Apache Kafka Connect documentation
    Last updated: January 12, 2024
