Consuming messages from closest replicas in Apache Kafka 2.4.0 and AMQ Streams

Thanks to changes in Apache Kafka 2.4.0, consumers are no longer required to connect to a leader replica to consume messages. In this article, I introduce you to Apache Kafka’s new ReplicaSelector interface and its customizable RackAwareReplicaSelector. I’ll briefly explain the benefits of the new rack-aware selector, then show you how to use it to more efficiently balance load across Amazon Web Services (AWS) availability zones.

For this example, we’ll use Red Hat AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on AWS.


Note: Both AMQ Streams 1.4.0 and Strimzi 0.16.0 implement the new ReplicaSelector interface.

Overview of the Apache Kafka messaging system

Apache Kafka uses topics to send and receive messages. Every Kafka topic consists of one or more partitions, which act as shards. Partitions split each topic into smaller units that can be distributed across brokers, which is how a topic scales. When a Kafka producer sends a message, it uses the message key to decide which partition to use. If the message has no key, the producer spreads messages across the partitions instead, for example in round-robin fashion. On the consuming side, the partitions of a topic are divided among the consumers in the same consumer group, so the number of partitions each consumer reads from depends on how many consumers are running in that group.
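The key-to-partition mapping described above can be illustrated with a minimal Java sketch. This is a simplified stand-in, not Kafka’s partitioner: the class name PartitionChooser and the hash function are assumptions made for illustration (Kafka’s own default partitioner hashes the serialized key with murmur2), and the keyless branch simply rotates through the partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; this is NOT Kafka's partitioner.
final class PartitionChooser {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    PartitionChooser(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Returns the partition index for a message with the given key (may be null).
    int choose(String key) {
        if (key == null) {
            // No key: rotate through the partitions in round-robin fashion.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // With a key: hash the key bytes so the same key always maps to the
        // same partition. (Assumption: a simple array hash stands in for murmur2.)
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser(3);
        System.out.println("order-42 -> partition " + chooser.choose("order-42"));
        System.out.println("order-42 -> partition " + chooser.choose("order-42"));
        System.out.println("no key   -> partition " + chooser.choose(null));
        System.out.println("no key   -> partition " + chooser.choose(null));
    }
}
```

Because the partition is derived from the key alone, all messages with the same key land in the same partition, which is what preserves their relative order for the consumer.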

Partitions are not the smallest unit in the Kafka broker. Each partition consists of one or more replicas. Replicas help to ensure availability by keeping a copy of the data and acting as a standby mechanism. In every partition, one replica is elected as the leader, and the remaining replicas are followers. When the leader is lost, one of the followers is elected as the new leader and begins serving messages to producers and consumers.

New in Apache Kafka 2.4.0

It used to be that only the leader replica could receive messages from producers and send them to consumers. Follower replicas could only copy and store data from the leader. Figure 1 is a flow diagram showing this older messaging model.

A flow diagram of the Kafka message system before KIP-392.

Figure 1. Producing and consuming messages from the leader replica.

Apache Kafka 2.4.0 implements recommendations from Kafka Improvement Proposal (KIP) 392, which significantly changes Kafka’s messaging behavior. Consumers can now consume messages directly from follower replicas, and they no longer need to connect to the leader replica. Figure 2 shows this new data-flow model.

A flow diagram of the new messaging model.

Figure 2. Consuming messages from follower replicas.

The new ReplicaSelector interface

KIP-392 implements a new broker plugin interface, ReplicaSelector, that lets you provide custom logic to determine which replica a consumer should use. You configure the plugin using the replica.selector.class option. Kafka provides two selector implementations. LeaderSelector always selects the leader replica, so messaging works exactly like it did before. LeaderSelector is also the default selector, so by default Kafka 2.4.0’s messaging behavior is the same as in previous versions.

RackAwareReplicaSelector attempts to use the rack ID to select the replica that is closest to the consumer. You also have the option to provide a custom selector implementing your own logic; however, fetching from the closest replica is the most common use case, so we’ll focus on that.

Finding the closest replica

Kafka already allows you to balance replicas across racks. You can use the broker.rack configuration to assign each broker a rack ID. Brokers will then try to spread replicas across as many racks as possible. This feature improves resiliency against rack failures. It’s named after physical server racks, but most environments have a similar concept. Both AMQ Streams and Strimzi have supported rack awareness for a long time.

For developers familiar with running Apache Kafka on Amazon Web Services, AWS availability zones correspond to racks. Figure 3 shows a configuration where all the brokers are running in the availability zone us-east-1a. In this case, the rack configuration would be broker.rack=us-east-1a.

Flow diagram for an AWS configuration where all the brokers are running in the availability zone us-east-1a.

Figure 3. Consuming from replicas in the same AWS availability zone.

Configuring the consumer

In the same way that you do for brokers, you can use the new client.rack option to assign a rack ID to consumers. When it is enabled, RackAwareReplicaSelector tries to match the consumer’s client.rack with the broker.rack values of the available replicas. It then selects a replica that has the same rack ID as the client.

If there are multiple replicas in the same rack, RackAwareReplicaSelector always selects the most up-to-date replica. If the rack ID is not specified, or if it can’t find a replica with the same rack ID, it will fall back to the leader replica.
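The selection rules just described can be sketched in a few lines of plain Java. This is an illustration of the logic only, not the broker’s implementation: the RackAwareSketch and Replica types are hypothetical stand-ins, and the log end offset is used here as an assumed measure of how up-to-date a replica is.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the selection rules; not Kafka's RackAwareReplicaSelector.
final class RackAwareSketch {

    // Minimal stand-in for a replica: the broker hosting it, that broker's
    // rack ID, and how far the replica's log has progressed.
    static final class Replica {
        final int brokerId;
        final String rack;
        final long logEndOffset;

        Replica(int brokerId, String rack, long logEndOffset) {
            this.brokerId = brokerId;
            this.rack = rack;
            this.logEndOffset = logEndOffset;
        }
    }

    // Picks the replica a consumer should fetch from.
    static Replica select(String clientRack, Replica leader, List<Replica> replicas) {
        // No client.rack specified: fall back to the leader.
        if (clientRack == null || clientRack.isEmpty()) {
            return leader;
        }
        return replicas.stream()
                // Keep only replicas in the same rack as the client.
                .filter(r -> clientRack.equals(r.rack))
                // Among those, prefer the most up-to-date replica.
                .max(Comparator.comparingLong((Replica r) -> r.logEndOffset))
                // No replica in the client's rack: fall back to the leader.
                .orElse(leader);
    }

    public static void main(String[] args) {
        Replica leader = new Replica(1, "eu-west-1b", 800);
        List<Replica> replicas = Arrays.asList(
                leader,
                new Replica(0, "eu-west-1c", 795),
                new Replica(2, "eu-west-1a", 790));

        System.out.println("client in eu-west-1c -> broker "
                + select("eu-west-1c", leader, replicas).brokerId);
        System.out.println("client without rack  -> broker "
                + select(null, leader, replicas).brokerId);
    }
}
```

The fallback to the leader in both edge cases means that setting client.rack to a value that matches no broker does not break consumption; the consumer just behaves as it did before Kafka 2.4.0.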

Using the rack-aware selector in AMQ Streams

Let’s run an example to see how the new rack-aware selector works in practice. We’ll use AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on AWS.

Step 1: Deploy an Apache Kafka cluster

The first thing we’ll do is to deploy an Apache Kafka cluster, as shown here:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
    name: my-cluster
spec:
    kafka:
        version: 2.4.0
        replicas: 3
        listeners:
            plain: {}
            tls: {}
        rack:
            topologyKey: failure-domain.beta.kubernetes.io/zone
        config:
            replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
            auto.create.topics.enable: "false"
            offsets.topic.replication.factor: 3
            transaction.state.log.replication.factor: 3
            transaction.state.log.min.isr: 2
            log.message.format.version: "2.4"
        storage:
            type: jbod
            volumes:
              - id: 0
                type: persistent-claim
                size: 100Gi
                deleteClaim: false
    zookeeper:
        replicas: 3
        storage:
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
    entityOperator:
        topicOperator: {}
        userOperator: {}

Step 2: Enable the rack-aware selector

Next, we enable the rack-aware selector. There are two parts to this, and both are already included in the Kafka resource from Step 1. First, set the rack configuration so that each broker uses the availability zone where it is running as its rack ID. You enable this rack-awareness feature by setting rack.topologyKey to failure-domain.beta.kubernetes.io/zone, so that the value is read from that label on the Kubernetes node where the broker runs.

Second, activate the rack-aware selector by setting the replica.selector.class option to org.apache.kafka.common.replica.RackAwareReplicaSelector.

Step 3: Create a topic with three replicas

The Kafka cluster has three nodes. We’ve used the rack-awareness feature to configure the corresponding Kubernetes affinity rule, which ensures the nodes are distributed across our availability zones. Now we can create a topic with three replicas:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
    name: my-topic
    labels:
        strimzi.io/cluster: my-cluster
spec:
    partitions: 3
    replicas: 3

Step 4: Verify the configuration

Next, we use the kafka-topics.sh tool to verify that the topic was created and that the leaders for the three partitions are distributed across all three brokers:

$ bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --topic my-topic
Topic:my-topic  PartitionCount:3        ReplicationFactor:3 Configs:message.format.version=2.3-IV1
        Topic: my-topic Partition: 0    Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
        Topic: my-topic Partition: 1    Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
        Topic: my-topic Partition: 2    Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

Step 5: Configure the producer and consumer

The brokers and topic are ready, so now we need a producer and a consumer. The producer doesn’t play a role here, so we could use the Kafka console producer or any other producer to send the messages. In this case, we’ll use the Kafka console producer:

$ bin/kafka-console-producer.sh --broker-list=my-cluster-kafka-bootstrap:9092 --topic my-topic

The client.rack option defines which zone the client will use to consume messages. In the Kafka console consumer, we use the --consumer-property option to specify a client.rack:

$ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic --consumer-property client.rack=eu-west-1c

Note that the client.rack option is not limited to the console consumer. You can use it with any Java client or with the client for any other language that supports this feature. You just have to specify the client.rack when configuring the consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-west-1c");

Step 6: Set the log level to debug

At first glance, the client is consuming messages just like any other consumer would. So how can we be sure that it’s consuming from our follower replicas? We need to set the log level for the org.apache.kafka.clients.consumer.internals.Fetcher class to DEBUG.

The DEBUG setting gives us more detailed information when fetching messages from the broker. With the client.rack specified, we can now see logs like this:

[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=789, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=724, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=760, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)

This log shows that the client is fetching messages from all three partitions of our topic. Each partition has a different leader broker, but every fetch request is sent to my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092. In my case, that is the broker running in the eu-west-1c zone, which matches the client.rack we specified.
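If you have many such lines to check, comparing the leader ID with the target node ID by eye gets tedious. The following Java sketch automates that comparison with a regular expression. It is a hypothetical helper, not part of Kafka: the FetchLogCheck class and its pattern are assumptions based solely on the DEBUG line format shown above, which could differ in other client versions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reports whether a Fetcher DEBUG line shows a fetch
// from the partition leader or from a follower replica.
final class FetchLogCheck {
    // Captures: (1) partition, (2) leader broker ID, (3) leader rack,
    // (4) target node broker ID, (5) target node rack.
    private static final Pattern FETCH = Pattern.compile(
            "fetch request for partition (\\S+) at position .*?"
            + "leader=\\S+ \\(id: (\\d+) rack: ([^)]+)\\).*?"
            + " to node \\S+ \\(id: (\\d+) rack: ([^)]+)\\)");

    // Returns a one-line summary, or null if the line is not a fetch request.
    static String describe(String logLine) {
        Matcher m = FETCH.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        int leaderId = Integer.parseInt(m.group(2));
        int nodeId = Integer.parseInt(m.group(4));
        // The fetch goes to a follower when the target node is not the leader.
        String source = (leaderId == nodeId) ? "leader" : "follower";
        return m.group(1) + ": fetching from " + source
                + " broker " + nodeId + " (rack " + m.group(5) + ")";
    }

    public static void main(String[] args) {
        // Message part of the first DEBUG line from the log above.
        String line = "Added READ_UNCOMMITTED fetch request for partition my-topic-0"
                + " at position FetchPosition{offset=789, offsetEpoch=Optional[0],"
                + " currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1"
                + ".my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b),"
                + " epoch=0}} to node my-cluster-kafka-0"
                + ".my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c)";
        System.out.println(describe(line));
    }
}
```

Feeding each line of the consumer log through describe gives a quick per-partition view of whether the closest replica is actually being used.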

Log comparison

You might wonder how this configuration would look if we didn’t specify the client.rack option. Let’s see what happens if we just run:

$ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic

In this case, Kafka will send the fetch request to the elected leader for each partition:

[2020-02-24 21:44:49,952] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=823, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:44:50,092] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=856, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:44:50,435] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=891, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b) (org.apache.kafka.clients.consumer.internals.Fetcher)

Is the rack-aware selector right for your use case?

The new RackAwareReplicaSelector is not for everyone. Depending on your configuration, the follower replicas might not be as up-to-date as the leader replica. In that case, the latency between the producer and the consumer will be higher when fetching from followers rather than a leader. The KIP-392 proposal describes some of these situations in detail.

The new selector does offer a few clear advantages, however. Apache Kafka is a high-throughput messaging platform, which can create a significant load on your networks. That load comes from traffic among the brokers as well as between brokers and clients. Consuming messages from the closest replica could help reduce the load on your network.

The rack-aware selector also has potential cost benefits for public cloud environments. In AWS, for example, data transfers are free between brokers and clients in the same availability zone. Data transfers across different availability zones are billed. It’s fairly common to work with large amounts of data in Apache Kafka, so data transfer fees can add up. Consuming from follower replicas in the same availability zone can help to keep those costs down. In use cases where you are replaying older data—such as to train or validate artificial intelligence and machine learning models—the latency will not matter, while the cost savings could be significant.

Conclusion

Does the new rack-aware selector seem like a good fit for your project? Give it a try! It is available in Apache Kafka 2.4.0, and is supported starting with AMQ Streams 1.4.0 and Strimzi 0.16.0.
