Kafka Monthly Digest

Kubernetes and Apache Kafka are becoming de facto platforms for developing and deploying microservice architectures. One of the challenges you might face when using both technologies is deploying and managing Kafka brokers inside Kubernetes: writing YAML manifests, creating containers, handling ongoing management, and so on.

One answer is Strimzi, an open source project that makes it easy to run Kafka clusters on Kubernetes in various deployment configurations. This article demonstrates how to use Strimzi to configure Kafka Connect, a data integration framework for Kafka.

Install the Strimzi Kubernetes Operator

To start using Strimzi, install the Strimzi Kubernetes Operator in your cluster. You will need cluster-admin rights.

If you are using Red Hat OpenShift, the simplest way is to use the OperatorHub interface and search for AMQ Streams (Strimzi for OpenShift), as shown in Figure 1.

Figure 1: Searching for AMQ Streams in OperatorHub.

Select the operator and click the Create and Install buttons (Figure 2).

Figure 2: Installing the AMQ Streams Operator.

After the operator is installed, create a new OpenShift project named kafka to deploy the Kafka brokers (Figure 3).

Figure 3: Creating a new OpenShift project to deploy the Kafka brokers.

Install Strimzi using YAML

Alternatively, you can install the Strimzi operator from a YAML file. This is the way to go on other Kubernetes distributions, such as minikube. Run the following commands:

kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Provision the Kafka cluster

With the operator up and running, provision the Kafka cluster by applying a Kubernetes custom resource of kind Kafka.

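In this example, we deploy a Kafka cluster with three brokers, three ZooKeeper nodes, and ephemeral storage. To keep the example lightweight, the replication factors in the config section are set to 1. The Kafka version in the manifest (3.2.3) must be one supported by the Strimzi release you installed. With a newer operator, use a version it supports and adjust inter.broker.protocol.version accordingly.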

Create a new file named kafka.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.2.3
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

And apply the manifest:

kubectl apply -f kafka.yaml -n kafka
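The config section of kafka.yaml sets every replication factor to 1. Two simple rules explain why that is safe. First, a replication factor cannot exceed the number of brokers. Second, a minimum in-sync replica count larger than the replication factor it applies to can never be met. The Python sketch below encodes these rules as a rough sanity check. The helper check_replication_settings is hypothetical and not part of Strimzi or Kafka. The values are copied from the manifest.

```python
# Hypothetical sanity check (not part of Strimzi or Kafka): compares the
# replication-related broker settings with a broker count.
REPLICATION_FACTOR_KEYS = (
    "offsets.topic.replication.factor",
    "transaction.state.log.replication.factor",
    "default.replication.factor",
)

# Each pair is (minimum in-sync replicas setting, replication factor it applies to).
MIN_ISR_PAIRS = (
    ("min.insync.replicas", "default.replication.factor"),
    ("transaction.state.log.min.isr", "transaction.state.log.replication.factor"),
)


def check_replication_settings(config: dict, brokers: int) -> list:
    """Return a list of problems; an empty list means no rule was violated."""
    problems = []
    for key in REPLICATION_FACTOR_KEYS:
        if key in config and config[key] > brokers:
            problems.append(f"{key}={config[key]} exceeds {brokers} broker(s)")
    for isr_key, rf_key in MIN_ISR_PAIRS:
        if isr_key in config and rf_key in config and config[isr_key] > config[rf_key]:
            problems.append(f"{isr_key}={config[isr_key]} exceeds {rf_key}={config[rf_key]}")
    return problems


# Values copied from spec.kafka.config in kafka.yaml.
kafka_config = {
    "offsets.topic.replication.factor": 1,
    "transaction.state.log.replication.factor": 1,
    "transaction.state.log.min.isr": 1,
    "default.replication.factor": 1,
    "min.insync.replicas": 1,
}

print(check_replication_settings(kafka_config, brokers=1))  # []
print(check_replication_settings(kafka_config, brokers=3))  # []
```

Because every value is 1, the settings fit a single broker as well as the three replicas the manifest declares. Kafka Connect's own internal topics are a separate case. In Apache Kafka they default to a replication factor of 3 (offset.storage.replication.factor and the related settings), and a cluster with fewer than three brokers cannot satisfy that unless those settings are overridden.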

After a few minutes, Kafka is up and running in the Kubernetes cluster. Apart from the broker and ZooKeeper Pods, one of the essential resources Strimzi creates is a Kubernetes Service named my-cluster-kafka-bootstrap (derived from the metadata.name field in the previous manifest). Use this hostname to access Kafka from within the cluster.
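Strimzi derives Service names from the Kafka resource name. The bootstrap Service is named <cluster-name>-kafka-bootstrap and exposes a port for each listener: 9092 for plain and 9093 for tls in this manifest. For example, the KafkaConnect resource later in this article connects to my-cluster-kafka-bootstrap:9092. The Python sketch below uses a hypothetical helper to assemble both the short name and the fully qualified name. It assumes the default cluster DNS domain, cluster.local.

```python
from typing import Optional


# Hypothetical helper illustrating Strimzi's bootstrap Service naming.
def bootstrap_address(cluster: str, port: int = 9092,
                      namespace: Optional[str] = None,
                      cluster_domain: str = "cluster.local") -> str:
    """Return the host:port that clients use to reach a Strimzi-managed Kafka cluster."""
    service = f"{cluster}-kafka-bootstrap"
    if namespace is None:
        # Short name: resolves only from Pods in the same namespace.
        return f"{service}:{port}"
    # Fully qualified name: resolves from any namespace (assumes the default cluster domain).
    return f"{service}.{namespace}.svc.{cluster_domain}:{port}"


print(bootstrap_address("my-cluster"))
# my-cluster-kafka-bootstrap:9092
print(bootstrap_address("my-cluster", port=9093, namespace="kafka"))
# my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
```

Clients in the kafka namespace, such as the Kafka Connect cluster deployed later, can use the short name. Clients in other namespaces need the fully qualified one.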

Create a new topic

With the Kafka cluster ready, it’s time to create a new topic named samples to publish events to.

Create a topic.yaml file with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: samples
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

And apply it by running the kubectl command:

kubectl apply -f topic.yaml -n kafka 
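The topic settings use raw units: retention.ms is in milliseconds and segment.bytes is in bytes. This small Python sketch, with illustrative helper names, converts between those units and readable ones. It shows that the topic keeps records for two hours and rolls log segments at 1 GiB.

```python
MS_PER_HOUR = 60 * 60 * 1000
BYTES_PER_GIB = 1024 ** 3


def hours_to_ms(hours: int) -> int:
    """Convert hours to milliseconds, the unit of retention.ms."""
    return hours * MS_PER_HOUR


def gib_to_bytes(gib: int) -> int:
    """Convert gibibytes to bytes, the unit of segment.bytes."""
    return gib * BYTES_PER_GIB


print(hours_to_ms(2))          # 7200000
print(gib_to_bytes(1))         # 1073741824
print(7200000 / MS_PER_HOUR)   # 2.0
```

Kafka deletes expired data one whole segment at a time and never deletes the active segment. On a low-traffic topic with 1 GiB segments, records can therefore outlive the two-hour retention until the segment rolls, which is governed by segment.bytes and segment.ms.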

With that in place, you can start producing and consuming events. But let’s now focus on configuring Kafka Connect using Strimzi.

Kafka Connect configuration

Kafka Connect is an integration toolkit for streaming data between Kafka brokers and other systems. 

Let’s assume the following scenario: a producer publishes events to the samples topic, and each event has a JSON document payload with an id and a message.

An example of the payload is shown in the snippet:

{
  "message" : "Hi Duke",
  "id" : "e2200c82-f1a0-4eb0-9e3c-74de800b5991"
}
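The producer that publishes these events is a Quarkus application, and its source code is not shown here. The Python sketch below only illustrates the payload shape, using a hypothetical helper. It assumes the id is a random (version 4) UUID rendered as a string. That matches the format of the sample above, but the source does not confirm it.

```python
import json
import uuid


def make_sample_event(message: str) -> str:
    """Build a JSON string shaped like the events on the samples topic."""
    # Assumption: ids are random version 4 UUIDs.
    return json.dumps({"message": message, "id": str(uuid.uuid4())})


event = json.loads(make_sample_event("Hi Duke"))
print(event["message"])  # Hi Duke
print(sorted(event))     # ['id', 'message']
```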

We want to store these events in a MongoDB database, because a legacy application needs to consume them from there.

Moreover, to make things more interesting, the documents in MongoDB must store this content in a field named welcome instead of message, so the field has to be renamed before the event is stored in the database.

One option would be to develop a consumer application that receives all events, transforms them, and stores them in the MongoDB instance. A better way is to use Kafka Connect with a MongoDB sink connector, which consumes the events and writes them to MongoDB without any custom code.

The diagram in Figure 4 summarizes the architecture using Kafka Connect.

Figure 4: The producer is a Quarkus application periodically sending messages to the samples Kafka topic. The MongoDB Kafka Connect sink task consumes each message, renames the field, and stores the result in the MongoDB collection.

Deploy the MongoDB instance with Helm

Before deploying and configuring the Kafka Connect sink task, let’s deploy the MongoDB instance in the Kubernetes cluster using Helm.

In a terminal window, run the following command to register the bitnami repo and install the MongoDB Helm chart:

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install mongodb bitnami/mongodb --set podSecurityContext.fsGroup="",containerSecurityContext.enabled=false,podSecurityContext.enabled=false,auth.enabled=false --version 13.6.0 -n kafka

Next, deploy the producer. Create a file named producer.yaml with the following Deployment and apply it to the Kubernetes cluster:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2022-12-16 - 13:24:37 +0000
  labels:
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/name: kafka-spam-messages
  name: kafka-spam-messages
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
      app.kubernetes.io/name: kafka-spam-messages
  template:
    metadata:
      annotations:
        app.quarkus.io/build-timestamp: 2022-12-16 - 13:24:37 +0000
      labels:
        app.kubernetes.io/version: 1.0.0-SNAPSHOT
        app.kubernetes.io/name: kafka-spam-messages
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: quay.io/lordofthejars/kafka-spam-messages:1.0.0-SNAPSHOT
          imagePullPolicy: Always
          name: kafka-spam-messages

And run:

kubectl apply -f producer.yaml -n kafka
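In an apps/v1 Deployment, spec.selector must match the labels in spec.template.metadata.labels, or the API server rejects the object. The Python sketch below shows that equality check for the producer manifest. The helper is hypothetical and covers only matchLabels, not matchExpressions.

```python
def selector_matches(match_labels: dict, pod_labels: dict) -> bool:
    """True when every matchLabels key is present on the Pod with the same value."""
    return all(pod_labels.get(key) == value for key, value in match_labels.items())


# Labels copied from spec.selector.matchLabels and spec.template.metadata.labels.
match_labels = {
    "app.kubernetes.io/version": "1.0.0-SNAPSHOT",
    "app.kubernetes.io/name": "kafka-spam-messages",
}
template_labels = {
    "app.kubernetes.io/version": "1.0.0-SNAPSHOT",
    "app.kubernetes.io/name": "kafka-spam-messages",
}

print(selector_matches(match_labels, template_labels))  # True
# A template carrying a different version label would no longer match.
bumped = {**template_labels, "app.kubernetes.io/version": "1.0.1"}
print(selector_matches(match_labels, bumped))  # False
```

The selector of an apps/v1 Deployment is immutable. Because this selector includes the version label, bumping the version later means deleting and recreating the Deployment instead of updating it in place.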

At this point, the producer is periodically publishing events to the samples topic, but they are not transferred to MongoDB yet, because the Kafka Connect sink task has not been deployed.

Create and deploy a Kafka Connect container

Apart from deploying a Kafka cluster, Strimzi also lets you build and deploy a Kafka Connect cluster.

Let’s create a new Strimzi custom resource (CR) of kind KafkaConnect. Strimzi uses it to build a container image that contains the MongoDB Kafka connector JAR file, downloaded from its Maven coordinates. This is configured in the build section of the CR.

Because Strimzi pushes the built image to a container registry that requires authentication, you need to set the pushSecret field to the name of a Kubernetes Secret that holds the registry credentials.

Start by creating a Secret named quayio with the container registry credentials from a Docker configuration file (running docker login generates this file).

In a terminal window, run the following command:

kubectl create secret generic quayio --from-file=.dockerconfigjson=$HOME/.docker/config.json --type=kubernetes.io/dockerconfigjson -n kafka
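If Docker uses a credential helper (a credsStore entry, common with Docker Desktop), config.json contains no credentials, and a Secret built from it cannot authenticate the push. In that case you can write a .dockerconfigjson file with an inline auth entry. That entry is base64("username:password") stored under auths.<registry>. The Python sketch below builds such a file. The helper and the placeholder credentials are hypothetical. As an alternative, kubectl create secret docker-registry with --docker-server, --docker-username, and --docker-password produces the same kind of Secret.

```python
import base64
import json


def docker_config_json(registry: str, username: str, password: str) -> str:
    """Build a .dockerconfigjson document with an inline auth entry for one registry."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return json.dumps({"auths": {registry: {"auth": token}}}, indent=2)


# Placeholder credentials for illustration only.
config = docker_config_json("quay.io", "someuser", "s3cret")
decoded = base64.b64decode(json.loads(config)["auths"]["quay.io"]["auth"]).decode()
print(decoded)  # someuser:s3cret
```

Write the generated document to a file and pass that file to --from-file=.dockerconfigjson=... instead of the Docker config path.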

The KafkaConnect resource uses the strimzi.io/use-connector-resources annotation so that Strimzi manages connectors through KafkaConnector custom resources instead of the Kafka Connect REST API.

As mentioned before, Strimzi builds a new container image, pushes it, and finally deploys it. To reuse an image you have already built, set the image field (commented out in the following manifest) and remove the build section of the KafkaConnect custom resource. Strimzi then skips the build phase and deploys the image directly.

Create a new file named mongodb-kc.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: mongodb-sink-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.3
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  # image: quay.io/lordofthejars/mongodb-sink-connect-cluster:latest
  build:
    output:
      type: docker
      image: quay.io/lordofthejars/mongodb-sink-connect-cluster:latest
      pushSecret: quayio
    plugins:
      - name: my-plugin
        artifacts:
          - type: maven
            repository: https://repo1.maven.org/maven2
            group: org.mongodb.kafka
            artifact: mongo-kafka-connect
            version: 1.8.1
  config:
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status

Apply the file in the kafka namespace:

kubectl apply -f mongodb-kc.yaml -n kafka

Then wait until the Kafka Connect cluster is deployed. This might take a few minutes, because Strimzi has to build the container image, push it to the registry, and then deploy it.
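The KafkaConnect resource above sets value.converter.schemas.enable to false, and the connector later repeats that setting. With JsonConverter, this flag decides how a message is read. When it is false, the message is treated as plain JSON. When it is true, the message must be an envelope with exactly a "schema" and a "payload" field. The producer here sends plain JSON, so schemas have to be disabled. The Python function below is a simplified illustration of that behavior, not the converter's actual implementation.

```python
import json


def decode_value(raw: bytes, schemas_enable: bool):
    """Simplified model of how JsonConverter reads a record value."""
    doc = json.loads(raw)
    if not schemas_enable:
        # Schemaless: the whole JSON document is the record value.
        return doc
    # With schemas enabled, only a two-field schema/payload envelope is accepted.
    if not isinstance(doc, dict) or set(doc) != {"schema", "payload"}:
        raise ValueError("expected an envelope with exactly 'schema' and 'payload'")
    return doc["payload"]


raw = b'{"message": "Hi Duke", "id": "e2200c82-f1a0-4eb0-9e3c-74de800b5991"}'
print(decode_value(raw, schemas_enable=False)["message"])  # Hi Duke

try:
    decode_value(raw, schemas_enable=True)
except ValueError as err:
    # The plain payload has no envelope, so it is rejected.
    print("rejected:", err)
```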

Configure the MongoDB sink connector

The final step is configuring the MongoDB sink connector. The manifest sets the following parameters:

  • The strimzi.io/cluster label, set to the name of the KafkaConnect resource so that Strimzi knows which Kafka Connect cluster runs the connector.
  • The topics from which events are consumed (topics).
  • The location of the MongoDB instance (connection.uri), as well as the database (database) and collection (collection) where events are stored.
  • A ReplaceField transformation that renames the message field of the JSON payload to welcome.

Create a new file with the name mongodb-kcn.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: connector-mongodb-sink
  labels:
    strimzi.io/cluster: mongodb-sink-connect-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  config:
    topics: samples
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    connection.uri: mongodb://mongodb:27017
    database: sampledb
    collection: samples
    transforms: rename
    transforms.rename.type: "org.apache.kafka.connect.transforms.ReplaceField$Value"
    transforms.rename.renames: "message:welcome"

Applying this manifest starts copying events from the Kafka topic to the MongoDB collection, with no custom code required. Run the following command to apply it:

kubectl apply -f mongodb-kcn.yaml -n kafka
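The transforms.rename settings apply Kafka Connect's ReplaceField single message transform to each record value before the sink connector writes it. For a schemaless value like this one, the effect of renames: "message:welcome" can be approximated with the Python sketch below. The function is a hypothetical illustration, not the transform's Java implementation.

```python
def replace_field_renames(value: dict, renames: str) -> dict:
    """Rename top-level fields using a comma-separated list of old:new pairs."""
    mapping = dict(
        pair.strip().split(":", 1) for pair in renames.split(",") if pair.strip()
    )
    # Keys not listed in the mapping keep their original names and order.
    return {mapping.get(key, key): val for key, val in value.items()}


record = {"message": "Bonjour", "id": "d2b67d9f-cbee-4028-8443-cd4f2117d0ce"}
print(replace_field_renames(record, "message:welcome"))
# {'welcome': 'Bonjour', 'id': 'd2b67d9f-cbee-4028-8443-cd4f2117d0ce'}
```

Only the listed top-level field is renamed. The id field passes through unchanged, which matches the documents returned when you query MongoDB.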

Validate the MongoDB content

To validate that the example works correctly, let’s query the MongoDB content using the mongo CLI tool.

Execute the following command to start a Pod in the Kubernetes cluster from a container image that includes the mongo CLI tool:

kubectl run --namespace kafka mongodb-client --rm --tty -i --restart='Never' --image docker.io/bitnami/mongodb:4.4.13-debian-10-r9 --command -- bash

When the Pod is up and running, start the MongoDB shell:

mongo mongodb://mongodb:27017

Within the MongoDB shell, select the sampledb database to query:

use sampledb

And query the samples collection:

db.samples.find();

You’ll get the list of documents inserted by the MongoDB Kafka Connect sink task:

{ "_id" : ObjectId("63bc2eebb2ff5965a0446336"), "welcome" : "Bonjour", "id" : "d2b67d9f-cbee-4028-8443-cd4f2117d0ce" }
{ "_id" : ObjectId("63bc2eebb2ff5965a0446337"), "welcome" : "Bonjour", "id" : "60b8b33d-3726-43df-9597-276652185e93" }
{ "_id" : ObjectId("63bc2eebb2ff5965a0446338"), "welcome" : "Good Bye Cruel World", "id" : "8432d1ea-a6bc-4bcc-9ca7-45657ebb3d89" }

The bottom line

Kafka and its ecosystem are practical tools for solving many problems in the (micro)services era. As we’ve seen, Kafka Connect greatly simplifies integrating data systems. It moves data between Kafka and systems such as MongoDB through configuration alone, in a scalable way.

But running them on Kubernetes is not trivial. Strimzi addresses these problems and provides a smooth experience for running Kafka and Kafka Connect on Kubernetes.

Explore more tutorials from Red Hat Developer for running Apache Kafka on Kubernetes.

Last updated: September 19, 2023