Skip to main content
Redhat Developers  Logo
  • AI

    Get started with AI

    • Red Hat AI
      Accelerate the development and deployment of enterprise AI solutions.
    • AI learning hub
      Explore learning materials and tools, organized by task.
    • AI interactive demos
      Click through scenarios with Red Hat AI, including training LLMs and more.
    • AI/ML learning paths
      Expand your OpenShift AI knowledge using these learning resources.
    • AI quickstarts
      Focused AI use cases designed for fast deployment on Red Hat AI platforms.
    • No-cost AI training
      Foundational Red Hat AI training.

    Featured resources

    • OpenShift AI learning
    • Open source AI for developers
    • AI product application development
    • Open source-powered AI/ML for hybrid cloud
    • AI and Node.js cheat sheet

    Red Hat AI Factory with NVIDIA

    • Red Hat AI Factory with NVIDIA is a co-engineered, enterprise-grade AI solution for building, deploying, and managing AI at scale across hybrid cloud environments.
    • Explore the solution
  • Learn

    Self-guided

    • Documentation
      Find answers, get step-by-step guidance, and learn how to use Red Hat products.
    • Learning paths
      Explore curated walkthroughs for common development tasks.
    • Guided learning
      Receive custom learning paths powered by our AI assistant.
    • See all learning

    Hands-on

    • Developer Sandbox
      Spin up Red Hat's products and technologies without setup or configuration.
    • Interactive labs
      Learn by doing in these hands-on, browser-based experiences.
    • Interactive demos
      Click through product features in these guided tours.

    Browse by topic

    • AI/ML
    • Automation
    • Java
    • Kubernetes
    • Linux
    • See all topics

    Training & certifications

    • Courses and exams
    • Certifications
    • Skills assessments
    • Red Hat Academy
    • Learning subscription
    • Explore training
  • Build

    Get started

    • Red Hat build of Podman Desktop
      A downloadable, local development hub to experiment with our products and builds.
    • Developer Sandbox
      Spin up Red Hat's products and technologies without setup or configuration.

    Download products

    • Access product downloads to start building and testing right away.
    • Red Hat Enterprise Linux
    • Red Hat AI
    • Red Hat OpenShift
    • Red Hat Ansible Automation Platform
    • See all products

    Featured

    • Red Hat build of OpenJDK
    • Red Hat JBoss Enterprise Application Platform
    • Red Hat OpenShift Dev Spaces
    • Red Hat Developer Toolset

    References

    • E-books
    • Documentation
    • Cheat sheets
    • Architecture center
  • Community

    Get involved

    • Events
    • Live AI events
    • Red Hat Summit
    • Red Hat Accelerators
    • Community discussions

    Follow along

    • Articles & blogs
    • Developer newsletter
    • Videos
    • Github

    Get help

    • Customer service
    • Customer support
    • Regional contacts
    • Find a partner

    Join the Red Hat Developer program

    • Download Red Hat products and project builds, access support documentation, learning content, and more.
    • Explore the benefits

Understanding Red Hat AMQ Streams components for OpenShift and Kubernetes: Part 2

<p>&nbsp;</p> <quillbot-extension-portal></quillbot-extension-portal>

