Kubernetes and Apache Kafka are becoming de facto platforms for developing and deploying microservice architectures. One of the challenges you might face when using both technologies is deploying and managing Kafka brokers inside Kubernetes: writing YAML manifests, managing the cluster, creating containers, and so on.
The answer is Strimzi, an open source tool that lets you easily run Kafka clusters on Kubernetes in various deployment configurations. This article demonstrates how you can use Strimzi to configure Kafka Connect, a data integration framework for Kafka.
Install the Strimzi Kubernetes Operator
To start using Strimzi, install the Strimzi Kubernetes Operator in your cluster. You will need cluster-admin rights.
If you are using Red Hat OpenShift, the simplest way is to use the OperatorHub interface and search for AMQ Streams (Strimzi for OpenShift), as shown in Figure 1.
Select the operator and click the Create and Install buttons (Figure 2).
After the operator is installed, create a new OpenShift project named kafka to deploy the Kafka brokers (Figure 3).
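If you prefer the command line over the web console, you can create the same project with the oc client:
oc new-project kafka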
Install Strimzi using YAML
Alternatively, you can install the Strimzi operator using a YAML file. This is necessary when using another Kubernetes distribution like minikube. Run the following commands:
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
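Before moving on, you can check that the operator is running (the deployment name below is the default created by the Strimzi install files):
kubectl get deployment strimzi-cluster-operator -n kafka
kubectl get pods -n kafka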
Provision the Kafka cluster
With the operator up and running, provision the Kafka cluster by applying a Kubernetes custom resource of kind Kafka.
In this example, we will deploy a Kafka cluster with ephemeral (non-persistent) storage.
Create a new file named kafka.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.2.3
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
And apply the manifest:
kubectl apply -f kafka.yaml -n kafka
After a few minutes, Kafka is up and running in the Kubernetes cluster. One of the essential resources created, apart from the StatefulSet elements, is the bootstrap Kubernetes Service named my-cluster-kafka-bootstrap (derived from the metadata name field set in the previous manifest). You need to use this hostname within the cluster to access Kafka.
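To know when the cluster is ready, you can wait on the Ready condition of the Kafka custom resource and then inspect the Pods (the timeout value here is arbitrary):
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
kubectl get pods -n kafka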
Create a new topic
With the Kafka cluster ready, it’s time to create a new topic named samples to publish events to.
Create a topic.yaml file with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: samples
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
And apply it by running the kubectl command:
kubectl apply -f topic.yaml -n kafka
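To confirm the topic was created, list the KafkaTopic resources. Optionally, you can also smoke-test it by running Kafka’s console producer and consumer from temporary Pods; the image tag used below is an assumption, so pick one that matches your Strimzi and Kafka versions:
kubectl get kafkatopic samples -n kafka
kubectl -n kafka run kafka-producer -ti --rm --restart=Never --image=quay.io/strimzi/kafka:0.32.0-kafka-3.2.3 -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic samples
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never --image=quay.io/strimzi/kafka:0.32.0-kafka-3.2.3 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic samples --from-beginning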
With that in place, you can start producing and consuming events. But let’s now focus on configuring Kafka Connect using Strimzi.
Kafka Connect configuration
Kafka Connect is an integration toolkit for streaming data between Kafka brokers and other systems.
Let’s assume the following scenario: A producer is generating events to the samples topic. Each event has a JSON document payload with an id and a message.
An example of the payload is shown in the snippet:
{
  "message" : "Hi Duke",
  "id" : "e2200c82-f1a0-4eb0-9e3c-74de800b5991"
}
We want to store these events in a MongoDB database, as we need to consume them from a legacy application.
Moreover, to make things more interesting, the documents in MongoDB should be stored with the field welcome instead of message, so you should rename the field before storing it in the database.
One option could be to develop a consumer and producer receiving all events, transforming them, and storing them in the MongoDB instance. A better way is to use Kafka Connect with a MongoDB connector to automatically consume events and sink them to MongoDB without writing any code.
The diagram in Figure 4 summarizes the architecture using Kafka Connect.
Figure 4: Events are published to the samples Kafka topic; the MongoDB Kafka Connect sink task consumes each message, makes the adjustments, and stores it in the MongoDB collection.
Deploy the MongoDB instance with Helm
Before deploying and configuring the Kafka Connect sink task, let’s deploy the MongoDB instance in the Kubernetes cluster using Helm.
In a terminal window, run the following command to register the bitnami repo and install the MongoDB Helm chart:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install mongodb bitnami/mongodb --set podSecurityContext.fsGroup="",containerSecurityContext.enabled=false,podSecurityContext.enabled=false,auth.enabled=false --version 13.6.0 -n kafka
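Before continuing, you can verify that the MongoDB Pod is running and that the chart exposed a Service named mongodb (the Helm release name), which is the hostname used later in the connector configuration; the label selector assumes the Bitnami chart’s standard labels:
kubectl get pods -n kafka -l app.kubernetes.io/name=mongodb
kubectl get svc mongodb -n kafka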
To deploy the producer, create a simple Deployment file named producer.yaml with the following content and apply it to the Kubernetes cluster:
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2022-12-16 - 13:24:37 +0000
  labels:
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/name: kafka-spam-messages
  name: kafka-spam-messages
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
      app.kubernetes.io/name: kafka-spam-messages
  template:
    metadata:
      annotations:
        app.quarkus.io/build-timestamp: 2022-12-16 - 13:24:37 +0000
      labels:
        app.kubernetes.io/version: 1.0.0-SNAPSHOT
        app.kubernetes.io/name: kafka-spam-messages
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: quay.io/lordofthejars/kafka-spam-messages:1.0.0-SNAPSHOT
          imagePullPolicy: Always
          name: kafka-spam-messages
And run:
kubectl apply -f producer.yaml -n kafka
At this point, the producer is generating periodic events to the samples topic, but they are not transferred to MongoDB as the Kafka Connect sink task has yet to be deployed.
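You can confirm the producer is emitting events by tailing the Deployment logs:
kubectl logs deployment/kafka-spam-messages -n kafka --tail=20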
Create and deploy a Kafka Connect container
Apart from deploying a Kafka cluster, Strimzi also lets you create and deploy a Kafka Connect container.
Let’s create a new Strimzi custom resource (CR) file of kind KafkaConnect, which will build a container image containing the MongoDB Kafka Connector JAR file downloaded from Maven coordinates. This is configured in the build section of the CR.
Because the container image needs to be published to a protected container registry, you need to reference a Kubernetes Secret holding the registry credentials in the pushSecret field so the image can be pushed.
Let’s start by creating a secret named quayio with the container registry credentials from a Docker configuration file (you can run the docker login command to generate this file).
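If you have not logged in yet, a typical invocation looks like this (quay.io is assumed here because it is the registry referenced in the manifest below):
docker login quay.io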
In a terminal window, run the following command:
kubectl create secret generic quayio --from-file=.dockerconfigjson=/Users/asotobu/.docker/config.json --type=kubernetes.io/dockerconfigjson -n kafka
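You can confirm the Secret exists before referencing it:
kubectl get secret quayio -n kafka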
The KafkaConnect file uses the strimzi.io/use-connector-resources annotation to let Strimzi configure the connector using the KafkaConnector kind instead of the REST API.
Moreover, as mentioned before, Strimzi will build a new container image, push it, and finally deploy it. If you want to reuse an already created image, use the image field and remove the build field in the KafkaConnect custom resource to skip the build phase and deploy it directly.
Create a new file named mongodb-kc.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: mongodb-sink-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.3
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  # image: quay.io/lordofthejars/mongodb-sink-connect-cluster:latest
  build:
    output:
      type: docker
      image: quay.io/lordofthejars/mongodb-sink-connect-cluster:latest
      pushSecret: quayio
    plugins:
      - name: my-plugin
        artifacts:
          - type: maven
            repository: https://repo1.maven.org/maven2
            group: org.mongodb.kafka
            artifact: mongo-kafka-connect
            version: 1.8.1
  config:
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
Apply the file with kubectl apply in the kafka namespace and wait until the task is deployed; this might take a few minutes, as Strimzi needs to build the container image, push it, and finally deploy it.
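For example, assuming the file name used above, you can apply it and wait for the KafkaConnect resource to report a Ready condition:
kubectl apply -f mongodb-kc.yaml -n kafka
kubectl wait kafkaconnect/mongodb-sink-connect-cluster --for=condition=Ready --timeout=600s -n kafka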
Configure the MongoDB sink connector
The final step is configuring the MongoDB sink connector. The following configuration parameters are set:
- The strimzi.io/cluster label needs to be set to the KafkaConnect name value so Strimzi can configure the instance correctly.
- The topics where events are consumed.
- The location of the MongoDB instance (connection.uri) as well as the database (database) and collection (collection) where events are stored.
- A rename transformer changing the JSON payload, concretely renaming the field message to welcome.
Create a new file named mongodb-kcn.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: connector-mongodb-sink
  labels:
    strimzi.io/cluster: mongodb-sink-connect-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  config:
    topics: samples
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    connection.uri: mongodb://mongodb:27017
    database: sampledb
    collection: samples
    transforms: rename
    transforms.rename.type: "org.apache.kafka.connect.transforms.ReplaceField$Value"
    transforms.rename.renames: "message:welcome"
Apply the previous manifest to start moving events from the Kafka topic to the MongoDB collection, without writing any code and with all the power of Kafka behind it.
Run the following command to apply the manifest:
kubectl apply -f mongodb-kcn.yaml -n kafka
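To verify that Strimzi picked up the connector and that it is running, inspect the status Strimzi reports on the resource:
kubectl get kafkaconnector connector-mongodb-sink -n kafka
kubectl describe kafkaconnector connector-mongodb-sink -n kafka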
Validate the MongoDB content
To validate that the example works correctly, let’s query the MongoDB content using the mongo CLI tool.
Execute the following command to start a container within the Kubernetes cluster with the mongo CLI tool installed:
kubectl run --namespace kafka mongodb-client --rm --tty -i --restart='Never' --image docker.io/bitnami/mongodb:4.4.13-debian-10-r9 --command -- bash
When the Pod is up and running, start the MongoDB shell:
mongo mongodb://mongodb:27017
Within the MongoDB shell, select the sampledb database to query:
use sampledb
And query the samples collection:
db.samples.find();
You’ll get the list of documents inserted by the MongoDB Kafka Connect task:
{ "_id" : ObjectId("63bc2eebb2ff5965a0446336"), "welcome" : "Bonjour", "id" : "d2b67d9f-cbee-4028-8443-cd4f2117d0ce" }
{ "_id" : ObjectId("63bc2eebb2ff5965a0446337"), "welcome" : "Bonjour", "id" : "60b8b33d-3726-43df-9597-276652185e93" }
{ "_id" : ObjectId("63bc2eebb2ff5965a0446338"), "welcome" : "Good Bye Cruel World", "id" : "8432d1ea-a6bc-4bcc-9ca7-45657ebb3d89" }
The bottom line
Kafka and its ecosystem are practical tools for solving many problems these days in the (micro)services era. As we’ve seen, Kafka Connect greatly helps us integrate data systems, providing a secure, scalable, and easy way to connect systems.
But adopting them in Kubernetes is not trivial. Strimzi is a perfect choice to solve these problems and create a smooth experience when running Kafka and Kafka Connect in Kubernetes.
Explore more tutorials from Red Hat Developer for running Apache Kafka on Kubernetes.