Red Hat AMQ Streams

As part of its AMQ offerings, Red Hat provides a Kafka-based event streaming solution, branded as Red Hat AMQ Streams, for both traditional and microservices-based deployments. The OpenShift deployment option for AMQ Streams is based on Strimzi, an open source project that simplifies running Kafka in containers on a Kubernetes platform by automating most of the deployment prerequisites through the OpenShift Operator Framework.

In this article, we look at how to deploy Apache Kafka on Red Hat OpenShift 4, using a set of sample microservice applications to show what OpenShift and Kafka make possible together.

AMQ Streams on OpenShift

Figure 1: Our deployment environment.

Figure 1 illustrates the environment we will deploy on OpenShift, which includes the following microservices:

  • The Account Balance Service, which has its own MongoDB database service, provides information on the account balance.
  • The Credit Service performs credit transfer between accounts, storing the credit data in the credit Kafka topic.
  • The Event Correlator listens to the credit topic, performs the necessary adjustment to the account balance, and sends the changes to the Account Balance Service via its REST API. At the same time, it publishes the outcome of this process as a credit response to the Kafka topic credit-response.
  • MongoDB Kafka Connect listens to the content in the credit-response topic and streams this information to the Credit Response DB.
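
To make the flow between these services concrete, here is a minimal in-memory sketch in Python. It is not the sample applications' code: plain lists stand in for the Kafka topics and the Credit Response DB, the class and method names are hypothetical, and the field names are borrowed from the request and response payloads shown later in this article.

```python
from dataclasses import dataclass, field


@dataclass
class Bank:
    """In-memory stand-in for the services and topics in Figure 1 (hypothetical)."""
    balances: dict = field(default_factory=dict)        # Account Balance Service
    credit_topic: list = field(default_factory=list)    # "credit" topic
    response_topic: list = field(default_factory=list)  # "credit-response" topic
    response_db: list = field(default_factory=list)     # Credit Response DB

    def credit_service(self, amount, source, target):
        # The Credit Service only records the transfer request on the credit topic.
        self.credit_topic.append(
            {"amount": amount, "sourceAccount": source, "targetAccount": target}
        )

    def event_correlator(self):
        # Consume each pending credit, adjust both balances, publish the outcome.
        while self.credit_topic:
            credit = self.credit_topic.pop(0)
            src, dst = credit["sourceAccount"], credit["targetAccount"]
            self.balances[src] -= credit["amount"]
            self.balances[dst] += credit["amount"]
            self.response_topic.append({
                "sourceAccountId": src,
                "targetAccountId": dst,
                "sourceAccountBalance": self.balances[src],
                "targetAccountBalance": self.balances[dst],
                "creditAmount": credit["amount"],
            })

    def mongodb_sink(self):
        # Kafka Connect copies every credit response into the database.
        while self.response_topic:
            self.response_db.append(self.response_topic.pop(0))


bank = Bank(balances={"20191108-MY-00000001": 500.0, "20191108-MY-00000002": 700.0})
bank.credit_service(10.5, "20191108-MY-00000001", "20191108-MY-00000002")
bank.event_correlator()
bank.mongodb_sink()
print(bank.balances)
print(bank.response_db)
```

In the real deployment each method is a separate service, and the topics decouple them: the Credit Service never calls the Account Balance Service directly.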

Install AMQ Streams on Red Hat OpenShift

Installing AMQ Streams on Red Hat OpenShift is straightforward. The only issue I faced was configuring MongoDB Kafka Connect, mostly because of a lack of detailed documentation and a bug in Kafka Connect. The steps below document the installation, including the workarounds for these issues:

  1. Download the YAML installation files from the Red Hat Access website.

Note: We are installing AMQ Streams as a cluster administrator. AMQ Streams includes several custom resources. By default, permission to create, edit, and delete these resources is limited to OpenShift cluster administrators. If you want to allow non-cluster administrators to manage AMQ Streams resources, you must assign them the Strimzi Administrator role.

  1. Deploy the Kafka Operator, which deploys and manages the Kafka cluster and can watch Kafka resources in a single namespace or in multiple namespaces. In our case, we deploy the Operator to watch a single namespace. Once you download and unzip the installation files, navigate to the root folder, which contains two folders: examples and install.
  2. Run the following from the command line to update the namespace in the provided YAML files for our single-namespace OpenShift deployment:

On Linux:

$ sed -i 's/namespace: .*/namespace: my-kafka-example/' install/cluster-operator/*RoleBinding*.yaml

On macOS:

$ sed -i '' 's/namespace: .*/namespace: my-kafka-example/' install/cluster-operator/*RoleBinding*.yaml
  1. Run the following command to deploy the Operator once the namespace is changed:
$ oc apply -f install/cluster-operator -n my-kafka-example

Note: You can also configure the Kafka Operator to watch for all namespaces. Please refer to the documentation for details.
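
For readers who prefer a portable alternative to the two sed variants above, the same substitution can be sketched in Python. This is an illustration, not part of the AMQ Streams tooling: the function names are hypothetical, and the sample RoleBinding fragment is abbreviated for the example.

```python
import re
from pathlib import Path

# Same pattern as the sed expression: "namespace: " followed by the rest of the line.
NAMESPACE_LINE = re.compile(r"namespace: .*")


def set_namespace(yaml_text, namespace):
    """Replace the value on every 'namespace:' line, like the sed command."""
    return NAMESPACE_LINE.sub(lambda _match: f"namespace: {namespace}", yaml_text)


def rewrite_role_bindings(directory, namespace):
    """Apply set_namespace to each *RoleBinding*.yaml file; return changed paths."""
    changed = []
    for path in sorted(Path(directory).glob("*RoleBinding*.yaml")):
        original = path.read_text()
        updated = set_namespace(original, namespace)
        if updated != original:
            path.write_text(updated)
            changed.append(path)
    return changed


# Abbreviated, illustrative RoleBinding fragment.
sample = """\
kind: RoleBinding
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
"""
print(set_namespace(sample, "my-kafka-example"))
```

Calling rewrite_role_bindings("install/cluster-operator", "my-kafka-example") from the root folder would edit the files in place, so it behaves the same on Linux and macOS.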

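  1. Deploy the Kafka cluster once the Kafka Operator is deployed. There are two options: ephemeral and persistent. We will deploy a persistent Kafka cluster. Begin by opening the examples/kafka/kafka-persistent.yaml file and changing the Kafka cluster name in metadata.name as follows. The cluster name becomes the prefix of the generated service names, so it must match the kafka-cluster-kafka-bootstrap address used in the rest of this article:
kind: Kafka
metadata:
  name: kafka-cluster
# ...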
  1. Configure the Topic Operator as follows in the same kafka-persistent.yaml file so that the Kafka topics configured in the applications are created automatically:
# ...
  entityOperator:
    topicOperator:
      watchedNamespace: my-kafka-example
      reconciliationIntervalSeconds: 90
      zookeeperSessionTimeoutSeconds: 20
      topicMetadataMaxAttempts: 6
  1. Run the following command to deploy the Kafka cluster:
oc apply -f examples/kafka/kafka-persistent.yaml
  1. Deploy and run the following sample Kafka producer:
oc run kafka-producer -ti --rm=true --restart=Never -- bin/ --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic
  1. Deploy and run the following sample Kafka consumer:
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/ --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
  1. Once all of the pods and resources are ready, verify that the Kafka cluster is working as expected by checking that messages sent from the producer arrive at the consumer.
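
The addresses used in this article follow the Strimzi naming convention, in which the bootstrap service is called <cluster-name>-kafka-bootstrap and the cluster CA certificate secret is called <cluster-name>-cluster-ca-cert. The short Python sketch below derives both from a cluster name; the function names are hypothetical, and the port constants reflect the plain (9092) and TLS (9093) listeners used in the commands in this article.

```python
PLAIN_PORT = 9092  # plain listener, used by the console producer and consumer above
TLS_PORT = 9093    # TLS listener, used by Kafka Connect later in this article


def bootstrap_address(cluster_name, tls=False):
    """Return the in-cluster bootstrap address for a Strimzi-managed cluster."""
    port = TLS_PORT if tls else PLAIN_PORT
    return f"{cluster_name}-kafka-bootstrap:{port}"


def cluster_ca_secret(cluster_name):
    """Return the name of the secret that holds the cluster CA certificate."""
    return f"{cluster_name}-cluster-ca-cert"


print(bootstrap_address("kafka-cluster"))            # plain listener address
print(bootstrap_address("kafka-cluster", tls=True))  # TLS listener address
print(cluster_ca_secret("kafka-cluster"))
```

If you choose a different cluster name in kafka-persistent.yaml, every bootstrap address and secret name in the following sections changes accordingly.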

Deploy sample application dependencies

With AMQ Streams, the Kafka brokers are not directly accessible outside of the namespace where they are deployed, so we will deploy all of our sample applications in the same namespace as the Kafka cluster. This restriction applies to any Kafka broker deployed as a container and is not a limitation of OpenShift. All external connections are handled and routed by the Kafka bootstrap component.

Note: If you wish to access the brokers externally, please refer to this article on how to enable this option.

Deploy MongoDB

This is our Credit Response database. Run the following command to deploy MongoDB using the provided template:
oc new-app -f

At the time this article was written, the database authentication source was hardcoded (note source='admin' in the log below), so I received the following error in the Kafka Connect container log when Kafka Connect tried to send data to MongoDB:

2019-11-07 12:23:07,617 INFO Cluster created with settings {hosts=[creditresponse:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster) [task-thread-mongodb-sink-0]
2019-11-07 12:23:07,842 INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster) [task-thread-mongodb-sink-0]
2019-11-07 12:23:07,861 INFO Opened connection [connectionId{localValue:1, serverValue:220}] to creditresponse:27017 (org.mongodb.driver.connection) [cluster-ClusterId{value='5dc40cab2efd9074c7742e33', description='null'}-creditresponse:27017]
2019-11-07 12:23:07,863 INFO Monitor thread successfully connected to server with description ServerDescription{address=creditresponse:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 10]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=1014605} (org.mongodb.driver.cluster) [cluster-ClusterId{value='5dc40cab2efd9074c7742e33', description='null'}-creditresponse:27017]
2019-11-07 12:23:07,892 INFO Closed connection [connectionId{localValue:2}] to creditresponse:27017 because there was a socket exception raised by this connection. (org.mongodb.driver.connection) [task-thread-mongodb-sink-0]
2019-11-07 12:23:07,894 ERROR Error on mongodb operation (com.mongodb.kafka.connect.sink.MongoSinkTask) [task-thread-mongodb-sink-0]
com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='creditresponse', source='admin', password=<hidden>, mechanismProperties={}}
  at com.mongodb.internal.connection.SaslAuthenticator.wrapException(

To work around this problem, use oc rsh to open a shell in the MongoDB pod and create a new creditresponse user account with the following details:

mongo --port 27017 -u admin -p creditresponse --authenticationDatabase admin
use admin
db.runCommand({createRole:"listDatabases",privileges:[{resource:{cluster:true}, actions:["listDatabases"]}],roles:[]})
db.createUser({
    "user" : "creditresponse",
    "pwd" : "creditresponse",
    "roles" : [
        {
            "role" : "listDatabases",
            "db" : "admin"
        },
        {
            "role" : "readWrite",
            "db" : "creditresponse"
        },
        {
            "role" : "read",
            "db" : "local"
        }
    ]
})

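One way to read source='admin' in the log above is through the MongoDB connection string rules: when a URI carries credentials but names neither an authSource option nor a database, clients authenticate against the admin database. The Python sketch below mirrors that rule for illustration only. It is not the driver's code, the function name is hypothetical, and whether the connector build used here honored authSource is not something this article establishes.

```python
from urllib.parse import parse_qs, urlsplit


def effective_auth_source(uri):
    """Return the database a MongoDB client would authenticate against."""
    parts = urlsplit(uri)
    options = parse_qs(parts.query)
    if "authSource" in options:
        # An explicit authSource option always wins.
        return options["authSource"][0]
    # Otherwise the database in the URI path is used, falling back to admin.
    database = parts.path.lstrip("/")
    return database or "admin"


uri = "mongodb://creditresponse:creditresponse@creditresponse:27017"
print(effective_auth_source(uri))
print(effective_auth_source(uri + "/creditresponse"))
print(effective_auth_source(uri + "/?authSource=creditresponse"))
```

This is why the workaround creates the creditresponse user in the admin database: that is where the connection shown in the log looks for it.
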
Deploy and configure MongoDB Kafka Connect

AMQ Streams Kafka Connect only comes with FileStreamSourceConnector and FileStreamSinkConnector. In order to deploy MongoDB Kafka Connect, we need to build the container image with the MongoDB Kafka Connect JAR file and the Red Hat-supported AMQ Streams base image:

  1. Proceed to create the necessary Dockerfile with the following content:
USER root:root
COPY ./mongo-plugins/ /opt/kafka/plugins/
USER kafka:kafka
  1. Download the MongoDB Kafka Connect JAR files from the MongoDB website.
  2. Unzip and copy the JAR file to the mongo-plugins folder.
  3. Make sure you have a valid Red Hat account in order to log in and access
  4. Build the image:
docker login

docker build -t chengkuan/amq-streams-kafka-connect-23:1.3.0 .
  1. Edit kafka-connect.yaml: set spec.image to the image built in the previous step, and set spec.bootstrapServers and spec.tls.trustedCertificates as follows. Note that the port number for spec.bootstrapServers is 9093, which is the default client TLS port:
kind: KafkaConnect
metadata:
  name: mongodb-connect-cluster
spec:
  version: 2.3.0
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  1. Deploy Kafka Connect:
oc apply -f examples/kafka-connect/kafka-connect.yaml
  1. Port forward from a local PC to OpenShift's connect API service because Kafka Connect's pod is not accessible externally:
oc port-forward service/mongodb-connect-cluster-connect-api 8083:8083
  1. Run the following using curl or a web browser to verify that the MongoDB Connect plugin loaded successfully:
curl http://localhost:8083/connector-plugins

You will see that the MongoDB Connect plugin is listed:


To configure MongoDB Kafka Connect, download connect-mongodb-sink.json and modify the following accordingly:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": 1,
        "topics": "credit-response",
        "connection.uri": "mongodb://creditresponse:creditresponse@creditresponse:27017",
        "database": "creditresponse",
        "collection": "response",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "max.num.retries": 3
    }
}

Next, post to the Kafka Connect REST API:

curl -d @connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Note: The @ prefix tells curl to read the request body from the file; without it, curl sends the literal file name as the body. At the time this was written, I could not POST the content successfully to Kafka Connect using the curl command due to a JSON formatting error, which a missing @ would explain. However, I could POST the same content without errors using Postman.
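
If curl keeps rejecting the payload, the same POST can be sketched with Python's standard library. The sketch below reads the file contents as the request body and checks that they parse as JSON before anything is sent. The function name is hypothetical, and the URL assumes the port forward from the earlier step is still running.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # assumes the port forward is active


def build_connector_request(config_path, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    with open(config_path, "rb") as handle:
        body = handle.read()
    # Fail early with a clear error if the file is not valid JSON.
    json.loads(body)
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def is_json(text):
    """Return True if text parses as JSON; a bare file name does not."""
    try:
        json.loads(text)
    except ValueError:
        return False
    return True


print(is_json("connect-mongodb-sink.json"))  # the file name itself is not JSON
```

To send the request, pass the result of build_connector_request("connect-mongodb-sink.json") to urllib.request.urlopen.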

Finally, run the following curl command to verify the configuration:

curl http://localhost:8083/connectors/mongodb-sink

The result:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "key.converter.schemas.enable": "false",
        "database": "creditresponse",
        "tasks.max": "1",
        "topics": "credit-response",
        "max.num.retries": "3",
        "connection.uri": "mongodb://creditresponse:creditresponse@creditresponse:27017",
        "value.converter.schemas.enable": "false",
        "name": "mongodb-sink",
        "collection": "response",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    },
    "tasks": [
        {
            "connector": "mongodb-sink",
            "task": 0
        }
    ],
    "type": "sink"
}

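Comparing the returned configuration with the submitted one takes a little care: Kafka Connect returns every value as a string (1 becomes "1", false becomes "false") and adds the connector name to the config map, as the output above shows. The following Python sketch normalizes the submitted values before comparing; the function names are hypothetical.

```python
import json


def as_connect_strings(config):
    """Render non-string values the way Kafka Connect echoes them back."""
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in config.items()
    }


def config_matches(submitted, returned):
    """Compare a submitted config with the one returned by the REST API."""
    expected = as_connect_strings(submitted)
    # The returned config map also carries the connector name; ignore it here.
    actual = {key: value for key, value in returned.items() if key != "name"}
    return expected == actual


submitted = {"tasks.max": 1, "key.converter.schemas.enable": False, "max.num.retries": 3}
returned = {
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "max.num.retries": "3",
    "name": "mongodb-sink",
}
print(config_matches(submitted, returned))
```
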
Deploy the Account Balance Service application

Run the following command to deploy the pods:

oc new-app

Deploy the Credit Service application

Run the following command to deploy the Credit Service. Ensure that KAFKA_BOOTSTRAP_SERVER points to the correct server:

oc new-app -p KAFKA_BOOTSTRAP_SERVER=kafka-cluster-kafka-bootstrap:9092

Deploy the Event Correlator Service

Run the following command to deploy the Event Correlator Service. Ensure that KAFKA_BOOTSTRAP_SERVER points to the correct server:
oc new-app -f -p KAFKA_BOOTSTRAP_SERVER=kafka-cluster-kafka-bootstrap:9092

Watch the application in action

Now, let us use our application. Create account balance records:
curl -H "Content-Type: application/json" -X POST http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance -d '{"accountId": "20191108-MY-00000001", "balance": 500.00, "lastUpdatedDate": 1563178274158 }'
curl -H "Content-Type: application/json" -X POST http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance -d '{"accountId": "20191108-MY-00000002", "balance": 700.00, "lastUpdatedDate": 1563178274158 }'

Query the account balance entries created above:

curl http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance/all

The result:


Perform a credit transfer:

curl -H "Content-Type: application/json" -X POST http://creditservice-my-kafka-example.apps.demo.ocp.internal/ws/pg/credits -d '{"amount": 10.50, "sourceAccount": "20191108-MY-00000001", "targetAccount": "20191108-MY-00000002"}'

Query the balance after the credit transfer:

curl http://accountbalance-my-kafka-example.apps.demo.ocp.internal/ws/pg/balance/all

The result:


Use oc rsh to open a shell in the Credit Response MongoDB pod, then use db.response.find() to see that the credit response is captured:

mongo --port 27017 -u admin -p creditresponse --authenticationDatabase admin

>use creditresponse
switched to db creditresponse

>show collections

>db.response.find()
{
   "_id" : ObjectId("5dc523f536d41402601d01a4"),
   "sourceAccountRecordId" : "5dc52069a3c1080001ebd539",
   "targetAccountRecordId" : "5dc52076a3c1080001ebd53a",
   "sourceAccountId" : "20191108-MY-00000001",
   "targetAccountId" : "20191108-MY-00000002",
   "sourceAccountBalance" : 489.5,
   "targetAccountBalance" : 710.5,
   "creditRecordId" : "ykvlkqk2puzc5u",
   "creditAmount" : 10.5,
   "transactionDate" : NumberLong("1573200543543")
}

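The transactionDate field is stored as milliseconds since the Unix epoch. A small Python helper (the name is hypothetical) turns it into a readable UTC timestamp, which is handy when matching a stored credit response against the time the transfer was submitted:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_epoch_millis(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=millis)


print(from_epoch_millis(1573200543543).isoformat())
# prints 2019-11-08T08:09:03.543000+00:00
```
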

Last updated: January 12, 2024