This guide breaks down the key concepts of Apache Kafka's KRaft protocol and dives into its implementation, based on Apache Kafka 4.1.0 (KRaft v1). If you're a developer or engineer looking to solidify your understanding of how KRaft works under the hood, this post is for you. A basic understanding of Kafka is assumed. If you're already familiar with the Raft algorithm, you can skip ahead a few sections.
Consensus algorithms
Consensus algorithms are a cornerstone of modern distributed computing. They enable the creation of reliable, resilient, and trustworthy systems in a world where data and applications are increasingly decentralized.
The primary purpose of a consensus algorithm is to ensure fault tolerance and consistency in a network of interconnected nodes. This means that even if some nodes fail or provide conflicting information, the system as a whole can continue to operate correctly as long as a majority of nodes are available.
Introducing Raft
Voting-based consensus algorithms, such as Raft, rely on a quorum of participants to vote on the validity of a transaction or a batch of data. Raft defines a replicated state machine, in which a set of nodes processes the same sequence of inputs in the same order so that they all stay synchronized.
Raft works by breaking down the consensus problem into three key parts: leader election, log replication, and safety rules. At any given time, each node is in one of three states: follower, candidate, or leader.
Leader election
All nodes start as followers. Only one node can be the leader at any given time; it is responsible for managing the system, and all changes go through it. All the other nodes are followers.
If a follower doesn't hear from a leader for a certain period—the election timeout—it assumes the leader has crashed. At this point, it increments a counter for election cycles (called the term), transitions to the candidate state, and votes for itself. The candidate then sends RequestVote RPCs to all other nodes in the cluster. Each follower votes for the first valid candidate (one with an up-to-date log and a valid term). If the candidate receives votes from a majority of the nodes, it becomes the new leader.
If multiple candidates start an election at the same time, and none wins a majority, the election times out, the term is incremented, and a new election begins. Randomized timeouts help prevent these split votes from repeating.
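To make the state transitions concrete, here is a minimal, illustrative sketch of the follower-to-candidate transition and vote counting. The names are hypothetical and do not mirror Kafka's KafkaRaftClient.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of Raft leader election bookkeeping (not Kafka code).
class RaftElectionSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private final int selfId;
    private final int clusterSize;
    private int currentTerm = 0;            // must be persisted before answering RPCs
    private Integer votedFor = null;        // must be persisted; at most one vote per term
    private Role role = Role.FOLLOWER;
    private long lastLeaderContactMs = System.currentTimeMillis();
    private final long electionTimeoutMs =
            150 + ThreadLocalRandom.current().nextLong(150);   // randomized 150-300 ms
    private final Set<Integer> votesGranted = new HashSet<>();

    RaftElectionSketch(int selfId, int clusterSize) {
        this.selfId = selfId;
        this.clusterSize = clusterSize;
    }

    // Invoked by a timer: if the leader has been silent too long, become a candidate.
    void onElectionTimeoutCheck() {
        if (role != Role.LEADER
                && System.currentTimeMillis() - lastLeaderContactMs >= electionTimeoutMs) {
            currentTerm++;                   // bump the term for the new election
            role = Role.CANDIDATE;
            votedFor = selfId;               // vote for ourselves
            votesGranted.clear();
            votesGranted.add(selfId);
            // a real implementation would now send RequestVote RPCs to all peers
        }
    }

    // Invoked when a peer answers our RequestVote RPC.
    void onVoteResponse(int voterId, boolean granted) {
        if (role == Role.CANDIDATE && granted) {
            votesGranted.add(voterId);
            if (votesGranted.size() > clusterSize / 2) {
                role = Role.LEADER;          // majority reached
            }
        }
    }
}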
Log replication
Once a leader is elected, it is responsible for replicating all changes across the cluster. These changes are stored in a sequential, ordered list called the log.
When a client wants to make a change, it sends the command to the leader. The leader appends the command to its own log as a new entry and then sends AppendEntries RPCs to all followers, asking them to add the new entry to their logs. Once a majority of followers have successfully replicated the entry, the leader commits by applying the command to its own state machine and notifies followers that the entry is committed. Followers then apply the committed command to their own state machines, ensuring all servers stay in sync. This flow is illustrated in Figure 1.

At the start of its term, the leader does not know which entries were committed by the previous leader, so it commits a blank no-op entry into the log, which implies all previous entries are also committed, before replying to any read request. It also periodically sends heartbeats (AppendEntries requests with no log entries) to prevent followers from starting new elections.
When a follower receives log entries from the leader, it first checks that the leader's term is at least as current as its own. It then verifies log consistency by ensuring the entry before the new ones matches what the leader expects, using the prevLogIndex and prevLogTerm fields. If there's a conflict where an existing entry has the same index but a different term than a new entry, the follower deletes that conflicting entry and all entries that follow it, then appends the leader's new entries. This process ensures all followers maintain identical logs that match the leader's.
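The consistency check can be condensed into a few lines. The sketch below uses hypothetical types and an in-memory log, not Kafka's actual implementation.

import java.util.List;

// Hypothetical follower-side AppendEntries handling (illustrative only).
class AppendEntriesSketch {
    record Entry(long index, int term, byte[] command) { }

    int currentTerm;
    List<Entry> log;   // in-memory stand-in for the persisted log

    // Returns true if the entries were accepted, false if the leader must retry
    // with an earlier prevLogIndex.
    boolean handleAppendEntries(int leaderTerm, long prevLogIndex, int prevLogTerm,
                                List<Entry> newEntries) {
        if (leaderTerm < currentTerm) return false;               // reject stale leaders
        if (leaderTerm > currentTerm) currentTerm = leaderTerm;   // adopt the newer term

        // Consistency check: the entry just before the new ones must match.
        if (prevLogIndex >= 0) {
            Entry prev = entryAt(prevLogIndex);
            if (prev == null || prev.term() != prevLogTerm) return false;
        }

        // Delete any conflicting suffix, then append the leader's entries.
        for (Entry entry : newEntries) {
            Entry existing = entryAt(entry.index());
            if (existing != null && existing.term() != entry.term()) {
                log.removeIf(e -> e.index() >= entry.index());   // truncate the conflict and everything after it
            }
            if (entryAt(entry.index()) == null) {
                log.add(entry);
            }
        }
        return true;
    }

    private Entry entryAt(long index) {
        return log.stream().filter(e -> e.index() == index).findFirst().orElse(null);
    }
}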
Safety rules
Clients send all of their requests to the leader. These requests must be idempotent. Each command has an associated serial number that can be used to discard duplicates. If the receiving server is not the leader, it will reject the request, giving back information about the most recent leader it has heard from. The following consensus state must be synchronously persisted to stable storage to guarantee safety and fault-tolerance properties:
- currentTerm acts as a logical clock to identify stale leaders and candidates.
- votedFor ensures that a node votes for at most one candidate in a given term.
- log is the core of the replicated state machine.
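At a minimum, this state can be modeled as a tiny class whose persist() call must complete before any RPC response is sent. The sketch below uses hypothetical names and a deliberately naive file format, not Kafka's quorum-state file.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of durable Raft voting state (illustrative only).
class PersistentVoteState {
    private final Path stateFile;
    int currentTerm = 0;
    int votedFor = -1;   // -1 means "not voted in this term"

    PersistentVoteState(Path stateFile) {
        this.stateFile = stateFile;
    }

    // Must complete (including the flush to disk) before replying to RequestVote or AppendEntries.
    void persist() {
        String payload = currentTerm + "," + votedFor;
        try {
            Files.write(stateFile, payload.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING,
                    StandardOpenOption.SYNC);   // SYNC forces data and metadata to stable storage
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}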
The voting state is kept separate from the replicated state machine log for bootstrapping and performance reasons. In addition to that, a number of invariants are enforced to ensure nothing bad ever happens in the system:
- Leader election safety: At most one leader can be elected in a given term. If a candidate wins an election, it must have received votes from a majority of the nodes in the cluster. A node can only vote for one candidate in a term, so it's impossible for two different candidates to win a majority in the same term. This prevents a split-brain scenario where two leaders believe they are in charge.
- Append-only leader: A leader never overwrites or deletes entries in its log; it only appends new ones.
- Leader completeness: A newly elected leader has all committed entries from previous terms. A candidate can only win an election if its log is at least as up-to-date as the logs of the majority of nodes it receives votes from. This prevents a new leader from accidentally overwriting or ignoring entries that the cluster has already agreed upon.
- Log matching: Log entries are unique by index and term. If two logs contain an entry with the same index and term, then the logs are identical up to that index. When a leader sends an AppendEntries request to a follower, it includes the index and term of the entry immediately preceding the new ones. The follower will only accept the new entries if it has a matching entry at that index and term. This forces logs to converge and eliminates inconsistencies.
- State machine safety: If a node has applied a log entry at a given index to its state machine, no other node will ever apply a different log entry for the same index.
Practical considerations
When implementing Raft, key challenges include handling network partitions gracefully through appropriate timeout configurations, managing log compaction to prevent unbounded growth, and optimizing leader election to minimize downtime. Additionally, careful attention must be paid to persistence guarantees, batching mechanisms for performance, and proper handling of configuration changes during cluster membership updates.
Timing
Leader election is the aspect of Raft where timing is most critical. Raft will be able to elect and maintain a steady leader as long as the system satisfies the following timing requirement:
broadcastTime << electionTimeout << avgTimeBetweenFailures

The broadcastTime is the time it takes to send RPCs to all nodes and receive their responses (typically 0.5–20 ms). The electionTimeout is the time a follower waits before starting an election (typically 150–300 ms). The avgTimeBetweenFailures is the average time between failures of any single node, including network partitions (typically months).
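As a back-of-the-envelope check (illustrative only, not Kafka code), the typical values above can be plugged into the inequality:

import java.util.concurrent.ThreadLocalRandom;

// Picks a randomized election timeout and checks the Raft timing requirement.
class TimingCheck {
    public static void main(String[] args) {
        long broadcastTimeMs = 20;                                                  // worst case of the 0.5-20 ms range
        long electionTimeoutMs = ThreadLocalRandom.current().nextLong(150, 301);   // randomized 150-300 ms
        long avgTimeBetweenFailuresMs = 30L * 24 * 60 * 60 * 1000;                 // roughly one month

        // "Much less than" is approximated here as at least an order of magnitude apart.
        boolean satisfied = broadcastTimeMs * 10 <= electionTimeoutMs
                && electionTimeoutMs * 10 <= avgTimeBetweenFailuresMs;
        System.out.println("electionTimeout=" + electionTimeoutMs + " ms, requirement satisfied: " + satisfied);
    }
}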
To optimize performance, the leader can execute log writes (fsync calls) concurrently with network transmission of AppendEntries requests, and commit as soon as it receives acknowledgments from a majority of nodes. Access to consensus state should be protected by one or more locks accessed by the various application threads. Further throughput gains and latency reductions are achievable by implementing batching and pipelining techniques for AppendEntries requests.
Network partition
A single, isolated node can force a perfectly healthy leader to step down, triggering a new and unnecessary election. This causes a brief period of unavailability during which the cluster has no leader and cannot process requests. In the unfortunate scenario where two nodes can each only connect to one other node, this can cause flip-flopping leadership changes.
The Pre-Vote optimization prevents this issue by first checking if the candidate can get enough votes without bumping the term before starting a full leader election. Followers that have recently heard from the leader reject Pre-Vote requests to avoid disrupting the quorum.
To prevent split-brain scenarios, we also need the Check Quorum mechanism where the leader periodically verifies it can communicate with a majority of the followers and steps down otherwise.
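Both guards boil down to simple timing checks. The sketch below uses hypothetical names and is not taken from Kafka's KafkaRaftClient.

// Conceptual sketch of the Pre-Vote and Check Quorum guards (illustrative only).
class PartitionGuards {
    long lastLeaderContactMs;   // updated whenever the leader is heard from
    long electionTimeoutMs;

    // Follower side (Pre-Vote): grant a pre-vote only if we have NOT heard from
    // a leader recently, so an isolated node cannot disrupt a healthy quorum.
    boolean shouldGrantPreVote(long nowMs, boolean candidateLogUpToDate) {
        boolean leaderPresumedAlive = nowMs - lastLeaderContactMs < electionTimeoutMs;
        return !leaderPresumedAlive && candidateLogUpToDate;
    }

    // Leader side (Check Quorum): step down if a majority of followers has not
    // responded within an election timeout, instead of lingering as a stale leader.
    boolean shouldStepDown(long nowMs, long[] lastAckMsPerFollower, int clusterSize) {
        int reachable = 1;                                // count ourselves
        for (long lastAckMs : lastAckMsPerFollower) {
            if (nowMs - lastAckMs < electionTimeoutMs) reachable++;
        }
        return reachable <= clusterSize / 2;              // lost contact with the majority
    }
}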
Log compaction
The internal log needs to be periodically compacted to avoid availability problems caused by increased time to replay the log and disk space exhaustion. This is usually done using the snapshotting technique, where the entire system state is periodically written to a snapshot file on stable storage.
Nodes take snapshots of their logs independently of each other. Snapshots are consistent because the logs are consistent across nodes. The snapshot includes the last appended index and term, to support the consistency check for the first log entry following the snapshot. It also includes the latest voter set, to support dynamic scaling.
If the leader has already discarded the next log entry that it needs to send to a follower, it sends a snapshot instead. This can happen when there is a slow follower or a new server joining the cluster.
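The leader's decision reduces to a comparison against the log start offset. This is a hypothetical sketch, not Kafka's KafkaMetadataLog logic.

// Illustrative only: when does the leader ship a snapshot instead of log entries?
class SnapshotDecisionSketch {
    record SnapshotId(long endOffset, int epoch) { }   // last offset and epoch covered by the snapshot

    long logStartOffset;        // first offset still present in the log after compaction
    SnapshotId latestSnapshot;  // most recent snapshot on stable storage

    // The leader ships the snapshot when the follower's next offset was already compacted away.
    boolean mustSendSnapshot(long followerFetchOffset) {
        return followerFetchOffset < logStartOffset;
    }
}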
Cluster scaling
Once set, the total number of nodes (N) rarely needs to change. The ability to scale is mostly important for maintenance operations. One might need to replace a node with a failed disk, or to migrate all nodes to a new set of machines.
The number of nodes that can fail without causing data loss or system downtime is:
F = floor((N - 1) / 2)
Adding more nodes increases the cluster's ability to tolerate failures, but it comes at a performance cost, especially for write operations. It is most efficient to scale a Raft cluster using an odd number of nodes (3, 5, 7, and so on). Adding a single node to an odd-sized cluster (for example, going from 3 to 4) increases the quorum size without increasing the number of tolerable failures, thereby only adding communication overhead.
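A quick worked example (plain integer arithmetic, nothing Kafka-specific) makes the odd-versus-even point concrete:

// Quorum math for cluster sizes 3-7: majority = N/2 + 1, tolerable failures F = floor((N - 1) / 2).
class QuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 7; n++) {
            int majority = n / 2 + 1;              // votes/acks needed to elect a leader or commit
            int tolerableFailures = (n - 1) / 2;   // integer division acts as floor
            System.out.printf("N=%d -> majority=%d, tolerable failures=%d%n", n, majority, tolerableFailures);
        }
        // N=3 and N=4 both tolerate a single failure, so going from 3 to 4 nodes only
        // raises the majority from 2 to 3 and adds communication overhead.
    }
}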
To ensure safety, there must be no point in time where it is possible for two leaders to be elected for the same term (split-brain). It is impossible to atomically switch all server configurations at the same time, so old and new majorities must overlap. In order to avoid availability gaps in case a node has an empty log, new nodes join the cluster as non-voting members. Once they are caught up with the leader, they can be considered for majorities.
A leader that is not part of the new majority still manages the cluster until the new configuration is committed, and then steps down. Vote requests from removed nodes are simply ignored as long as a leader exists and keeps up with heartbeats.
KRaft protocol
With KIP-500, Kafka introduces a new controller node role configured via process.roles. A controller pool forms a quorum that elects a leader (active controller) and replicates cluster metadata using KRaft, a pull-based Raft implementation. Controller nodes use dedicated listeners specified in controller.listener.names and discover each other via controller.quorum.bootstrap.servers.
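For reference, a dedicated controller might be configured along these lines; the node ID, ports, and directories are hypothetical example values.

# Hypothetical example configuration for a dedicated controller node
process.roles=controller
node.id=1
controller.listener.names=CONTROLLER
listeners=CONTROLLER://localhost:6000
controller.quorum.bootstrap.servers=localhost:6000,localhost:6001,localhost:6002
log.dirs=/tmp/kraft-controller-logs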
bin/kafka-metadata-quorum.sh --bootstrap-controller :6000 describe --replication --human-readable
NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 AAjMTKjgddtAwwCW0HYikA 549 0 4 ms ago 4 ms ago Leader
0 IddzzNvHTq2zTsHNd15alg 549 0 158 ms ago 158 ms ago Follower
2 z1Uo4M6C7Ddhovt2AdNSRQ 549 0 157 ms ago 157 ms ago Follower
8 hDAtQmlTBiMBNP5p3LdnJg 549 0 158 ms ago 158 ms ago Observer
7 sRAWitiwvTEWEHKc1D8dzg 549 0 157 ms ago 157 ms ago Observer
6 l4FPIRGKtFkwwJVdZW-NdQ 549 0 157 ms ago 157 ms ago Observer
This architectural shift addresses several critical limitations:
- Operational complexity: It eliminates the need to manage an external ZooKeeper cluster, creating a self-managed system where Kafka handles its own metadata.
- Scalability issues: It removes ZooKeeper as a potential bottleneck.
- State inconsistencies: It resolves frequent inconsistencies between the controller's in-memory state and ZooKeeper's stored state.
The new architecture supports combined mode, where nodes run both controller and broker components in one JVM process, enabling single-node clusters. This is useful for testing but not recommended for production.
The following are some important metrics to track:
- kafka.controller:type=KafkaController,name=ActiveControllerCount: The number of active controllers.
- kafka.controller:type=KafkaController,name=ActiveBrokerCount: The number of active brokers.
- kafka.server:type=raft-metrics,name=current-leader: The current quorum leader's ID; -1 indicates unknown.
- kafka.server:type=raft-metrics,name=current-epoch: The current quorum epoch.
- kafka.server:type=raft-metrics,name=election-latency-avg: The average time in milliseconds spent electing a new leader.
- kafka.server:type=raft-metrics,name=append-records-rate: The average number of records appended per second as the leader of the Raft quorum.
- kafka.server:type=raft-metrics,id=__cluster_metadata,name=commit-latency-avg: The average time in milliseconds to commit an entry in the Raft log.
Metadata management
To ensure metadata update ordering, KRaft uses the __cluster_metadata internal topic with a single partition and flushes to disk before follower replication to prevent data loss. Periodic snapshots enable faster synchronization and prevent disk space exhaustion. Voting state is stored in the quorum-state file within the cluster metadata partition folder.
cat build/test/server2/metadata/__cluster_metadata-0/quorum-state
{"leaderId":1,"leaderEpoch":2,"votedId":1,"votedDirectoryId":"AAjMTKjgddtAwwCW0HYikA","data_version":1}
The metadata.version (MV) is a required cluster-wide configuration that determines the metadata schema for log operations and the enabled features. Each Kafka feature gate has minimum MV requirements and supported version ranges. The finalized version shows the actual feature version in use. For example, the dynamic quorum feature requires kraft.version 1, which needs MV 21 (3.9-IV0) or higher.
bin/kafka-features.sh --bootstrap-controller :9093 describe | grep kraft.version
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 9
$ bin/kafka-features.sh --bootstrap-server :9092 feature-dependencies --feature kraft.version=1
kraft.version=1 requires:
metadata.version=21 (3.9-IV0)
The active controller processes client metadata update requests, appends metadata changes to the log, and coordinates cluster operations. During bootstrapping and scaling, it stores the voter set by appending a VotersRecord control record to the metadata log.
bin/kafka-dump-log.sh --cluster-metadata-decoder \
--files build/test/server0/metadata/__cluster_metadata-0/*.log \
| awk '!/NO_OP_RECORD/ && !/baseOffset:/'
Dumping /build/test/server0/metadata/__cluster_metadata-0/00000000000000000000.log
Log starting offset: 0
. . .
| offset: 27 CreateTime: 1755853393159 keySize: 4 valueSize: 157 sequence: -1 headerKeys: [] KRaftVoters {"version":0,"voters":[{"voterId":0,"voterDirectoryId":"f2OxqjUwQbGEBzI8EfO0-A","endpoints":[{"name":"CONTROLLER","host":"localhost","port":6000}],"kRaftVersionFeature":{"minSupportedVersion":0,"maxSupportedVersion":1}},{"voterId":1,"voterDirectoryId":"d1hwpae6nHdZSg8KvxOUqw","endpoints":[{"name":"CONTROLLER","host":"localhost","port":6001}],"kRaftVersionFeature":{"minSupportedVersion":0,"maxSupportedVersion":1}},{"voterId":2,"voterDirectoryId":"fjqjsIl_uktdk2vpjAF70Q","endpoints":[{"name":"CONTROLLER","host":"localhost","port":6002}],"kRaftVersionFeature":{"minSupportedVersion":0,"maxSupportedVersion":1}}]}
Brokers are observers that replicate the metadata partition to learn about topics, partition assignments, and configuration changes, but aren't part of the quorum. They send BrokerRegistration RPCs to register with the active controller and communicate supported features. They also send periodic BrokerHeartbeat RPCs to avoid being fenced.
Core RPCs
The Vote RPC is sent by candidate controllers to other controllers when no active controller exists or has failed. Once elected, the active controller immediately writes the LeaderChangeMessage control record to commit any uncommitted records from the previous epoch and prevent indefinite delays when no client metadata changes occur. All KRaft RPC schemas include clusterId and currentLeaderEpoch fields for identity verification and fencing logic.
# filtering out no-op metadata records (see KIP-835)
$ bin/kafka-dump-log.sh --cluster-metadata-decoder \
--files build/test/server0/metadata/__cluster_metadata-0/*.log \
| awk '!/NO_OP_RECORD/ && !/baseOffset:/'
Dumping build/test/server0/metadata/__cluster_metadata-0/00000000000000000000.log
Log starting offset: 0
| offset: 0 CreateTime: 1755853386351 keySize: 4 valueSize: 19 sequence: -1 headerKeys: [] LeaderChange: {"version":0,"leaderId":0,"voters":[{"voterId":0}],"grantingVoters":[{"voterId":0}]}
...
The Fetch RPC is periodically sent by followers and observers to the leader for metadata replication. The pull-based approach scales better as followers and observers control their fetch rate and the leader avoids managing outbound connections to all of them.
Two fetch rounds are needed for followers to commit metadata changes: first to fetch new records, and a second to get the updated high watermark (HW). The Fetch request contains the target offset and the epoch of the last local metadata log offset. The active controller validates this against its metadata log using the leader-epoch-checkpoint file, which is cached in memory for efficiency.
When inconsistencies are detected, the Fetch response contains the DivergingEpoch tagged field that is used for local log truncation. Multiple fetch rounds might be required in worst-case scenarios. When the offset is less than the metadata log start offset (LSO), the Fetch response includes the SnapshotId tagged field to be used with FetchSnapshot RPCs (snapshots are fetched in chunks).
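Putting those fields together, the follower-side handling can be pictured roughly as follows; the types are simplified placeholders, not Kafka's actual request and response classes.

// Hypothetical sketch of how a follower reacts to a Fetch response (illustrative only).
class FetchResponseHandlingSketch {
    record DivergingEpoch(int epoch, long endOffset) { }
    record SnapshotId(long endOffset, int epoch) { }
    record FetchResponseData(DivergingEpoch divergingEpoch, SnapshotId snapshotId, byte[] records) { }

    void onFetchResponse(FetchResponseData response) {
        if (response.divergingEpoch() != null) {
            // Our log diverged from the leader's: truncate locally and fetch again.
            truncateTo(response.divergingEpoch().endOffset());
        } else if (response.snapshotId() != null) {
            // The needed offsets were already compacted away: download the snapshot in chunks.
            fetchSnapshot(response.snapshotId());
        } else {
            appendToLog(response.records());   // normal replication path
        }
        // Either way, the next Fetch request advertises the new local log end offset and epoch.
    }

    void truncateTo(long offset) { /* drop local entries past the divergence point */ }
    void fetchSnapshot(SnapshotId id) { /* issue FetchSnapshot RPCs until the snapshot is complete */ }
    void appendToLog(byte[] records) { /* append the records and flush */ }
}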
Dynamic quorum
When handling runtime voter changes with AddRaftVoter, RemoveRaftVoter, and UpdateRaftVoter RPCs, the active controller enforces one controller addition or removal at a time. This prevents split-brain scenarios, as asynchronous metadata log replication with multiple in-flight quorum changes could create disjoint majorities. If the active controller is being removed, it continues managing other controllers until the voters control record commits or the epoch advances.
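These voter changes are normally driven through kafka-metadata-quorum.sh. The invocation below is a sketch based on the KIP-853 tooling, with placeholder IDs taken from the earlier dump output; exact flags may vary between releases.

# Add a new controller (run with the new controller's own configuration)
bin/kafka-metadata-quorum.sh --command-config new-controller.properties \
  --bootstrap-controller localhost:6000 add-controller

# Remove an existing controller by node ID and directory ID (placeholder values)
bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:6000 \
  remove-controller --controller-id 2 --controller-directory-id fjqjsIl_uktdk2vpjAF70Q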
Implementation details
In addition to the legacy core module, which still has some bridge classes (RaftManager, KafkaMetadataLog, and DefaultExternalKRaftMetrics), there are two main modules that contain the KRaft implementation:
- raft: Consensus protocol as defined by KIP-595 and KIP-853.
- metadata: Controller logic as defined by KIP-631 and KIP-630.
The metadata pipeline has three layers:
- QuorumController: Metadata creation and in-memory state machine update.
- KafkaRaftClient: Leader election and metadata replication.
- MetadataLoader: Metadata distribution to internal components.
The KafkaRequestHandler thread uses ControllerApis to deserialize binary requests into specific request objects. After validation, it calls QuorumController, which places a ControllerWriteEvent onto the KafkaEventQueue—a single-threaded event loop that ensures ordered controller operations without concurrency issues. This queue uses a circular doubly-linked list for O(1) insertions, deletions, and head/tail access.
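Conceptually, the pattern is a single consumer thread draining a queue in order. The sketch below is a simplified stand-in that omits the deadlines and tagged events the real KafkaEventQueue supports.

import java.util.concurrent.LinkedBlockingDeque;

// Minimal single-threaded event loop sketch (illustrative only).
class SingleThreadedEventQueueSketch {
    private final LinkedBlockingDeque<Runnable> events = new LinkedBlockingDeque<>();
    private final Thread handler = new Thread(this::run, "event-handler");

    SingleThreadedEventQueueSketch() { handler.start(); }

    void append(Runnable event) { events.addLast(event); }     // O(1) tail insert
    void prepend(Runnable event) { events.addFirst(event); }   // O(1) head insert

    private void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                events.takeFirst().run();   // events execute strictly one at a time, in order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}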
The QuorumController manages the in-memory state machine and uses the DeferredEventQueue (purgatory) to park RPCs waiting for KafkaRaftClient to complete its job. The KafkaRaftClient is driven by a single-threaded event loop that polls the RaftMessageQueue for sequential processing of leader election and log replication RPCs. The MetadataLoader is a broker component that packages metadata changes into deltas or images for consumption by publishers that distribute them to other internal components. For example, the DynamicConfigPublisher updates broker and topic configurations.
Both QuorumController and MetadataLoader implement the RaftClient.Listener interface and register with the KafkaRaftClient, which serves as the bridge between the Raft consensus layer and the controller's state machine. This mechanism allows them to receive callbacks for processing committed metadata records (handleCommit), loading and applying snapshot data (handleLoadSnapshot), reacting to leadership transitions (handleLeaderChange), and running a graceful shutdown (beginShutdown).
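A toy listener gives a feel for this callback flow; the parameter types below are simplified placeholders rather than the real BatchReader and SnapshotReader signatures.

// Simplified stand-in for RaftClient.Listener (parameter types are placeholders).
interface ListenerSketch<T> {
    void handleCommit(Iterable<T> committedRecords);
    void handleLoadSnapshot(Iterable<T> snapshotRecords);
    void handleLeaderChange(int leaderId, int epoch);
    void beginShutdown();
}

// A listener that just mirrors committed records into standard output.
class LoggingListener implements ListenerSketch<String> {
    @Override public void handleCommit(Iterable<String> committedRecords) {
        committedRecords.forEach(r -> System.out.println("apply committed record: " + r));
    }
    @Override public void handleLoadSnapshot(Iterable<String> snapshotRecords) {
        System.out.println("rebuilding state from snapshot");
        snapshotRecords.forEach(r -> System.out.println("apply snapshot record: " + r));
    }
    @Override public void handleLeaderChange(int leaderId, int epoch) {
        System.out.println("new leader " + leaderId + " in epoch " + epoch);
    }
    @Override public void beginShutdown() {
        System.out.println("shutting down listener");
    }
}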
As an example, let's see how consensus is reached when handling a CreateTopics request. For simplicity, I'm skipping the metadata loading and distribution phases. Figure 2 illustrates the simplified flow.

Event creation and queuing:
// in QuorumController.appendWriteEvent()
ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags);
queue.append(event);      // KafkaEventQueue
return event.future();    // CompletableFuture<Response>
Sequential processing:
// KafkaEventQueue.EventHandler thread executes event.run()
{
    // a. generate metadata records (TopicRecord, PartitionRecord)
    ControllerResult<T> result = op.generateRecordsAndResult();
    // b. apply to local state
    replay(message.message(), recordOffset);
    // c. replicate via Raft
    long offset = raftClient.prepareAppend(records);
    raftClient.schedulePreparedAppend();
    // d. add to deferred queue
    deferredEventQueue.add(offset, this);
}
Raft replication:
- KafkaRaftClient.prepareAppend() stages records in BatchAccumulator.
- KafkaRaftClient.schedulePreparedAppend() allows the accumulator to drain.
- The actual metadata records replication happens through the polling mechanism:
  - The leader writes to its log first.
  - Followers poll the leader via FetchRequests.
  - The leader responds with FetchResponses containing the new records.
  - Followers append to their logs.
  - The leader tracks follower progress.
  - When the majority has replicated, the high watermark advances.
Completion:
// QuorumMetaLogListener.handleCommit() called by KafkaRaftClient
offsetControl.handleCommitBatch(batch);
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());

// in DeferredEventQueue
event.complete(null);   // calls ControllerWriteEvent.complete()

// in ControllerWriteEvent.complete()
future.complete(resultAndOffset.response());   // response sent to client
Summary
Apache Kafka's transition to the KRaft protocol represents a significant architectural evolution that fundamentally transforms how Kafka manages its metadata and consensus operations. By eliminating the dependency on ZooKeeper and implementing a self-managed consensus system based on Raft, Kafka has addressed longstanding operational complexities, scalability bottlenecks, and state consistency issues that affected earlier versions.
The KRaft implementation demonstrates sophisticated engineering through its three-layer architecture, single-threaded event processing, and pull-based replication model, all while maintaining the safety guarantees essential for distributed systems. The protocol's design reflects careful consideration of real-world distributed system challenges, from handling network partitions and split-brain scenarios to enabling dynamic quorum management and efficient log compaction.
The comprehensive RPC framework, metadata versioning system, and robust implementation pipeline showcase how theoretical consensus algorithms can be adapted for production-scale systems. KRaft not only simplifies Kafka's operational model but also positions it as a more resilient and self-contained platform for modern distributed computing environments.