
Kafka Connect is an integration framework that is part of the Apache Kafka project. On Kubernetes and Red Hat OpenShift, you can deploy Kafka Connect using the Strimzi and Red Hat AMQ Streams Operators. Kafka Connect lets users run sink and source connectors. Source connectors are used to load data from an external system into Kafka. Sink connectors work the other way around and let you load data from Kafka into another external system. In most cases, the connectors need to authenticate when connecting to the other systems, so you will need to provide credentials as part of the connector's configuration. This article shows you how you can use Kubernetes secrets to store the credentials and then use them in the connector's configuration.

In this article, I will use an S3 source connector, which is one of the Apache Camel Kafka connectors. To learn more about Apache Camel Kafka connectors, you can start with this blog post. This connector is used just as an example of how to configure a connector to access a secret. You can use this same procedure with any connector, as there is nothing special required from the connector itself. We will use the S3 connector to connect to Amazon AWS S3 storage and load files from an S3 bucket into an Apache Kafka topic. In order to connect to S3 storage, we will need to specify the AWS credentials: the access key and the secret key. So, let’s start by preparing the secret with the credentials.

Creating a secret with the credentials

First, we will create a simple properties file called aws-credentials.properties, which should look like this:

aws_access_key_id=AKIAIOSFODNN7EXAMPLE
aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

The credentials you use in this properties file need to have access to the S3 bucket we will read from. Once we have the properties file with the credentials ready, we have to create the secret from this file. You can use the following command to do so:

$ kubectl create secret generic aws-credentials --from-file=./aws-credentials.properties
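If you want to verify that the secret exists and contains the expected key, you can describe it; kubectl describe only shows the key names and sizes, not the actual values:

$ kubectl describe secret aws-credentials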

Building new container images with the connector

Next, we need to prepare a new Docker image with our connector. When using Strimzi, the Dockerfile for adding the connector should look something like this:

FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

When using AMQ Streams, it should look like this:

FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER jboss:jboss

Use the Dockerfile to build a container image with the connectors you need, and push it to your registry. If you don’t have your own private registry, you can use one of the public registries such as Quay or Docker Hub.
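For example, assuming the connector plugins have been downloaded and unpacked into the ./my-plugins/ directory referenced by the Dockerfile, and that you push to your own account (the image name below is just a placeholder), the build and push could look like this:

$ docker build -t docker.io/<your-account>/kafka:camel-kafka-2.4.0 .
$ docker push docker.io/<your-account>/kafka:camel-kafka-2.4.0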

Deploying Apache Kafka Connect

Once we have the container image, we can finally deploy Apache Kafka Connect. You can do this by creating the following custom resource:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: docker.io/scholzj/kafka:camel-kafka-2.4.0
  replicas: 3
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
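
You can apply this resource with kubectl as usual; the filename below is just an example, and the deployment created by the Operator typically follows the Strimzi naming convention <cluster-name>-connect:

$ kubectl apply -f kafka-connect.yaml
$ kubectl rollout status deployment/my-connect-cluster-connect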

Let’s look in more detail at several parts of the custom resource. First of all, notice the image field, which tells the Operator which container image to use when deploying Apache Kafka Connect, so that it picks up the image with the added connectors. In my case, I pushed the container image built in the previous section to Docker Hub as scholzj/kafka:camel-kafka-2.4.0, so my configuration looks like this:

image: docker.io/scholzj/kafka:camel-kafka-2.4.0

Next, notice the externalConfiguration section:

externalConfiguration:
  volumes:
    - name: aws-credentials
      secret:
        secretName: aws-credentials

In this section, we instruct the Operator to mount the Kubernetes secret aws-credentials that we created at the beginning of this article into the Apache Kafka Connect pods. The secrets listed here will be mounted in the path /opt/kafka/external-configuration/<secretName>, where <secretName> is the name of the secret.
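If you want to double-check the mount, you can, for example, list the directory in one of the running Connect pods (the pod name below is a placeholder for one of your actual Connect pods):

$ kubectl exec <connect-pod-name> -- ls /opt/kafka/external-configuration/aws-credentials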

And finally, in the config section, we enable the FileConfigProvider as a configuration provider in Apache Kafka Connect:

config:
  config.providers: file
  config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

Configuration providers are a way of loading configuration values from another source instead of specifying them directly in the configuration. In this case, we create a configuration provider named file, which will use the FileConfigProvider class. This configuration provider is part of Apache Kafka. FileConfigProvider can read properties files and extract values from them, and we will use it to load the API keys for our Amazon AWS account.
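
Values from the mounted file are then referenced in connector configurations using placeholders of the following general form, where the path points to the properties file and the key names a property inside it (the path and key below are just placeholders):

some.option=${file:/path/to/file.properties:key.of.the.property}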

Creating the connector using the Apache Kafka Connect REST API

Usually, we have to wait a minute or two for the Apache Kafka Connect deployment to become ready. Once it is, we can create the connector instance. In older versions of Strimzi and Red Hat AMQ Streams, we have to do that using the REST API. We can create the connector by posting the following JSON:

{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
    "tasks.max": "1",
    "camel.source.kafka.topic": "s3-topic",
    "camel.source.maxPollDuration": "10000",
    "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
    "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
    "camel.component.aws-s3.configuration.region": "US_EAST_1"
  }
}

The connector configuration contains the AWS API keys in the fields camel.component.aws-s3.configuration.access-key and camel.component.aws-s3.configuration.secret-key. Instead of using the values directly, we just reference the file configuration provider to load the fields aws_access_key_id and aws_secret_access_key from our aws-credentials.properties file.

Notice how we reference the config provider, tell it the path to the file it should use, and include the name of the key to extract:

"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}"

and:

"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}"

You can save this JSON into a file (for example, connector-config.json) and POST it to the Apache Kafka Connect REST API using curl:

$ curl -X POST -H "Content-Type: application/json" -d @connector-config.json http://my-connect-cluster-connect-api:8083/connectors
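
To quickly check that the connector was created, you can list the connectors registered in the Kafka Connect cluster:

$ curl http://my-connect-cluster-connect-api:8083/connectors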

One of the advantages of using configuration providers is that even when you later retrieve the connector configuration, it will still contain the configuration provider references and not the values you want to keep secret:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector
{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
    "camel.source.maxPollDuration": "10000",
    "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
    "camel.component.aws-s3.configuration.region": "US_EAST_1",
    "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
    "tasks.max": "1",
    "name": "s3-connector",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.source.kafka.topic": "s3-topic"
  },
  "tasks": [
    {
      "connector": "s3-connector",
      "task": 0
    }
  ],
  "type": "source"
}
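
You can also use the status endpoint of the REST API to check that the connector and its task are actually running:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector/status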

Creating the connector with the Strimzi connector Operator

When using Strimzi 0.16.0 or newer, we can also use the new connector Operator. It lets us create the connector using the following custom resource YAML (you can use the configuration provider directly in the KafkaConnector custom resource as well):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter
    camel.source.kafka.topic: s3-topic
    camel.source.url: aws-s3://camel-connector-test?autocloseBody=false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.configuration.access-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.configuration.secret-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.configuration.region: US_EAST_1
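
Note that for the Operator to pick up KafkaConnector resources, the KafkaConnect custom resource typically needs to be annotated with strimzi.io/use-connector-resources: "true"; only the relevant metadata part is shown here:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"

With that in place, creating or updating the connector is just a matter of applying the KafkaConnector resource, for example:

$ kubectl apply -f s3-connector.yaml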

Conclusion

The security of Kubernetes secrets has its limitations: any user who can exec into the container will still be able to read the mounted secrets. However, this process at least prevents confidential information, such as credentials or API keys, from being exposed through the REST API or in the KafkaConnector custom resources.
