
Consuming messages from closest replicas in Apache Kafka 2.4.0 and AMQ Streams

April 29, 2020
Jakub Scholz
Related topics:
DevOps, Event-Driven, Java
Related products:
Streams for Apache Kafka


    Thanks to changes in Apache Kafka 2.4.0, consumers are no longer required to connect to a leader replica to consume messages. In this article, I introduce you to Apache Kafka's new ReplicaSelector interface and its built-in RackAwareReplicaSelector implementation. I'll briefly explain the benefits of the new rack-aware selector, then show you how to use it to more efficiently balance load across Amazon Web Services (AWS) availability zones.

    For this example, we'll use Red Hat AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on Amazon AWS.

    Note: Both AMQ Streams 1.4.0 and Strimzi 0.16.0 ship Apache Kafka 2.4.0 and therefore support the new ReplicaSelector interface.

    Overview of the Apache Kafka messaging system

    Apache Kafka uses topics to send and receive messages. Every Kafka topic consists of one or more partitions, which act as shards: they split the topic into smaller units that can be distributed across brokers and scaled independently. When a Kafka producer sends a message, it uses the message key to decide which partition the message goes to. If no key is set, the producer spreads messages across partitions in a round-robin fashion. Each consumer, for its part, is assigned a subset of the topic's partitions and consumes messages from them. How many partitions each consumer receives depends on how many consumers are running in the same consumer group.
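
    To make the partitioning behavior concrete, here is a minimal producer sketch in Java. It is only an illustration (the topic and bootstrap address match the examples used later in this article): records that share a key always land in the same partition, while records sent without a key are spread across partitions.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Keyed record: the key is hashed to choose the partition, so all
                // records with the key "user-42" end up in the same partition.
                producer.send(new ProducerRecord<>("my-topic", "user-42", "logged in"));
                // No key: the producer spreads records across partitions instead.
                producer.send(new ProducerRecord<>("my-topic", "anonymous page view"));
            }
        }
    }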

    Partitions are not the smallest unit in a Kafka cluster, though. Each partition consists of one or more replicas, which keep copies of the partition's data and act as a standby mechanism to ensure availability. In every partition, one replica is elected as the leader, and the remaining replicas are followers. When the leader is lost, one of the followers is elected as the new leader and begins serving messages to producers and consumers.

    New in Apache Kafka 2.4.0

    It used to be that only the leader replica could receive messages from producers and send them to consumers. Follower replicas could only copy and store data from the leader. Figure 1 is a flow diagram showing this older messaging model.

    Figure 1. Producing and consuming messages from the leader replica only.

    Apache Kafka 2.4.0 implements recommendations from Kafka Improvement Proposal (KIP) 392, which significantly changes Kafka's messaging behavior. Consumers can now consume messages directly from follower replicas, and they no longer need to connect to the leader replica. Figure 2 shows this new data-flow model.

    Figure 2. Consuming messages from follower replicas.

    The new ReplicaSelector interface

    KIP-392 introduces a new broker plugin interface, ReplicaSelector, that lets you provide custom logic to determine which replica a consumer should fetch from. You configure the plugin using the replica.selector.class option. Kafka provides two selector implementations. LeaderSelector always selects the leader replica, so messaging works exactly as it did before. LeaderSelector is also the default, so out of the box Kafka 2.4.0 behaves the same as previous versions.

    RackAwareReplicaSelector uses the rack ID to select the replica that is closest to the consumer. You can also provide a custom selector implementing your own logic, as sketched below; however, fetching from the closest replica is the most common use case, so we'll focus on that.
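
    For illustration, a custom selector is simply a class that implements the ReplicaSelector interface and is registered with replica.selector.class. The sketch below follows the interface shape described in KIP-392 (exact method names and signatures may differ slightly between Kafka versions, so treat it as an outline rather than a drop-in implementation). It prefers a follower in the consumer's rack and otherwise falls back to the leader, which is roughly what RackAwareReplicaSelector itself does:

    import java.io.IOException;
    import java.util.Map;
    import java.util.Optional;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.replica.ClientMetadata;
    import org.apache.kafka.common.replica.PartitionView;
    import org.apache.kafka.common.replica.ReplicaSelector;
    import org.apache.kafka.common.replica.ReplicaView;

    public class SameRackOrLeaderSelector implements ReplicaSelector {

        @Override
        public Optional<ReplicaView> select(TopicPartition topicPartition,
                                            ClientMetadata clientMetadata,
                                            PartitionView partitionView) {
            // Prefer a replica whose broker rack matches the consumer's client.rack ...
            Optional<ReplicaView> sameRack = partitionView.replicas().stream()
                    .filter(replica -> clientMetadata.rackId() != null
                            && clientMetadata.rackId().equals(replica.endpoint().rack()))
                    .findFirst();
            // ... and otherwise fall back to the leader replica.
            return sameRack.isPresent() ? sameRack : Optional.ofNullable(partitionView.leader());
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed for this sketch.
        }

        @Override
        public void close() throws IOException {
            // Nothing to clean up.
        }
    }

    To use a selector like this, you would package the class, add it to the brokers' classpath, and point replica.selector.class at its fully qualified name.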

    Finding the closest replica

    Kafka already allows you to balance replicas across racks. You can use the broker.rack configuration to assign each broker a rack ID. Brokers will then try to spread replicas across as many racks as possible. This feature improves resiliency against rack failures. It's named after physical server racks, but most environments have a similar concept. Both AMQ Streams and Strimzi have supported rack awareness for a long time.

    For developers familiar with running Apache Kafka on Amazon Web Services, AWS availability zones correspond to racks. Figure 3 shows a configuration where all the brokers are running in the availability zone us-east-1a. In this case, the rack configuration would be broker.rack=us-east-1a.

    Figure 3. Consuming from replicas in the same AWS availability zone.

    Configuring the consumer

    In the same way that broker.rack assigns a rack ID to brokers, the new client.rack option assigns a rack ID to consumers. When it is set, RackAwareReplicaSelector tries to match the consumer's client.rack against the brokers' broker.rack values and selects a replica that has the same rack ID as the client.

    If there are multiple replicas in the same rack, RackAwareReplicaSelector always selects the most up-to-date replica. If the rack ID is not specified, or if it can't find a replica with the same rack ID, it will fall back to the leader replica.

    Using the rack-aware selector in AMQ Streams

    Let's run an example to see how the new rack-aware selector works in practice. We'll use AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on Amazon AWS.

    Step 1: Deploy an Apache Kafka cluster

    The first thing we'll do is to deploy an Apache Kafka cluster, as shown here:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
        name: my-cluster
    spec:
        kafka:
            version: 2.4.0
            replicas: 3
            listeners:
                plain: {}
                tls: {}
            rack:
                topologyKey: failure-domain.beta.kubernetes.io/zone
            config:
                replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
                auto.create.topics.enable: "false"
                offsets.topic.replication.factor: 3
                transaction.state.log.replication.factor: 3
                transaction.state.log.min.isr: 2
                log.message.format.version: "2.4"
            storage:
                type: jbod
                volumes:
                  - id: 0
                    type: persistent-claim
                    size: 100Gi
                    deleteClaim: false
        zookeeper:
            replicas: 3
            storage:
                type: persistent-claim
                size: 100Gi
                deleteClaim: false
        entityOperator:
            topicOperator: {}
            userOperator: {}
    

    Step 2: Enable the rack-aware selector

    Next, we enable the rack-aware selector. There are two parts to this. First, the rack section assigns each broker the availability zone it runs in: rack awareness is enabled by pointing it at the failure-domain.beta.kubernetes.io/zone label of the Kubernetes node hosting the broker.

    Second, activate the rack-aware selector itself by setting the replica.selector.class option to org.apache.kafka.common.replica.RackAwareReplicaSelector.
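
    For reference, outside of AMQ Streams or Strimzi the same two settings go straight into the broker configuration. A minimal sketch, assuming a broker running in the eu-west-1c availability zone:

    # server.properties (plain Apache Kafka; example values)
    broker.rack=eu-west-1c
    replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector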

    Step 3: Create a topic with three replicas

    The Kafka cluster has three nodes. The rack-awareness feature also configures a corresponding Kubernetes affinity rule, which ensures the broker pods are distributed across our availability zones. Now we can create a topic with three replicas:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
        name: my-topic
        labels:
            strimzi.io/cluster: my-cluster
    spec:
        partitions: 3
        replicas: 3
    

    Step 4: Verify the configuration

    Next, we use the kafka-topics.sh tool to verify that the topic was created and that the leaders for the three partitions are distributed across all three brokers:

    $ bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --topic my-topic
    Topic:my-topic  PartitionCount:3        ReplicationFactor:3 Configs:message.format.version=2.3-IV1
            Topic: my-topic Partition: 0    Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
            Topic: my-topic Partition: 1    Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
            Topic: my-topic Partition: 2    Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
    

    Step 5: Configure the producer and consumer

    The brokers and topic are ready, so now we need a producer and a consumer. The producer doesn't play a special role here, so we could use the Kafka console producer or any other producer to send the messages. In this case, we'll use the Kafka console producer:

    $ bin/kafka-console-producer.sh --broker-list=my-cluster-kafka-bootstrap:9092 --topic my-topic
    

    The client.rack option defines which zone the client will use to consume messages. In the Kafka console consumer, we use the --consumer-property option to specify a client.rack:

    $ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic --consumer-property client.rack=eu-west-1c
    

    Note that the client.rack option is not limited to the console consumer. You can use it with the Java client or with a client in any other language that supports this feature. You just have to specify client.rack when configuring the consumer:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
    props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-west-1c");
    

    Step 6: Set the log level to debug

    At first glance, the client consumes messages just like any other consumer would. So how can we be sure that it's consuming from our follower replicas? We need to set the log level for the org.apache.kafka.clients.consumer.internals.Fetcher class to DEBUG.
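
    One way to do this for the console consumer, assuming the stock log4j setup that ships with the Apache Kafka tools (the file name can differ between distributions), is to add a logger entry to config/tools-log4j.properties:

    # config/tools-log4j.properties (add this line)
    log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG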

    The DEBUG setting gives us more detailed information when fetching messages from the broker. With the client.rack specified, we can now see logs like this:

    [2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=789, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
    [2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=724, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
    [2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=760, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
    

    These logs confirm that the client is fetching messages from all three partitions of our topic. Each partition has a different leader, yet every fetch request is sent to my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092. In my case, that is the broker running in the eu-west-1c zone.

    Log comparison

    You might wonder how this configuration would look if we didn't specify the client.rack option. Let's see what happens if we just run:

    $ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic
    

    In this case, Kafka will send the fetch request to the elected leader for each partition:

    [2020-02-24 21:44:49,952] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=823, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a) (org.apache.kafka.clients.consumer.internals.Fetcher)
    [2020-02-24 21:44:50,092] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=856, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
    [2020-02-24 21:44:50,435] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=891, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b) (org.apache.kafka.clients.consumer.internals.Fetcher)
    

    Is the rack-aware selector right for your use case?

    The new RackAwareReplicaSelector is not for everyone. Depending on your configuration, the follower replicas might not be as up to date as the leader replica, so the end-to-end latency between producer and consumer can be higher when fetching from a follower rather than from the leader. The KIP-392 proposal describes some of these situations in detail.

    The new selector does offer a few clear advantages, however. Apache Kafka is a high-throughput messaging platform, which can create a significant load on your networks. That load is shared between brokers and between brokers and clients. Consuming messages from the closest replica could help reduce the load on your network.

    The rack-aware selector also has potential cost benefits for public cloud environments. In AWS, for example, data transfers are free between brokers and clients in the same availability zone. Data transfers across different availability zones are billed. It's fairly common to work with large amounts of data in Apache Kafka, so data transfer fees can add up. Consuming from follower replicas in the same availability zone can help to keep those costs down. In use cases where you are replaying older data—such as to train or validate artificial intelligence and machine learning models—the latency will not matter, while the cost savings could be significant.

    Conclusion

    Does the new rack-awareness feature seem like a good fit for your project? Give it a try! The rack-aware selector is available in Apache Kafka 2.4.0, starting with AMQ Streams 1.4.0 and Strimzi 0.16.0.

    Last updated: February 5, 2024
