Thanks to changes in Apache Kafka 2.4.0, consumers are no longer required to connect to a leader replica to consume messages. In this article, I introduce you to Apache Kafka's new ReplicaSelector interface and its customizable RackAwareReplicaSelector. I'll briefly explain the benefits of the new rack-aware selector, then show you how to use it to more efficiently balance load across Amazon Web Services (AWS) availability zones.
For this example, we'll use Red Hat AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on Amazon AWS.
Note: Both AMQ Streams 1.4.0 and Strimzi 0.16.0 implement the new ReplicaSelector interface.
Overview of the Apache Kafka messaging system
Apache Kafka uses topics to send and receive messages. Every Kafka topic consists of one or more partitions, which act as shards: they split each topic into smaller units that can be scaled independently. When a Kafka producer sends a message, it uses the message key to decide which partition to use. If no message key is set, the producer distributes messages across the partitions in a round-robin fashion. The consumer, for its part, is assigned one or more partitions and consumes messages from them. How many partitions each consumer is assigned depends on how many consumers are running in the same consumer group.
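To make the partitioning behavior concrete, here is a minimal Java sketch of sending a record with and without a key. The broker address, topic name, and key are placeholders for illustration, not a prescribed setup:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitioningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With a key: records with the same key always land in the same partition.
            producer.send(new ProducerRecord<>("my-topic", "order-42", "created"));
            // Without a key: the producer chooses the partition itself.
            producer.send(new ProducerRecord<>("my-topic", "no key, partition chosen by the producer"));
        }
    }
}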
Partitions are not the smallest unit in the Kafka broker, though. Each partition consists of one or more replicas. Replicas ensure availability by keeping copies of the data and acting as a standby mechanism. In every partition, one replica is elected as the leader, and the remaining replicas are followers. When the leader is lost, a new leader is elected from the followers and begins serving messages to producers and consumers.
New in Apache Kafka 2.4.0
It used to be that only the leader replica could receive messages from producers and send them to consumers. Follower replicas could only copy and store data from the leader. Figure 1 is a flow diagram showing this older messaging model.
Figure 1. Producing and consuming messages from the leader replica.
Apache Kafka 2.4.0 implements recommendations from Kafka Improvement Proposal (KIP) 392, which significantly changes Kafka's messaging behavior. Consumers can now consume messages directly from follower replicas, and they no longer need to connect to the leader replica. Figure 2 shows this new data-flow model.
Figure 2. Consuming messages from follower replicas.
The new ReplicaSelector interface
KIP-392 adds a new broker plugin interface, ReplicaSelector, that lets you provide custom logic to determine which replica a consumer should use. You configure the plugin using the replica.selector.class option. Kafka provides two selector implementations. LeaderSelector always selects the leader replica, so messaging works exactly as it did before. LeaderSelector is also the default, so out of the box Kafka 2.4.0's messaging behavior is the same as in previous versions.
RackAwareReplicaSelector attempts to use the rack ID to select the replica that is closest to the consumer. You can also provide a custom selector implementing your own logic; however, fetching from the closest replica is the most common use case, so we'll focus on that.
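If you do go the custom route, a selector is simply a class that implements ReplicaSelector and lives on the broker's classpath. Here is a minimal, hypothetical sketch (the class name and selection rule are mine, and it assumes the ReplicaSelector API defined by KIP-392) that always picks the most caught-up replica, regardless of rack:

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.replica.PartitionView;
import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.replica.ReplicaView;

// Hypothetical example only: prefers the replica with the highest log end offset,
// ignoring rack IDs entirely. Kafka's built-in selectors cover the common cases.
public class MostCaughtUpReplicaSelector implements ReplicaSelector {

    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        // Pick the replica that has replicated the most data; an empty Optional
        // would mean "no preference".
        return partitionView.replicas().stream()
                .max(Comparator.comparingLong(ReplicaView::logEndOffset));
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this example.
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}

You would package such a class into a JAR, add it to the broker classpath, and point replica.selector.class at its fully qualified name.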
Finding the closest replica
Kafka already allows you to balance replicas across racks. You can use the broker.rack configuration to assign each broker a rack ID. Brokers will then try to spread replicas across as many racks as possible. This feature improves resiliency against rack failures. It's named after physical server racks, but most environments have a similar concept. Both AMQ Streams and Strimzi have supported rack awareness for a long time.
For developers familiar with running Apache Kafka on Amazon Web Services, AWS availability zones correspond to racks. Figure 3 shows a configuration where all the brokers are running in the availability zone us-east-1a. In this case, the rack configuration would be broker.rack=us-east-1a.
Figure 3. Consuming from replicas in the same AWS availability zone.
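If you run plain Apache Kafka outside of AMQ Streams (which sets this for you, as we'll see below), the rack ID is just a broker property. A minimal sketch for a broker in us-east-1a:

# server.properties for a broker running in the us-east-1a availability zone
broker.rack=us-east-1a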
Configuring the consumer
In the same way as for brokers, you can use the new client.rack option to assign a rack ID to consumers. When it is set, RackAwareReplicaSelector tries to match the consumer's client.rack with the available broker.rack values. It then selects the replica that has the same rack ID as the client.
If there are multiple replicas in the same rack, RackAwareReplicaSelector always selects the most up-to-date one. If the rack ID is not specified, or if no replica with the same rack ID can be found, it falls back to the leader replica.
Using the rack-aware selector in AMQ Streams
Let's run an example to see how the new rack-aware selector works in practice. We'll use AMQ Streams with Red Hat OpenShift Container Platform 4.3, running on Amazon AWS.
Step 1: Deploy an Apache Kafka cluster
The first thing we'll do is to deploy an Apache Kafka cluster, as shown here:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.4.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    rack:
      topologyKey: failure-domain.beta.kubernetes.io/zone
    config:
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.4"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
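Assuming the resource above is saved as kafka-cluster.yaml (the filename is arbitrary), we apply it with the OpenShift CLI and wait for the operator to roll out the cluster:

$ oc apply -f kafka-cluster.yaml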
Step 2: Enable the rack-aware selector
Next, we enable the rack-aware selector. There are two parts to this. First, the rack configuration tells each broker which availability zone it is running in: we enable the rack-awareness feature by pointing it at the failure-domain.beta.kubernetes.io/zone label of the Kubernetes node hosting the broker.
Then, we activate the rack-aware selector itself by setting the replica.selector.class option to org.apache.kafka.common.replica.RackAwareReplicaSelector.
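These are the two relevant excerpts from the Kafka resource we deployed in step 1:

    rack:
      topologyKey: failure-domain.beta.kubernetes.io/zone
    config:
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector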
Step 3: Create a topic with three replicas
The Kafka cluster has three nodes. We've used the rack-awareness feature to configure the corresponding Kubernetes affinity rule, which ensures the nodes are distributed across our availability zones. Now we can create a topic with three replicas:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
Step 4: Verify the configuration
Next, we use the kafka-topics.sh tool to verify that the topic was created and that the leaders for the three partitions are distributed across all three brokers:
$ bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --topic my-topic
Topic:my-topic  PartitionCount:3  ReplicationFactor:3  Configs:message.format.version=2.3-IV1
    Topic: my-topic  Partition: 0  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
    Topic: my-topic  Partition: 1  Leader: 2  Replicas: 2,1,0  Isr: 2,1,0
    Topic: my-topic  Partition: 2  Leader: 0  Replicas: 0,2,1  Isr: 0,2,1
Step 5: Configure the producer and consumer
The broker and topic are ready, so now we need a producer and consumer. The producer doesn't play a role here, so we could use the Kafka console producer or any other producer to send the messages. In this case, we'll use the Kafka console producer:
$ bin/kafka-console-producer.sh --broker-list=my-cluster-kafka-bootstrap:9092 --topic my-topic
The client.rack option defines which zone the client will use to consume messages. In the Kafka console consumer, we use the --consumer-property option to specify a client.rack:
$ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic --consumer-property client.rack=eu-west-1c
Note that the client.rack option is not limited to the console consumer. You can use it with any Java client, or with the client for any other language that supports this feature. You just have to specify client.rack when configuring the consumer:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-west-1c");
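For completeness, here's a minimal sketch of a complete Java consumer with client.rack set. The group ID and deserializer choices are my own assumptions; the broker address, topic, and zone match the example above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-example");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Tell the brokers which "rack" (availability zone) this consumer runs in.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-west-1c");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}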
Step 6: Set the log level to debug
At first glance, the client is consuming messages just like any other consumer would. So how can we be sure that it's consuming from our follower replicas? We need to set the log level for the org.apache.kafka.clients.consumer.internals.Fetcher class to DEBUG.
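For the Kafka command-line tools, one way to do this, assuming the standard Kafka distribution layout, is to add a logger entry to config/tools-log4j.properties before starting the console consumer:

# Log each fetch request built by the consumer, including the target broker and its rack
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG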
The DEBUG setting gives us more detailed information when fetching messages from the broker. With the client.rack specified, we can now see logs like this:
[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=789, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=724, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:39:53,264] DEBUG [Consumer clientId=consumer-console-consumer-47576-1, groupId=console-consumer-47576] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=760, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
These logs assure us that the client is fetching messages from all three partitions of our topic. The leader of each partition is a different broker, yet the fetch requests are always sent to my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092. In my case, that is the broker running in the eu-west-1c zone.
Log comparison
You might wonder how this configuration would look if we didn't specify the client.rack option. Let's see what happens if we just run:
$ bin/kafka-console-consumer.sh --bootstrap-server=my-cluster-kafka-bootstrap:9092 --topic my-topic
In this case, Kafka will send the fetch request to the elected leader for each partition:
[2020-02-24 21:44:49,952] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-2 at position FetchPosition{offset=823, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a), epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.myproject.svc:9092 (id: 2 rack: eu-west-1a) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:44:50,092] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-1 at position FetchPosition{offset=856, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c), epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.myproject.svc:9092 (id: 0 rack: eu-west-1c) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-02-24 21:44:50,435] DEBUG [Consumer clientId=consumer-console-consumer-94348-1, groupId=console-consumer-94348] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=891, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b), epoch=0}} to node my-cluster-kafka-1.my-cluster-kafka-brokers.myproject.svc:9092 (id: 1 rack: eu-west-1b) (org.apache.kafka.clients.consumer.internals.Fetcher)
Is the rack-aware selector right for your use case?
The new RackAwareReplicaSelector is not for everyone. Depending on your configuration, the follower replicas might not be as up-to-date as the leader replica. In that case, the latency between the producer and the consumer will be higher when fetching from followers rather than from the leader. The KIP-392 proposal describes some of these situations in detail.
The new selector does offer a few clear advantages, however. Apache Kafka is a high-throughput messaging platform, which can create a significant load on your networks. That load is shared between brokers and between brokers and clients. Consuming messages from the closest replica could help reduce the load on your network.
The rack-aware selector also has potential cost benefits for public cloud environments. In AWS, for example, data transfers are free between brokers and clients in the same availability zone. Data transfers across different availability zones are billed. It's fairly common to work with large amounts of data in Apache Kafka, so data transfer fees can add up. Consuming from follower replicas in the same availability zone can help to keep those costs down. In use cases where you are replaying older data—such as to train or validate artificial intelligence and machine learning models—the latency will not matter, while the cost savings could be significant.
Conclusion
Does the new rack-awareness feature seem like a good fit for your project? Give it a try! The rack-aware selector is available in Apache Kafka 2.4.0, starting with AMQ Streams 1.4.0 and Strimzi 0.16.0.