December 5, 2019
Pramod Padmanabhan Faisal Masood
Related topics:
Event-drivenKubernetes
Related products:
Streams for Apache KafkaRed Hat OpenShift Container Platform

    In the previous article in this series, we discussed the basics of Red Hat AMQ Streams on Red Hat OpenShift. Here are a few key points to keep in mind before we proceed:

    • AMQ Streams is based on Apache Kafka.
    • AMQ Streams for the OpenShift Container Platform is based on the Strimzi project.
    • AMQ Streams on containers has multiple components, such as the Cluster Operator, Entity Operator, Mirror Maker, Kafka connect, and Kafka Bridge.

    Now, let's continue on to setting up Kafka Connect, the Kafka Bridge, and Mirror Maker.

    Kafka Connect

    Kafka Connect is mainly used to stream data in and out of Kafka clusters; for instance, getting a Twitter feed and then pushing it to the cluster. We need to understand Kafka Connect's concepts before continuing:

    • Connectors define where the data should be copied to or from.
    • Tasks are the actors that actually copy the data.
    • Workers are used to schedule units of work for connectors and tasks.
    • Converters are used by task units to change data format.
    • Transforms are used by connector units to do simple data adjustments, routing, and chain transformations.

    Note: For more details on the basic concepts, I recommend reading Kafka Connect Concepts.

    Creating a simple Kafka Connect instance

    A Kafka Connect instance in OpenShift can be created using two different Kube objects: KafkaConnect and KafkaConnectS2I. By default, Kafka Connect includes two built-in connectors: FileStreamSourceConnector and FileStreamSinkConnector. However, before you build a new connector, first check the catalog of existing connectors.

    Let us set up a simple Kafka Connect instance and then perform the source and sink operations. Then, we can add a default producer sample app and a consumer sample app. This process will show multiple publishers and multiple consumers, as shown in Figure 1:

    The structure for our example.
    Figure 1: The overall structure for this example.

    Begin by creating the Kafka Connect config amq-kafka-connect.yml. The example file present in examples/kafka-connect/kafka-connect.yml was used as a reference for this config file:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: simple-connect-cluster
    spec:
      version: 2.3.0
      replicas: 1
      bootstrapServers: simple-cluster-kafka-bootstrap:9093
      tls:
        trustedCertificates:
          - secretName: simple-cluster-cluster-ca-cert
            certificate: ca.crt

    Next, execute the YAML:

    $ oc create -f amq-kafka-connect.yml

    You can see the result in Figure 2:

    Kafka Connect is is deployed.
    Figure 2: The new deployment is in place.

    Preparing to test your new Kafka Connect instance

    Let us set up to test the new instance by doing the following:

    1. Create a new config, which will sink from the redhat-demo-topics topic content to the file amq-demo-sink.txt:
    $ oc rsh simple-cluster-kafka-0
    sh-4.2$ curl -X POST -H "Content-Type: application/json" --data '{"name": "redhat-file-sink-demo", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"/tmp/amq-demo-sink.txt", "topic":"redhat-demo-topics", "value.converter.schemas.enable" : "false", "value.converter" : "org.apache.kafka.connect.storage.StringConverter", "value.converter.schemas.enable" : "false", "key.converter" : "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable" : "false"}}' http://simple-connect-cluster-connect-api.amq-streams.svc:8083/connectors

    Here is the output:

    {"name":"redhat-file-sink-demo","config":{"connector.class":"FileStreamSinkConnector","tasks.max":"1","file":"/tmp/amq-demo-sink.txt","topics":"redhat-demo-topics","value.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable":"false","name":"redhat-file-sink-demo"},"tasks":[],"type":"sink"}
    1. Create a new config that will source into the redhat-demo-topics topic content from the file amq-demo-source.txt:
    $ oc rsh simple-cluster-kafka-0
    sh-4.2$ curl -X POST -H "Content-Type: application/json" --data '{"name": "redhat-file-source-demo", "config": {"connector.class":"FileStreamSourceConnector", "tasks.max":"1", "file":"/tmp/amq-demo-source.txt", "topic":"redhat-demo-topics", "value.converter.schemas.enable" : "false", "value.converter" : "org.apache.kafka.connect.storage.StringConverter", "value.converter.schemas.enable" : "false", "key.converter" : "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable" : "false"}}' http://simple-connect-cluster-connect-api.amq-streams.svc:8083/connectors

    Here is the output:

    {"name":"redhat-file-source-demo","config":{"connector.class":"FileStreamSourceConnector","tasks.max":"1","file":"/tmp/amq-demo-source.txt","topic":"redhat-demo-topics","value.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable":"false","name":"redhat-file-source-demo"},"tasks":[],"type":"source"}
    1. In a new terminal start the producer sample app:
    $ oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list simple-cluster-kafka-bootstrap:9092 --topic redhat-demo-topics
    1. In a new terminal start the consumer sample app:
    $ oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server simple-cluster-kafka-bootstrap:9092 --topic redhat-demo-topics --from-beginning

    Testing your new Kafka Connect instance

    To test your instance, do the following:

    1. Log into the Kafka Connect pod and watch the /tmp/amq-demo-sink.txt file:
    $ oc get po | grep connect
    simple-connect-cluster-connect-7479b86c7-tmdbp 1/1 Running 0 3h
    $ oc rsh simple-connect-cluster-connect-7479b86c7-tmdbp
    sh-4.2$ tail -100f /tmp/amq-demo-sink.txt
    
    hello world
    from pramod

    Here, you can see that the connector has already sunk two messages into the file.

    1.  Log into the Kafka Connect pod in a different terminal, then add content into /tmp/amq-demo-source.txt:
    $ oc rsh simple-connect-cluster-connect-7479b86c7-tmdbp
    sh-4.2$ echo redhat-is-my-world > /tmp/amq-demo-source.txt

    This set of commands writes a message in /tmp/amq-demo-sink.txt, and also to the consumer sample app. Figure 3 shows the connector source push and sink output:

    The connector's push and sink output.
    Figure 3: The connector source pushing the message redhat-is-my-world in the second terminal.

    Figure 4 shows the sample app consuming the message:

    The sample app consuming the message.
    Figure 4: The results on the consuming side.
    1. Now, send a message from the producer sample app. Figure 5 shows how this message flows through the system in three parts. From top to bottom, these are the connector sink, the consumer sample app, and then the producer sample app pushing the message kafka connect sample app:
    The message flowing through each step.
    Figure 5: The message kafka connect sample app passing through each stage.

    Kafka Bridge

    The Bridge component helps us connect to the Kafka Cluster using the HTTP or AMQP protocol. In this article, we demo the HTTP usage as shown in Figure 6:

    Our structure including the Kafka Bridge.
    Figure 6: How the Kafka Bridge fits in through the HTTP protocol.

    Kafka Bridge produces a REST API for the HTTP protocol, through which it provides multiple operations, such as:

    • Sending messages.
    • Subscribing to topics.
    • Receiving messages.
    • Committing offsets.
    • Seeking specific positions.

    Creating your Kafka Bridge

    1. Create the kafka-bridge config file amq-kafka-bridge.yml. The example file present in examples/kafka-bridge/kafka-bridge.yaml was used as a reference for the following config:
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaBridge
    metadata:
      name: simple-bridge
    spec:
      replicas: 1
      bootstrapServers: simple-cluster-kafka-bootstrap:9092
      http:
        port: 8080
    1. Create the bridge in OCP:
    $ oc create -f amq-kafka-bridge.yml

    You can see the results in Figure 7:

    The Kafka Bridge is deployed.
    Figure 7: Your new Kafka Bridge.
    1. Create a route so that we can access the bridge from outside the cluster:
    $ oc expose svc simple-bridge-bridge-service --name=simple-bridge-route

    Testing your HTTP protocol-based Kafka Bridge

    1. Create the Kafka topic config amq-kafka-topic.yml and apply it to the cluster:
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: simple-topic
      labels:
        strimzi.io/cluster: simple-cluster
    spec:
      partitions: 5
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    oc create -f amq-kafka-topic.yml
    1. Get the route URL for the bridge endpoints:
    #get the route to do the curl command
    
    oc get route
    
    NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
    simple-bridge-route simple-bridge-route-amq-streams.apps.redhat.demo.com simple-bridge-bridge-service rest-api None
    1. Publish a message on simple-topic:
    curl -X POST \
    http://simple-bridge-route-amq-streams.apps.redhat.demo.com/topics/simple-topic \
    -H 'content-type: application/vnd.kafka.json.v2+json' \
    -d '{
    "records": [
    {
    "value": "all hail the shadowman"
    }
    ]
    }'

    Here is the output:

    {"offsets":[{"partition":0,"offset":0}]}
    1. Create the consumer group simple-rh-bridge-consumer-group and the instance simple-rh-bridge-consumer. For this task, we set the message format to JSON:
    curl -X POST \
    http://simple-bridge-route-amq-streams.apps.redhat.demo.com/consumers/simple-rh-bridge-consumer-group \
    -H 'content-type: application/vnd.kafka.v2+json' \
    -d '{
    "name": "simple-rh-bridge-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": false,
    "fetch.min.bytes": 512,
    "consumer.request.timeout.ms": 30000
    }'
    1. Create a subscriber for the simple-topic created in step one:
    curl -X POST http://simple-bridge-route-amq-streams.apps.redhat.demo.com/consumers/simple-rh-bridge-consumer-group/instances/simple-rh-bridge-consumer/subscription \
    -H 'content-type: application/vnd.kafka.v2+json' \
    -d '{
    "topics": [
    "simple-topic"
    ]
    }'
    1. Consume the messages (note that the first request will register and the subsequent calls will provide the array of messages):
    curl -X GET http://simple-bridge-route-amq-streams.apps.redhat.demo.com/consumers/simple-rh-bridge-consumer-group/instances/simple-rh-bridge-consumer/records \
    -H 'accept: application/vnd.kafka.json.v2+json'

    Here is the output:

    []
    curl -X GET http://simple-bridge-route-amq-streams.apps.redhat.demo.com/consumers/simple-rh-bridge-consumer-group/instances/simple-rh-bridge-consumer/records \
    -H 'accept: application/vnd.kafka.json.v2+json'

    And the additional output:

    [{"topic":"simple-topic","key":null,"value":"all hail the shadowman","partition":0,"offset":0}]

    Mirror Maker

    Kafka Mirror Maker replicates data from one Kafka cluster to another. The usual use case is across different data centers.

    For the purpose of this demo, we use two different namespaces and projects, namely amq-streams and amq-streams-dc2. Doing so is the same as having multiple data centers with the same names. This setup is shown in Figure 8:

    Our structure for replicating multiple data centers.
    Figure 8: Replicating multiple data centers with Kafka Mirror Maker.

    Kafka Mirror maker consumes from the active Kafka cluster and produces to the mirror (backup) Kafka cluster.

    Setting up for the example

    To demo the Kafka Mirror Maker, we need to create another namespace and Kafka cluster. First, create the new namespace amq-streams-dc2:

    $ oc new-project amq-streams-dc2

    Next, create a new Kafka cluster:

    $ sed -i 's/namespace: .*/namespace: amq-streams-dc2/' install/cluster-operator/*RoleBinding*.yaml

    On macOS, use the following instead:

    $ sed -i '' 's/namespace: .*/namespace: amq-streams-dc2/' install/cluster-operator/*RoleBinding*.yaml
    $ oc apply -f install/cluster-operator -n amq-streams-dc2
    $ oc apply -f amq-kafka-cluster.yml -n amq-streams-dc2

    You can see the result in Figure 9:

    All of the components installed so far.
    Figure 9: All of the pieces are in place so far.

    Creating a mirror with Kafka Mirror Maker

    Create a kafka-mirror-maker config amq-kafka-mirror-maker.yml. In this file, we increase the consumer stream to two for faster response and use the group ID simple-source-group-id for the consumer. Additionally, we whitelist all of the topics using wildcards. This example file present in examples/kafka-mirror-maker/kafka-mirror-maker.yaml was used as a reference for the config:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaMirrorMaker
    metadata:
      name: simple-mirror-maker
    spec:
      version: 2.3.0
      replicas: 1
      consumer:
        bootstrapServers: simple-cluster-kafka-bootstrap:9092
        groupId: simple-source-group-id
        numStreams: 2
      producer:
        bootstrapServers: simple-cluster-kafka-bootstrap.amq-streams-dc2.svc:9092
      whitelist: ".*"
    Create the Mirror Maker in the amq-streams namespace:
    $ oc create -f amq-kafka-mirror-maker.yml -n amq-streams

    You can see the result in Figure 10:

    Mirror Maker is deployed.
    Figure 10: Your new Mirror Maker instance.

    Testing Kafka Mirror Maker

    To test the Mirror Maker, create the following two namespaces: amq-streams and amq-streams-dc2. The amq-streams namespace will contain the producer sample app to produce new messages, and the consumer sample app to consume new messages. The amq-streams-dc2 namespace will contain the consumer sample app so it can consume new messages, so it can show that the messages are getting pushed to the DC2 cluster.

    1. Create a producer sample app and consumer sample app in the amq-streams namespace:
    $ oc project amq-streams
    $ oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list simple-cluster-kafka-bootstrap:9092 --topic redhat-demo-topics
    $ oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server simple-cluster-kafka-bootstrap:9092 --topic redhat-demo-topics
    1. Create a Consumer sample app in the amq-streams-dc2 namespace
    $ oc project amq-streams-dc2
    $ oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server simple-cluster-kafka-bootstrap:9092 --topic redhat-demo-topics
    1. Send a message from the producer sample app.

    Is Mirror Maker working? You should see the message in the consumer app in both the namespaces, as shown in Figure 11:

    The message flowing through the components.
    Figure 11: Your message flowing from the producer to both the consumer and the backup consumer.

    Conclusion

    In this article, we explored Red Hat AMQ Streams components like Kafka Connect, Kafka Bridge, and Mirror Maker. In the third and final part of the series, we will cover monitoring and administration.

    References

    • Kafka Connect
    • Using AMQ Streams on OpenShift
    • AMQP Kafka Demo
    • Using Strimzi (latest)
    Last updated: March 29, 2023

    Recent Posts

    • Tekton joins the CNCF as an incubating project

    • Federated identity across the hybrid cloud using zero trust workload identity manager

    • Confidential virtual machine storage attack scenarios

    • Introducing virtualization platform autopilot

    • Integrate zero trust workload identity manager with Red Hat OpenShift GitOps

    What’s up next?

     

    Red Hat Developers logo LinkedIn YouTube Twitter Facebook

    Platforms

    • Red Hat AI
    • Red Hat Enterprise Linux
    • Red Hat OpenShift
    • Red Hat Ansible Automation Platform
    • See all products

    Build

    • Developer Sandbox
    • Developer tools
    • Interactive tutorials
    • API catalog

    Quicklinks

    • Learning resources
    • E-books
    • Cheat sheets
    • Blog
    • Events
    • Newsletter

    Communicate

    • About us
    • Contact sales
    • Find a partner
    • Report a website issue
    • Site status dashboard
    • Report a security problem

    RED HAT DEVELOPER

    Build here. Go anywhere.

    We serve the builders. The problem solvers who create careers with code.

    Join us if you’re a developer, software engineer, web designer, front-end designer, UX designer, computer scientist, architect, tester, product manager, project manager or team lead.

    Sign me up

    Red Hat legal and privacy links

    • About Red Hat
    • Jobs
    • Events
    • Locations
    • Contact Red Hat
    • Red Hat Blog
    • Inclusion at Red Hat
    • Cool Stuff Store
    • Red Hat Summit
    © 2026 Red Hat

    Red Hat legal and privacy links

    • Privacy statement
    • Terms of use
    • All policies and guidelines
    • Digital accessibility

    Chat Support

    Please log in with your Red Hat account to access chat support.