
Kubernetes and Apache Kafka are becoming the de facto platforms for developing and deploying microservice architectures.

Kafka is a distributed data streaming platform that enables applications to publish, subscribe to, store, and process streams of messages (or events) in real-time.

These messages are stored inside Kafka topics. A topic is a named set of messages organized for consumption. Each topic is stored on disk, replicated across brokers to ensure fault tolerance, and messages within a topic are retained for a configurable period (seven days by default). Figure 1 shows an overview of a Kafka cluster.

Figure 1: A Kafka broker with three topics, where different applications produce and consume messages from the topics.

We can think of a topic as a logical grouping of messages. A Kafka topic, however, is further divided into one or more partitions, where the produced messages are actually stored.

New messages are appended to the end of one of the topic's partitions, and Kafka only guarantees ordering for messages consumed from a single partition. For example, if all events for a given customer share the same key, they land in the same partition and are therefore consumed in order. Using multiple partitions improves performance, data distribution, and redundancy in high-volume scenarios.

In essence, a topic combines all the events stored within its partitions.

Figure 2 shows in more detail a topic with its partitions.

Figure 2: A topic composed of three partitions.

Consumers and producers

Let’s explore the two elements not yet covered in these diagrams: producers and consumers.

Producers publish messages to a topic, where each message is appended to the tail of a partition. By default, when a message includes a key, the hash of the key determines which partition receives the message. Without a key, the sticky partitioner is used to distribute messages evenly among all available partitions.

You can also route messages to specific partitions with custom logic by implementing a custom partitioning strategy, as sketched below.
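For illustration only, here is a minimal sketch of such a strategy using the plain Kafka clients API; the class name and routing rule are hypothetical and not part of the article's source code:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: route records keyed "priority" to partition 0 and
// spread everything else over the remaining partitions.
// Assumes the topic has at least two partitions.
public class PriorityPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if ("priority".equals(key)) {
            return 0;
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return 1 + Math.floorMod(key == null ? 0 : key.hashCode(), numPartitions - 1);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

The producer then picks the class up through its partitioner.class configuration property.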

Figure 3 summarizes the producer's interaction with partitions.

Figure 3: A producer publishes an event into one of the partitions, depending on the partitioning strategy.

Each message published to a topic is delivered to every consumer group that has subscribed to that topic.

In most cases, a consumer advances its offset linearly, but it can start at any offset and read messages in any order. The consumer typically “commits” its offset back to the Apache Kafka cluster so it can resume from where it left off, for example, after a restart.
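To make the offset mechanics concrete, here is a minimal sketch with the plain Kafka clients API rather than the article's Quarkus code; auto-commit is disabled so the commit step is explicit, and the group id shown is a made-up example:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-no-keeper-bootstrap:9092");
        props.put("group.id", "names-group");     // hypothetical consumer group
        props.put("enable.auto.commit", "false"); // we commit offsets ourselves
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("names"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // Persist the position so a restarted consumer resumes here.
                consumer.commitSync();
            }
        }
    }
}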

Each consumer belongs to a consumer group, a set of consumer instances that together provide fault tolerance and scalable message processing. When a consumer group contains only one consumer, that consumer is responsible for processing the messages of all topic partitions. With multiple consumers in a group, each consumer receives messages from only a subset of the partitions. Consequently, if you add more consumers to a group than there are partitions in the topic, the extra consumers stay idle without receiving any messages.

Figure 4 summarizes the consumer architecture.

Figure 4: Two consumer groups consuming from a topic's partitions.

As you can see, all Kafka elements are designed with fault tolerance and scalability in mind. Now let's see how producers and consumers work in a Kubernetes cluster, so we can easily scale the consumers and observe how their consumption behavior changes.

Developer Sandbox for Red Hat OpenShift

You can use any Kubernetes or Red Hat OpenShift cluster. For this example, we will use the Developer Sandbox for Red Hat OpenShift, a no-cost shared OpenShift and Kubernetes cluster, so no local Kubernetes cluster is required.

Deploying Kafka

Log in to your Developer Sandbox account, click the Add option, and select Import YAML, as shown in Figure 5.

Figure 5: The Import YAML option in the Developer Sandbox UI.

Then copy the following YAML document into the editor and click the Create button to apply the manifests (see Figure 6):

apiVersion: v1
kind: Service
metadata:
  name: kafka-no-keeper-bootstrap
spec:
  ports:
  - port: 9092
    protocol: TCP
    targetPort: 9092
  selector:
    app: kafka-no-keeper
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-no-keeper
spec:
  selector:
    matchLabels:
      app: kafka-no-keeper
  template:
    metadata:
      labels:
        app: kafka-no-keeper
    spec:
      containers:
      - name: kafka-no-keeper
        image: quay.io/strimzi/kafka:0.35.1-kafka-3.4.0
        command:
          - /bin/sh
          - -c
          - 'export CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid) && ./bin/kafka-storage.sh format -t $CLUSTER_ID -c ./config/kraft/server.properties && ./bin/kafka-server-start.sh ./config/kraft/server.properties --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS}'
        env:
        - name: LOG_DIR
          value: /tmp/logs
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-no-keeper-bootstrap:9092
        resources:
          limits:
            memory: "1024Mi"
            cpu: "1000m"
        ports:
        - containerPort: 9092
Figure 6: Importing the YAML file to deploy a Kafka cluster into a Kubernetes cluster.

After a few seconds, Kafka is up and running. Note that this deployment runs Kafka in KRaft mode (no ZooKeeper needed, hence the kafka-no-keeper name). Repeat the same process, but this time apply a YAML file that creates a topic named “names” with two partitions:

apiVersion: batch/v1
kind: Job
metadata:
  name: topic-create-kube
spec:
  template:
    spec:
      containers:
      - name: topic-create
        image: quay.io/strimzi/kafka:0.35.1-kafka-3.4.0
        command: ["./bin/kafka-topics.sh", "--bootstrap-server", "kafka-no-keeper-bootstrap:9092", "--create", "--if-not-exists", "--topic", "names", "--partitions", "2"]
        resources:
          limits:
            cpu: 500m
            memory: 500Mi
      restartPolicy: Never
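To double-check that the topic exists with two partitions, you can describe it from inside the broker pod. This command is not part of the original walkthrough, but it should work against this deployment:

kubectl exec deploy/kafka-no-keeper -- ./bin/kafka-topics.sh --bootstrap-server kafka-no-keeper-bootstrap:9092 --describe --topic names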

Now, with Kafka running and a topic created, let’s deploy a producer.

Deploying the producer

This producer is a simple Quarkus application that emits some names to the names topic created earlier:

@Channel("names")
Emitter<Record<String, String>> emitter;

@GET
public void generate() {
    // Each record uses the name's hash code as its key, so a given
    // name is always routed to the same partition.
    NAMES.stream()
        .map(n -> Record.of(Integer.toString(n.hashCode()), n))
        .forEach(emitter::send);
}
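For the names channel to reach Kafka, the application also needs SmallRye Reactive Messaging settings in its application.properties. The exact file is not shown in the article, so the following is a sketch of what it plausibly contains (with Quarkus, the topic defaults to the channel name, and the connector can often be inferred):

# Kafka bootstrap service deployed earlier
kafka.bootstrap.servers=kafka-no-keeper-bootstrap:9092
# Wire the outgoing "names" channel to the Kafka connector and topic
mp.messaging.outgoing.names.connector=smallrye-kafka
mp.messaging.outgoing.names.topic=names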

Now open the Import YAML option in the Developer Sandbox again and apply the following YAML file to deploy the producer:

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
    app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
  labels:
    app.kubernetes.io/name: kafka-producer
    app.kubernetes.io/version: producer
    app.kubernetes.io/managed-by: quarkus
  name: kafka-producer
spec:
  ports:
    - name: http
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    app.kubernetes.io/name: kafka-producer
    app.kubernetes.io/version: producer
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
    app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
  labels:
    app.kubernetes.io/managed-by: quarkus
    app.kubernetes.io/name: kafka-producer
    app.kubernetes.io/version: producer
  name: kafka-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-producer
      app.kubernetes.io/version: producer
  template:
    metadata:
      annotations:
        app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
        app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
      labels:
        app.kubernetes.io/managed-by: quarkus
        app.kubernetes.io/name: kafka-producer
        app.kubernetes.io/version: producer
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: quay.io/lordofthejars/kafka-producer:producer
          imagePullPolicy: Always
          name: kafka-producer
          ports:
            - containerPort: 8080
              name: http
              protocol: TCP

The producer is now deployed, but it will only generate content once we query its /produces endpoint.

Deploying the consumer

But before producing any content, let's deploy a single consumer.

This consumer prints each message it receives to the terminal, along with the partition the message was stored in:

@Incoming("names")
public CompletionStage<Void> consume(Message<String> message) {
    // The Kafka-specific metadata carries the partition the record came from.
    Optional<IncomingKafkaRecordMetadata> metadata =
            message.getMetadata(IncomingKafkaRecordMetadata.class);
    metadata.ifPresent(m -> System.out.println("Partition: " + m.getPartition()));
    System.out.println("Message: " + message.getPayload());
    return message.ack();
}
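The consumer side needs the mirror-image channel configuration. Again, this is a sketch rather than the article's actual file; the group.id value is an assumption, and the key point is that every replica of the Deployment shares it, which places all pods in the same consumer group:

kafka.bootstrap.servers=kafka-no-keeper-bootstrap:9092
mp.messaging.incoming.names.connector=smallrye-kafka
mp.messaging.incoming.names.topic=names
# All replicas share this group id, so they split the topic's partitions
mp.messaging.incoming.names.group.id=kafka-consumer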

Open the Import YAML option in the Developer Sandbox again and apply the following YAML file to deploy the consumer:

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
  labels:
    app.kubernetes.io/name: kafka-consumer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/managed-by: quarkus
  name: kafka-consumer
spec:
  ports:
    - name: http
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    app.kubernetes.io/name: kafka-consumer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
  labels:
    app.kubernetes.io/managed-by: quarkus
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/name: kafka-consumer
  name: kafka-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
      app.kubernetes.io/name: kafka-consumer
  template:
    metadata:
      annotations:
        app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
      labels:
        app.kubernetes.io/managed-by: quarkus
        app.kubernetes.io/version: 1.0.0-SNAPSHOT
        app.kubernetes.io/name: kafka-consumer
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: quay.io/lordofthejars/kafka-consumer:1.0.0-SNAPSHOT
          imagePullPolicy: Always
          name: kafka-consumer
          ports:
            - containerPort: 8080
              name: http
              protocol: TCP

If you go to the Topology view (Figure 7), you should see all the application elements. The producer has a special icon at its top-right corner: the public route URL used to access the service from outside the cluster.

Figure 7: The Topology view in the Developer Sandbox UI.

Generating traffic

Now, let’s send a request to the producer so it generates some content for Kafka. Open the web terminal by clicking the >_ icon in the upper-right corner, as shown in Figure 8.

Figure 8: The button to start the web terminal, with the terminal opened at the bottom of the screen.

Inside the web terminal window, run the following command:

curl kafka-producer:80/produces -v

This will emit some content to the names topic.

Finally, inspect the consumer logs to validate that content has been consumed from both partitions. Run the following command in the web terminal:

kubectl get pods

NAME                                         READY   STATUS      RESTARTS   AGE
kafka-consumer-d4df74879-8c684               1/1     Running     0          38m
kafka-no-keeper-6d8dd649f8-54zvf             1/1     Running     0          41m
kafka-producer-1-deploy                      0/1     Completed   0          38m
kafka-producer-1-j6s2z                       1/1     Running     0          38m
topic-create-kube-qhlf6                      0/1     Completed   0          40m
workspaceec72309dcae94fe6-699598bbc7-vhlm6   2/2     Running     0          13m

Then use the kafka-consumer pod name to inspect its logs:

kubectl logs kafka-consumer-d4df74879-8c684

The logs show that this consumer processed all events from both partitions. The reason is that it was the only consumer in the group, so it was assigned both partitions.

Partition: 1
Message: Geuss
Partition: 0
Message: Rogler
Partition: 0
Message: Spillan
Partition: 0
Message: Hamden
Partition: 1
Message: Casucci
Partition: 1
Message: Sorter

Scaling the consumer

Let’s scale the consumer application to two pods. When we do, each consumer instance becomes responsible for processing the events of one of the two topic partitions.

In the web terminal, run the following command:

kubectl scale deployment kafka-consumer --replicas=2

Generate some events again by running the previous command:

curl kafka-producer:80/produces -v

Finally, list the pods and inspect their logs:

kubectl get pods

NAME                                         READY   STATUS      RESTARTS   AGE
kafka-consumer-d4df74879-8c684               1/1     Running     0          116m
kafka-consumer-d4df74879-cwtlr               1/1     Running     0          60s
kafka-no-keeper-6d8dd649f8-54zvf             1/1     Running     0          119m
kafka-producer-1-deploy                      0/1     Completed   0          117m
kafka-producer-1-j6s2z                       1/1     Running     0          117m
topic-create-kube-qhlf6                      0/1     Completed   0          119m
workspaceec72309dcae94fe6-699598bbc7-ggm2b   2/2     Running     0          9m40s

You’ll notice that each Pod only consumes events from one partition:

kubectl logs kafka-consumer-d4df74879-8c684

Partition: 0
Message: Krivenko
Partition: 0
Message: Zabkar
Partition: 0
Message: Pace
Partition: 0
Message: Rogler
Partition: 0
Message: Spillan
Partition: 0
Message: Hamden

kubectl logs kafka-consumer-d4df74879-cwtlr

Partition: 1
Message: Mofield
Partition: 1
Message: Sefchick
Partition: 1
Message: Werling
Partition: 1
Message: Geuss
Partition: 1
Message: Casucci
Partition: 1
Message: Sorter

You can experiment with scaling to three consumers (kubectl scale deployment kafka-consumer --replicas=3) and generating traffic to validate that the third consumer stays idle, because the number of consumers then exceeds the number of topic partitions.

The source code for this article is available at https://github.com/lordofthejars/kafka-consumer-producer-rhsandbox.

Conclusion

In this article, you’ve seen two of the most important features of Apache Kafka: scalability and fault tolerance. Consumers can be scaled to process more events in parallel, up to the number of topic partitions. If any of these consumers goes down, the remaining ones automatically take over its workload.

Thanks to Kubernetes, scaling consumers is as easy as scaling a Pod, so Kubernetes and Kafka make a perfect match.