Kafka Connect is an integration framework that is part of the Apache Kafka project. On Kubernetes and Red Hat OpenShift, you can deploy Kafka Connect using the Strimzi and Red Hat AMQ Streams Operators. Kafka Connect lets users run sink and source connectors. Source connectors load data from an external system into Kafka; sink connectors work the other way around, exporting data from Kafka into an external system. In most cases, connectors need to authenticate when connecting to these other systems, so you will need to provide credentials as part of the connector's configuration. This article shows you how you can use Kubernetes secrets to store the credentials and then reference them in the connector's configuration.
In this article, I will use an S3 source connector, which is one of the Apache Camel Kafka connectors. To learn more about Apache Camel Kafka connectors, you can start with this blog post. This connector is used just as an example of how to configure a connector to access a secret. You can use this same procedure with any connector, as there is nothing special required from the connector itself. We will use the S3 connector to connect to Amazon AWS S3 storage and load files from an S3 bucket into an Apache Kafka topic. In order to connect to S3 storage, we will need to specify the AWS credentials: the access key and the secret key. So, let’s start by preparing the secret with the credentials.
Creating a secret with the credentials
First, we will create a simple properties file called aws-credentials.properties, which should look like this:
aws_access_key_id=AKIAIOSFODNN7EXAMPLE
aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
The credentials you use in this properties file need to have access to the S3 bucket we will read from. Once we have the properties file with the credentials ready, we have to create the secret from this file. You can use the following command to do so:
$ kubectl create secret generic aws-credentials --from-file=./aws-credentials.properties
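To double-check that the secret exists and contains the expected key, you can describe it; kubectl describe shows only the key names and data sizes, not the values themselves:

$ kubectl describe secret aws-credentials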
Building new container images with the connector
Next, we need to prepare a new Docker image with our connector. When using Strimzi, the Dockerfile for adding the connector should look something like this:
FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
When using AMQ Streams, it should look like this:
FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER jboss:jboss
Use the Dockerfile to build a container image with the connectors you need, and push it into your registry. If you don’t have your own private registry, you can use one of the public registries such as Quay or Docker Hub.
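For example, with Docker, building and pushing the image could look like this; scholzj/kafka:camel-kafka-2.4.0 is simply the image name used later in this article, so substitute your own repository and tag:

$ docker build -t docker.io/scholzj/kafka:camel-kafka-2.4.0 .
$ docker push docker.io/scholzj/kafka:camel-kafka-2.4.0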
Deploying Apache Kafka Connect
Once we have the container image, we can finally deploy Apache Kafka Connect. You can do this by creating the following custom resource:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: docker.io/scholzj/kafka:camel-kafka-2.4.0
  replicas: 3
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
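Assuming you save this custom resource in a file named, for example, kafka-connect.yaml, you can apply it and watch the Connect pods come up (the label selector below assumes the standard labels Strimzi adds to the pods it manages):

$ kubectl apply -f kafka-connect.yaml
$ kubectl get pods -l strimzi.io/kind=KafkaConnect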
Let’s look in more detail at several parts of the custom resource. First of all, notice the image field, which tells the Operator deploying Apache Kafka Connect to use the right image with the added connectors. In my case, I pushed the container image built in the previous section to Docker Hub as scholzj/kafka:camel-kafka-2.4.0, so my configuration looks like this:

image: docker.io/scholzj/kafka:camel-kafka-2.4.0
Next, notice the externalConfiguration section:

externalConfiguration:
  volumes:
    - name: aws-credentials
      secret:
        secretName: aws-credentials
In this section, we instruct the Operator to mount the Kubernetes secret aws-credentials that we created at the beginning of this article into the Apache Kafka Connect pods. The secrets listed here will be mounted in the path /opt/kafka/external-configuration/<secretName>, where <secretName> is the name of the secret.
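If you want to confirm that the secret was mounted, you can exec into one of the Connect pods and list that directory; the pod name below is just a placeholder for one of your actual pod names:

$ kubectl exec my-connect-cluster-connect-<pod-suffix> -- ls /opt/kafka/external-configuration/aws-credentials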
And finally, in the config section, we enable the FileConfigProvider as a configuration provider in Apache Kafka Connect:

config:
  config.providers: file
  config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
Configuration providers are a way of loading configuration values from another source instead of specifying them in the configuration directly. In this case, we create a configuration provider named file, which will use the FileConfigProvider class. This configuration provider is part of Apache Kafka. FileConfigProvider can read properties files and extract values from them, and we will use it to load the API keys for our Amazon AWS account.
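With the provider enabled under the name file, any configuration value can later be replaced by a placeholder in this general form (the path and key here are placeholders, matching the concrete examples further below):

${file:<path-to-properties-file>:<property-key>}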
Creating the connector using the Apache Kafka Connect REST API
Usually, we have to wait a minute or two for the Apache Kafka Connect deployment to become ready. Once it is ready, we can create the connector instance. In older versions of Strimzi and Red Hat AMQ Streams, you have to do that using the REST API. We can create the connector by posting the following JSON:
{ "name": "s3-connector", "config": { "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector", "tasks.max": "1", "camel.source.kafka.topic": "s3-topic", "camel.source.maxPollDuration": "10000", "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter", "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}", "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}", "camel.component.aws-s3.configuration.region": "US_EAST_1" } }
The connector configuration contains the AWS API keys in the fields camel.component.aws-s3.configuration.access-key and camel.component.aws-s3.configuration.secret-key. Instead of using the values directly, we just reference the file configuration provider to load the fields aws_access_key_id and aws_secret_access_key from our aws-credentials.properties file.
Notice how we reference the config provider, tell it the path to the file it should use, and include the name of the key to extract:
"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}"
and:
"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}"
You can POST this JSON (saved, for example, in a file called connector-config.json) to the Apache Kafka Connect REST API, for example, using curl:

$ curl -X POST -H "Content-Type: application/json" -d @connector-config.json http://my-connect-cluster-connect-api:8083/connectors
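Once the connector is created, you can check that it started successfully through the standard Kafka Connect status endpoint:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector/status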
One of the advantages of using configuration providers is that even when you later retrieve the connector configuration, it will still contain only the configuration provider references and not the values you want to keep secret:
$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector

{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
    "camel.source.maxPollDuration": "10000",
    "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
    "camel.component.aws-s3.configuration.region": "US_EAST_1",
    "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
    "tasks.max": "1",
    "name": "s3-connector",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.source.kafka.topic": "s3-topic"
  },
  "tasks": [
    {
      "connector": "s3-connector",
      "task": 0
    }
  ],
  "type": "source"
}
Creating the connector with the Strimzi connector Operator
When using Strimzi 0.16.0 or newer, we can also use the new connector Operator. It lets us create the connector using the following custom resource YAML (you can use the configuration provider directly in the KafkaConnector custom resource as well):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter
    camel.source.kafka.topic: s3-topic
    camel.source.url: aws-s3://camel-connector-test?autocloseBody=false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.configuration.access-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.configuration.secret-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.configuration.region: US_EAST_1
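Assuming you save this resource as, for example, s3-connector.yaml, you can create the connector with kubectl and then list the KafkaConnector resources to see it (kafkaconnectors is the plural resource name defined by the Strimzi CRD):

$ kubectl apply -f s3-connector.yaml
$ kubectl get kafkaconnectors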
Conclusion
The security of Kubernetes secrets has its limitations: any user who can exec into the container will still be able to read the mounted secrets. This process at least prevents the confidential information, such as credentials or API keys, from being exposed through the REST API or in the KafkaConnector custom resources.