Introduction to Kafka consumers

December 14, 2023
Alex Soto Bueno
Related topics:
Kafka, Kubernetes
Related products:
Developer Sandbox, Red Hat OpenShift


    Kubernetes and Apache Kafka are becoming the de facto platforms for developing and deploying microservice architectures.

    Kafka is a distributed data streaming platform that enables applications to publish, subscribe to, store, and process streams of messages (or events) in real time.

    These messages are stored inside Kafka topics. A topic is a set of messages you aim to organize for consumption. Each topic is stored on disk, replicated across brokers to ensure fault tolerance, and messages within the topic are retained for a designated time. Figure 1 shows an overview of a Kafka cluster.

    Figure 1: A Kafka broker with three topics, where different applications produce and consume messages from the topics.

    We can consider topics as a logical grouping for messages. A Kafka topic, however, is further divided into one or more partitions, where the produced messages are stored.

    New messages are appended to one of the topic's partitions, and Kafka only provides ordering guarantees to consumers for messages consumed from a single partition. Using multiple partitions improves performance with high data volumes and enables data distribution and redundancy.

     In essence, a topic combines all the events stored within its partitions.

    Figure 2 shows a topic with its partitions in more detail.

    Figure 2: A topic composed of three partitions.

    Consumers and producers

    Let’s explore the two elements not covered yet in these diagrams: the producers and the consumers.

    Producers publish messages to a topic, where each message is added to the tail of a partition. By default, when a message includes a key, the hashed key value determines the partition that will receive the message. Without a key, the sticky partitioner is employed to distribute messages evenly among all available partitions for storage.

    You can also use custom logic to route messages to specific partitions using a custom partitioning strategy.
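
    As a rough sketch of what such a strategy can look like with the plain Kafka producer API (this class is purely illustrative and is not part of the article's example application), you can implement the Partitioner interface:

     import java.util.Map;
     import org.apache.kafka.clients.producer.Partitioner;
     import org.apache.kafka.common.Cluster;

     // Illustrative partitioner: keys starting with "vip-" always land on partition 0,
     // everything else is spread across the topic's partitions by hash.
     public class VipPartitioner implements Partitioner {

         @Override
         public int partition(String topic, Object key, byte[] keyBytes,
                              Object value, byte[] valueBytes, Cluster cluster) {
             int numPartitions = cluster.partitionsForTopic(topic).size();
             if (key != null && key.toString().startsWith("vip-")) {
                 return 0;
             }
             Object routingKey = (key != null) ? key : value;
             return routingKey == null ? 0 : Math.abs(routingKey.hashCode() % numPartitions);
         }

         @Override
         public void configure(Map<String, ?> configs) { }

         @Override
         public void close() { }
     }

    The producer then picks up the class through the partitioner.class configuration property.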

    Figure 3 summarizes the producer's interaction with partitions.

    Figure 3: A producer publishes an event into one of the partitions, depending on the partitioning strategy.

    Each message published on a topic is delivered to a consumer that has subscribed to that topic.

    In most cases, a consumer advances its offset linearly, but it could start at any offset and read messages in any order. The consumer will typically “commit” its offset back to the Apache Kafka cluster so the consumer can resume from where it left off, for example, in case it restarts.

    Each consumer belongs to a consumer group, a list of consumer instances that ensures fault tolerance and scalable message processing. When a consumer group contains only one consumer, that consumer is responsible for processing all messages of all topic partitions. With multiple consumers in a group, each consumer receives messages from only a subset of the partitions. Thus, if you add more consumers to a consumer group than the number of partitions for a topic, the extra consumers stay idle without receiving any messages.
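
    To make the consumer group mechanics concrete, here is a minimal sketch using the plain kafka-clients consumer API (not the Quarkus application deployed later in this article; the group name is an assumption). Every instance started with the same group.id joins the same group, receives a subset of the partitions, and commits its offsets so it can resume after a restart:

     import java.time.Duration;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerConfig;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.common.serialization.StringDeserializer;

     public class NamesConsumer {
         public static void main(String[] args) {
             Properties props = new Properties();
             // Bootstrap address matching the Service created later in this article.
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-no-keeper-bootstrap:9092");
             // All instances sharing this group.id split the topic's partitions among themselves.
             props.put(ConsumerConfig.GROUP_ID_CONFIG, "names-group");
             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
             props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(List.of("names"));
                 while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                     for (ConsumerRecord<String, String> record : records) {
                         System.out.println("Partition: " + record.partition() + " Message: " + record.value());
                     }
                     // Commit the processed offsets back to Kafka so the group can resume from here.
                     consumer.commitSync();
                 }
             }
         }
     }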

    Figure 4 summarizes the consumer architecture.

    Figure 4: Two consumer groups consuming from partitions.

    As you can see, all Kafka elements are designed for fault tolerance and scalability. Now let's see how producers and consumers work in a Kubernetes cluster, so we can easily scale the consumers and observe how their consumption behavior changes.

    Developer Sandbox for Red Hat OpenShift

    You can use any Kubernetes or Red Hat OpenShift cluster. For this example, we will use the Developer Sandbox for Red Hat OpenShift, a no-cost shared OpenShift and Kubernetes cluster, so no local Kubernetes cluster is required.

    Deploying Kafka

    Log in to your Developer Sandbox account, click the Add option, and select Import YAML, as shown in Figure 5.

    Figure 5: The Import YAML option in the Developer Sandbox UI.

    Then copy the following YAML document and press the Create button to apply the manifests (see Figure 6):

     apiVersion: v1
     kind: Service
     metadata:
       name: kafka-no-keeper-bootstrap
     spec:
       ports:
       - port: 9092
         protocol: TCP
         targetPort: 9092
       selector:
         app: kafka-no-keeper
       type: ClusterIP
     ---
     apiVersion: apps/v1
     kind: Deployment
     metadata:
       name: kafka-no-keeper
     spec:
       selector:
         matchLabels:
           app: kafka-no-keeper
       template:
         metadata:
           labels:
             app: kafka-no-keeper
         spec:
           containers:
           - name: kafka-no-keeper
             image: quay.io/strimzi/kafka:0.35.1-kafka-3.4.0
             command:
               - /bin/sh
               - -c
               - 'export CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid) && ./bin/kafka-storage.sh format -t $CLUSTER_ID -c ./config/kraft/server.properties && ./bin/kafka-server-start.sh ./config/kraft/server.properties --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS}'
             env:
             - name: LOG_DIR
               value: /tmp/logs
             - name: KAFKA_ADVERTISED_LISTENERS
               value: PLAINTEXT://kafka-no-keeper-bootstrap:9092
             resources:
               limits:
                 memory: "1024Mi"
                 cpu: "1000m"
             ports:
             - containerPort: 9092

    Figure 6: Importing the YAML file to deploy the Kafka cluster into the Kubernetes cluster.

    After a few seconds, Kafka is up and running. Repeat the same process but apply a YAML file that creates a topic named “names” with two partitions.

     apiVersion: batch/v1
     kind: Job
     metadata:
       name: topic-create-kube
     spec:
       template:
         spec:
           containers:
           - name: topic-create
             image: quay.io/strimzi/kafka:0.35.1-kafka-3.4.0
             command: ["./bin/kafka-topics.sh", "--bootstrap-server", "kafka-no-keeper-bootstrap:9092", "--create", "--if-not-exists", "--topic", "names", "--partitions", "2"]
             resources:
               limits:
                 cpu: 500m
                 memory: 500Mi
           restartPolicy: Never
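
    The Job above creates the topic with the kafka-topics.sh CLI. If you prefer to create topics from application code instead, a minimal sketch with the Kafka AdminClient (an alternative approach, not used in the rest of this article) looks like this:

     import java.util.Collections;
     import java.util.Properties;
     import org.apache.kafka.clients.admin.AdminClient;
     import org.apache.kafka.clients.admin.AdminClientConfig;
     import org.apache.kafka.clients.admin.NewTopic;

     public class CreateNamesTopic {
         public static void main(String[] args) throws Exception {
             Properties props = new Properties();
             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-no-keeper-bootstrap:9092");
             try (AdminClient admin = AdminClient.create(props)) {
                 // Topic "names" with 2 partitions and a replication factor of 1,
                 // mirroring the kafka-topics.sh invocation above.
                 NewTopic names = new NewTopic("names", 2, (short) 1);
                 admin.createTopics(Collections.singleton(names)).all().get();
             }
         }
     }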

    Now, with Kafka running and a topic created, let’s deploy a producer.

    Deploying the producer

    This producer is a simple Quarkus application that emits some names to the names topic created earlier:

     @Channel("names")
     Emitter<Record<String, String>> emitter;

     @GET
     public void generate() {
         NAMES.stream()
             .map(n -> Record.of(Integer.toString(n.hashCode()), n))
             .forEach(record -> emitter.send(record));
     }

    Open the Import YAML option in the Developer Sandbox again and apply the following YAML file to deploy the producer:

     ---
     apiVersion: v1
     kind: Service
     metadata:
       annotations:
         app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
         app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
       labels:
         app.kubernetes.io/name: kafka-producer
         app.kubernetes.io/version: producer
         app.kubernetes.io/managed-by: quarkus
       name: kafka-producer
     spec:
       ports:
         - name: http
           port: 80
           protocol: TCP
           targetPort: 8080
       selector:
         app.kubernetes.io/name: kafka-producer
         app.kubernetes.io/version: producer
       type: ClusterIP
     ---
     apiVersion: apps/v1
     kind: Deployment
     metadata:
       annotations:
         app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
         app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
       labels:
         app.kubernetes.io/managed-by: quarkus
         app.kubernetes.io/name: kafka-producer
         app.kubernetes.io/version: producer
       name: kafka-producer
     spec:
       replicas: 1
       selector:
         matchLabels:
           app.kubernetes.io/name: kafka-producer
           app.kubernetes.io/version: producer
       template:
         metadata:
           annotations:
             app.quarkus.io/commit-id: 7a259635c304c8b8d5e1caf76ea80bd4139af6fe
             app.quarkus.io/build-timestamp: 2023-12-01 - 13:33:45 +0000
           labels:
             app.kubernetes.io/managed-by: quarkus
             app.kubernetes.io/name: kafka-producer
             app.kubernetes.io/version: producer
         spec:
           containers:
             - env:
                 - name: KUBERNETES_NAMESPACE
                   valueFrom:
                     fieldRef:
                       fieldPath: metadata.namespace
               image: quay.io/lordofthejars/kafka-producer:producer
               imagePullPolicy: Always
               name: kafka-producer
               ports:
                 - containerPort: 8080
                   name: http
                   protocol: TCP

    The producer is now deployed, but it will only generate content once we query the /produces endpoint.

    Deploying the consumer

    But before producing some content, let’s deploy a single consumer.

    This consumer prints the topic's content to the terminal, along with the partition where each message is stored.

     @Incoming("names")
     public CompletionStage<Void> consume(Message<String> message) {
         Optional<IncomingKafkaRecordMetadata> x = message.getMetadata(IncomingKafkaRecordMetadata.class);
         x.ifPresent(m -> System.out.println("Partition: " + m.getPartition()));
         System.out.println("Message: " + message.getPayload());
         return message.ack();
     }

    Open the Import YAML option in the Developer Sandbox again and apply the following YAML file to deploy the consumer:

     ---
     apiVersion: v1
     kind: Service
     metadata:
       annotations:
         app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
       labels:
         app.kubernetes.io/name: kafka-consumer
         app.kubernetes.io/version: 1.0.0-SNAPSHOT
         app.kubernetes.io/managed-by: quarkus
       name: kafka-consumer
     spec:
       ports:
         - name: http
           port: 80
           protocol: TCP
           targetPort: 8080
       selector:
         app.kubernetes.io/name: kafka-consumer
         app.kubernetes.io/version: 1.0.0-SNAPSHOT
       type: ClusterIP
     ---
     apiVersion: apps/v1
     kind: Deployment
     metadata:
       annotations:
         app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
       labels:
         app.kubernetes.io/managed-by: quarkus
         app.kubernetes.io/version: 1.0.0-SNAPSHOT
         app.kubernetes.io/name: kafka-consumer
       name: kafka-consumer
     spec:
       replicas: 1
       selector:
         matchLabels:
           app.kubernetes.io/version: 1.0.0-SNAPSHOT
           app.kubernetes.io/name: kafka-consumer
       template:
         metadata:
           annotations:
             app.quarkus.io/build-timestamp: 2023-11-07 - 12:03:16 +0000
           labels:
             app.kubernetes.io/managed-by: quarkus
             app.kubernetes.io/version: 1.0.0-SNAPSHOT
             app.kubernetes.io/name: kafka-consumer
         spec:
           containers:
             - env:
                 - name: KUBERNETES_NAMESPACE
                   valueFrom:
                     fieldRef:
                       fieldPath: metadata.namespace
               image: quay.io/lordofthejars/kafka-consumer:1.0.0-SNAPSHOT
               imagePullPolicy: Always
               name: kafka-consumer
               ports:
                 - containerPort: 8080
                   name: http
                   protocol: TCP

    If you go to the Topology view (Figure 7), you should see all the application elements; the producer has a special icon at its top-right corner. This icon opens the public route URL used to access the service from outside the cluster.

    Figure 7: The Topology view in the Developer Sandbox UI.

    Generating traffic

    Now, let’s send a request to the producer so it generates some content for Kafka. Open the web terminal by clicking the >_ icon in the upper-right, as shown in Figure 8.

    Figure 8: The button that starts the web terminal, with the terminal open at the bottom of the screen.

    Inside the web terminal window, run the following command:

    curl kafka-producer:80/produces -v

    This will emit some content to the names topic.

    Finally, inspect the consumer logs to validate that content has been consumed from both partitions. Run the following commands in the web terminal:

    kubectl get pods

    NAME                                         READY   STATUS      RESTARTS   AGE
    kafka-consumer-d4df74879-8c684               1/1     Running     0          38m
    kafka-no-keeper-6d8dd649f8-54zvf             1/1     Running     0          41m
    kafka-producer-1-deploy                      0/1     Completed   0          38m
    kafka-producer-1-j6s2z                       1/1     Running     0          38m
    topic-create-kube-qhlf6                      0/1     Completed   0          40m
    workspaceec72309dcae94fe6-699598bbc7-vhlm6   2/2     Running     0          13m

    Then use the kafka-consumer pod name to inspect the logs:

    kubectl logs kafka-consumer-d4df74879-8c684

    The logs will show that this consumer processed all events from both partitions, because a single consumer in the group was assigned both partitions.

    Partition: 1
    Message: Geuss
    Partition: 0
    Message: Rogler
    Partition: 0
    Message: Spillan
    Partition: 0
    Message: Hamden
    Partition: 1
    Message: Casucci
    Partition: 1
    Message: Sorter

    Scaling the consumer

    Let’s scale the consumer app to two pods. When doing so, each consumer app instance will be responsible for processing the events of one of the two topic partitions.

    In the web terminal, run the following command:

    kubectl scale deployment kafka-consumer --replicas=2

    Generate some more events by executing the previous command again:

    curl kafka-producer:80/produces -v

    Finally, list the pods and inspect their logs. 

    kubectl get pods

    NAME                                         READY   STATUS      RESTARTS   AGE
    kafka-consumer-d4df74879-8c684               1/1     Running     0          116m
    kafka-consumer-d4df74879-cwtlr               1/1     Running     0          60s
    kafka-no-keeper-6d8dd649f8-54zvf             1/1     Running     0          119m
    kafka-producer-1-deploy                      0/1     Completed   0          117m
    kafka-producer-1-j6s2z                       1/1     Running     0          117m
    topic-create-kube-qhlf6                      0/1     Completed   0          119m
    workspaceec72309dcae94fe6-699598bbc7-ggm2b   2/2     Running     0          9m40s

    You’ll notice that each Pod only consumes events from one partition:

    kubectl logs kafka-consumer-d4df74879-8c684

    Message: Krivenko
    Partition: 0
    Message: Zabkar
    Partition: 0
    Message: Pace
    Partition: 0
    Message: Rogler
    Partition: 0
    Message: Spillan
    Partition: 0
    Message: Hamden
    
    kubectl logs kafka-consumer-d4df74879-cwtlr

    Message: Mofield
    Partition: 1
    Message: Sefchick
    Partition: 1
    Message: Werling
    Partition: 1
    Message: Geuss
    Partition: 1
    Message: Casucci
    Partition: 1
    Message: Sorter

    You can experiment with scaling to three consumers and generating traffic to validate that the third consumer stays idle, because the number of consumers exceeds the number of topic partitions.
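
    For example, you can reuse the scale command from the previous step:

    kubectl scale deployment kafka-consumer --replicas=3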

    The source code for this article is available at: https://github.com/lordofthejars/kafka-consumer-producer-rhsandbox

    Conclusion

    In this article, you’ve seen two of the most important features of Apache Kafka: scalability and fault tolerance. Consumers can be scaled to process more events in parallel, up to the number of topic partitions. If any of these consumers goes down, the remaining consumers automatically take over its workload.

    Thanks to Kubernetes, scaling consumers is as easy as scaling a Pod, so Kubernetes and Kafka make a perfect match.
