
End-to-end field-level encryption for Apache Kafka Connect

September 27, 2022
Hans-Peter Grahsl
Related topics:
Kafka, Security
Related products:
Red Hat Enterprise Linux

    Encryption is valuable in Apache Kafka, as with other communication tools, for protecting data that might be sent to unanticipated or untrustworthy recipients. This series of articles introduces the open source Kryptonite for Kafka library, which is a community project I wrote. Kryptonite for Kafka requires no changes to source code, as it works entirely through configuration files. It currently does so by encrypting data through integration with Apache Kafka Connect, but there are plans to extend the scope of the project to other integration strategies for Kafka.

    This first article in the series demonstrates encryption on individual fields in structured data, using a relational database and a NoSQL database as examples. The second article focuses on files and introduces some additional sophistication, such as using different keys for different fields.

    How Kryptonite for Kafka reduces the risk of a data breach

    Kafka can take advantage of several security features, ranging from authentication and authorization to TLS-based, over-the-wire encryption of data on its way into and out of Kafka topics. Although these measures secure data in transit, TLS terminates at the brokers, so there is always a stage where a broker has to see the plaintext data and temporarily keep it in memory.

    This stage can be considered a blind spot in Kafka security. The data might be encrypted on disk, but the Kafka brokers see the plaintext data right before storing it, and decrypt the data temporarily every time they read it back from disk. Therefore, disk encryption alone cannot protect against RAM scraping and other clever attack vectors.

    Kryptonite for Kafka plugs this in-memory loophole and offers added sophistication, such as the ability to encrypt specific fields in structured data.

    Encryption outside the Kafka brokers

    Let's imagine a generic Kafka Connect data integration. Assume we would like to encrypt a certain subset of sensitive fields found in a ConnectRecord payload originating from any data source (Figure 1).

    Figure 1: In the source record, the social security number needs to be protected through encryption.

    On the consumer side, we need to decrypt the same subset of previously encrypted fields in a ConnectRecord payload directed to the data sink (Figure 2).

    Figure 2: Before delivering the record to the data sink, the social security number is automatically decrypted.

    To make sure the Kafka brokers themselves never get to see—let alone directly store—the original plaintext for sensitive data fields, the encryption and decryption must happen outside of the brokers, a step represented by the pink question marks in Figures 1 and 2.

    Kryptonite for Kafka's approach to data encryption: An overview

    Before Kryptonite for Kafka, there was no flexible and convenient way to accomplish client-side field-level cryptography for Kafka in free and open source software, either with the standard features of Kafka Connect or with additional libraries and free tools from Kafka Connect's open source ecosystem.

    Encryption could happen directly in the original client or in some intermediary process like a sidecar or a proxy, but an intermediary would likely impose higher deployment effort and an additional operational burden. To avoid this, Kryptonite for Kafka currently performs the encryption and decryption during a Kafka Connect integration. The worker nodes of a Kafka Connect cluster encrypt the fields designated as sensitive within ConnectRecord instances.

    For that purpose, the library provides a turnkey-ready single message transform (SMT) to apply field-level encryption and decryption to Kafka Connect records. The SMT is agnostic to the chosen message serialization format. It uses authenticated encryption with associated data (AEAD), applying AES in either GCM or SIV mode.

    Each encrypted field is represented in the output as a Base64-encoded string that contains the ciphertext of the field's value along with metadata. The metadata consists of a version identifier for the Kryptonite for Kafka library itself, a short identifier for the encryption algorithm, and an identifier for the secret key material. The metadata is authenticated but not encrypted, so Kryptonite for Kafka on the consumer side can read it and use it to decrypt the data.
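
    The keyset format shown later in this article is Google Tink's, so the underlying building block can be illustrated with Tink's AEAD primitive. The following is a minimal conceptual sketch, not the library's actual envelope format: the field value is encrypted with AES-GCM, a hypothetical metadata string is passed as associated data so that it is authenticated but remains readable, and the ciphertext is Base64-encoded for transport. (Kryptonite for Kafka packages the metadata together with the ciphertext before encoding; Tink's DeterministicAead primitive covers the AES-SIV mode in an analogous way.)

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    import com.google.crypto.tink.Aead;
    import com.google.crypto.tink.KeyTemplates;
    import com.google.crypto.tink.KeysetHandle;
    import com.google.crypto.tink.aead.AeadConfig;

    public class FieldLevelAeadSketch {
        public static void main(String[] args) throws Exception {
            AeadConfig.register();

            // Generate an AES-GCM keyset; in practice the keyset would be loaded
            // from externalized secret key material instead of created ad hoc.
            KeysetHandle keyset = KeysetHandle.generateNew(KeyTemplates.get("AES256_GCM"));
            Aead aead = keyset.getPrimitive(Aead.class);

            // Hypothetical metadata (version, algorithm, key identifier):
            // authenticated as associated data, but not encrypted.
            byte[] metadata = "v1|AES_GCM|my-demo-secret-key-123".getBytes(StandardCharsets.UTF_8);
            byte[] plaintext = "3787 Brownton Road".getBytes(StandardCharsets.UTF_8);

            // Encrypt the field value; the ciphertext only verifies together
            // with the exact same associated data.
            byte[] ciphertext = aead.encrypt(plaintext, metadata);
            String encodedField = Base64.getEncoder().encodeToString(ciphertext);
            System.out.println("encrypted field: " + encodedField);

            // The consumer side reads the (cleartext) metadata to pick the right
            // key and verifies it implicitly while decrypting.
            byte[] decrypted = aead.decrypt(ciphertext, metadata);
            System.out.println("decrypted field: " + new String(decrypted, StandardCharsets.UTF_8));
        }
    }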

    For schema-aware message formats such as AVRO, the original schema of a data record is redacted so that encrypted fields can be stored in Base64-encoded form, changing the original data types for the affected fields.
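
    To make that schema change concrete, here is a small, hypothetical sketch using Kafka Connect's SchemaBuilder. It is not code from the library; it only shows the kind of adjustment the SMT has to make: if, say, a numeric customer_id field were configured for encryption, its type in the redacted schema would become a string, because the field now carries Base64-encoded ciphertext.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class RedactedSchemaSketch {
        public static void main(String[] args) {
            // Original value schema, as a source connector might produce it.
            Schema original = SchemaBuilder.struct().name("addresses.Value")
                    .field("id", Schema.INT64_SCHEMA)
                    .field("customer_id", Schema.INT64_SCHEMA)
                    .field("street", Schema.STRING_SCHEMA)
                    .build();

            // Redacted schema after field-level encryption: the encrypted field's
            // type becomes a string because it now holds a Base64-encoded value.
            Schema redacted = SchemaBuilder.struct().name("addresses.Value")
                    .field("id", Schema.INT64_SCHEMA)
                    .field("customer_id", Schema.STRING_SCHEMA) // was int64, now encrypted
                    .field("street", Schema.STRING_SCHEMA)      // was string, stays string
                    .build();

            System.out.println("original customer_id type: "
                    + original.field("customer_id").schema().type());
            System.out.println("redacted customer_id type: "
                    + redacted.field("customer_id").schema().type());
        }
    }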

    In a nutshell, the configurable CipherField SMT can be plugged into arbitrary Kafka Connect pipelines, safeguarding sensitive and precious data against any form of uncontrolled or illegal access on the data's way into and out of Kafka brokers (Figures 3 and 4).

    Figure 3: Kafka Connect encrypts the data by piping it through the SMT before delivery to the brokers.
    Figure 4: Kafka Connect decrypts the data by piping it through the SMT before sink connectors deliver it to the target system.

    Advantages of encryption outside the Kafka brokers

    With the Kryptonite for Kafka approach, sensitive fields are automatically and always secured, not only in transit but also at rest, whenever sensitive data is outside the Kafka Connect environment. Protection is guaranteed for all target systems and for any downstream consumers that eventually get their hands on the data. Even if someone has access to the brokers, they cannot misuse the sensitive parts of the data unless they manage to steal the secret keys from the (usually remote) client environments or break the underlying cryptography itself.

    In short, Kryptonite for Kafka offers an additional layer of data security independent of whoever owns or operates the Kafka cluster, thus protecting the data against internal attackers. Encryption also protects data against external attackers who might get fraudulent access to the Kafka topic data in the future.

    Given such a setup, you can precisely define which downstream consumers can read the sensitive data fields. In a Kafka Connect data integration pipeline, only sink connectors explicitly given access to the secret keys can successfully decrypt the protected data parts.

    Example: Exchanging structured data between a relational database and a NoSQL database

    This example demonstrates field-level data protection during replication between two different types of databases. The source is MySQL and the sink is MongoDB. The exchange uses the open source Debezium platform for log-based change data capture (CDC). Kryptonite for Kafka post-processes Debezium's MySQL CDC event payloads to encrypt certain fields before the data leaves Kafka Connect on the client side and reaches the Kafka brokers. The MongoDB sink connector that reads these Kafka records gets access to pre-processed, properly decrypted records, and at the end of the process stores plaintext documents into MongoDB collections (Figure 5).

    Figure 5: Kafka Connect encrypts data passed in from MySQL through Debezium and decrypts data just before passing it to MongoDB, so that the Kafka brokers see only encrypted data.

    Producer-side encryption

    The MySQL instance stores, among other data, an addresses table containing a couple of rows representing different kinds of fictional customer addresses (Figure 6).

    Figure 6: A column for the customer's street is one of several columns in a MySQL table.

    The following Debezium MySQL source connector captures all existing addresses, together with any future changes from the table:

    {
        "name": "mysql-source-001",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "tasks.max": "1",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "root",
            "database.password": "debezium",
            "database.server.id": "1234567",
            "database.server.name": "mysqlhost",
            "database.whitelist": "inventory",
            "table.whitelist": "inventory.addresses",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "mysqlhost-schema",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": false,
            "transforms.unwrap.delete.handling.mode": "drop"
        }
    }
    

    The preceding configuration is pretty straightforward, serializing CDC events as JSON without explicit schema information and unwrapping the Debezium payloads. Unsurprisingly, the resulting plaintext record values stored in the corresponding Kafka topic (called mysqlhost.inventory.addresses) will look something like this:

    {
      "id": 13,
      "customer_id": 1003,
      "street": "3787 Brownton Road",
      "city": "Columbus",
      "state": "Mississippi",
      "zip": "39701",
      "type": "SHIPPING"
    }
    

    Let's now assume the task is to make sure that the values originating from the addresses table's street column (Figure 7) are never stored in Kafka topics as plaintext in any of these CDC event payloads.

    Thanks to the field-level encryption capabilities of the custom CipherField SMT, fine-grained protection can be easily achieved without writing any additional code. Simply add the following to the Debezium MySQL source connector configuration:

    {
        "name": "mysql-source-enc-001",
        "config": {
            /* ... */
            "transforms": "unwrap,cipher",
            /* ... */
            "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.cipher.cipher_mode": "ENCRYPT",
            "transforms.cipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
            "transforms.cipher.field_config": "[{\"name\":\"street\"}]",
            "transforms.cipher.predicate":"isTombstone",
            "transforms.cipher.negate":true,
            "predicates": "isTombstone",
            "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
        }
    }
    

    The SMT configuration contains transforms.cipher.* properties with the following meanings:

    • Operate on the records' values (type: "CipherField$Value").
    • Encrypt data (cipher_mode: "ENCRYPT").
    • Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
    • Use a specific secret key based on its ID (cipher_data_key_identifier: "my-demo-secret-key-123").
    • Process only the street field (field_config: "[{\"name\":\"street\"}]").
    • Ignore any tombstone records (see predicate definitions).

    Maintaining the secrecy of the secret key materials is of utmost importance, because leaking any of the secret keys renders encryption useless. Secret key exchange is a complex topic of its own. This example obtains the keys indirectly from an external file, which contains a single property called cipher_data_keys that in turn holds an array of key definition objects (identifier and material):

    cipher_data_keys=[ { "identifier": "my-demo-secret-key-123", "material": { "primaryKeyId": 1000000001, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GhDRulECKAC8/19NMXDjeCjK", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000001, "outputPrefixType": "TINK" } ] } } ]
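
    The material object is a Google Tink keyset in JSON form. If you want to generate your own key material instead of reusing the demo key above, a sketch like the following prints a fresh AES-GCM keyset in that same JSON structure (assuming, as the demo key suggests, that any valid Tink AES-GCM keyset can serve as material):

    import com.google.crypto.tink.CleartextKeysetHandle;
    import com.google.crypto.tink.JsonKeysetWriter;
    import com.google.crypto.tink.KeyTemplates;
    import com.google.crypto.tink.KeysetHandle;
    import com.google.crypto.tink.aead.AeadConfig;

    public class GenerateDemoKeyset {
        public static void main(String[] args) throws Exception {
            AeadConfig.register();

            // Create a fresh AES-GCM keyset (AES256_GCM is another valid template).
            KeysetHandle keyset = KeysetHandle.generateNew(KeyTemplates.get("AES128_GCM"));

            // Print the keyset in Tink's JSON format; its structure matches the
            // "material" object shown above (primaryKeyId, keyData, typeUrl, ...).
            // Writing cleartext key material like this is acceptable for a demo only.
            CleartextKeysetHandle.write(keyset, JsonKeysetWriter.withOutputStream(System.out));
        }
    }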
    

    More details about externalizing sensitive configuration parameters in Kafka Connect can be found in the library's documentation.
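
    The ${file:...} placeholders rely on Kafka Connect's configuration provider mechanism: the worker must have a file config provider enabled (for example, config.providers=file and config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider in the worker configuration) so that the placeholder is resolved at runtime rather than passed through verbatim. As a rough illustration of what that resolution does, the following sketch reads the same property directly with the FileConfigProvider class, using the file path and property name from this example:

    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.provider.FileConfigProvider;

    public class ResolveSecretSketch {
        public static void main(String[] args) throws Exception {
            // The same provider class a Connect worker uses when configured with
            // config.providers=file and config.providers.file.class=...FileConfigProvider.
            try (FileConfigProvider provider = new FileConfigProvider()) {
                provider.configure(Map.of());

                // Resolve ${file:/secrets/classified.properties:cipher_data_keys}
                ConfigData data = provider.get("/secrets/classified.properties",
                        Set.of("cipher_data_keys"));
                String keyset = data.data().get("cipher_data_keys");

                // Never log real key material; this only shows that the lookup works.
                System.out.println("resolved cipher_data_keys of length: " + keyset.length());
            }
        }
    }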

    The resulting source connector configuration achieves the goal of partially encrypting all Debezium CDC event payloads before they are sent to and stored by the Kafka brokers. The resulting record values in the topic mysqlhost.inventory.addresses reflect the encryption:

    {
      "id": 13,
      "customer_id": 1003,
      "street": "NLWw4AshpLIIBjLoqM0EgDiUGooYH3jwDnW71wdInMGomFVLHo9AQ6QPEh6fmLRJKVwE3gwwsWux",
      "city": "Columbus",
      "state": "Mississippi",
      "zip": "39701",
      "type": "SHIPPING"
    }
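
    If you want to double-check what the brokers actually store, a plain consumer that knows nothing about the SMT or the secret keys will only ever see these encrypted values. Here is a minimal sketch, assuming the bootstrap address kafka:9092 from the connector configuration above:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class InspectEncryptedTopic {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "inspect-encrypted-topic");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("mysqlhost.inventory.addresses"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    // The street field shows up only as a Base64-encoded ciphertext string.
                    System.out.println(record.value());
                }
            }
        }
    }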
    

    Consumer-side decryption

    The sink connector that picks up these partially encrypted Kafka records needs to apply the custom CipherField SMT as well. Only then can the connector recover the plaintext of the previously encrypted Debezium CDC payload fields before writing them into the targeted data store, which is MongoDB in this case.

    The MongoDB sink connector configuration for this example might look like this:

    {
        "name": "mongodb-sink-dec-001",
        "config": {
            "topics": "mysqlhost.inventory.addresses",
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":false,
            "tasks.max": "1",
            "connection.uri":"mongodb://mongodb:27017",
            "database":"kryptonite",
            "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
            "delete.on.null.values": true,
            "transforms": "createid,removefield,decipher",
            "transforms.createid.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
            "transforms.createid.renames": "id:_id",
            "transforms.removefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.removefield.blacklist": "id",
            "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.decipher.cipher_mode": "DECRYPT",
            "transforms.decipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.decipher.field_config": "[{\"name\":\"street\"}]",
            "transforms.decipher.predicate":"isTombstone",
            "transforms.decipher.negate":true,
            "predicates": "isTombstone",
            "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
        }
    }
    

    The interesting part again is the configuration's transforms.decipher.* properties, which are defined as follows:

    • Operate on the records' values (type: "CipherField$Value").
    • Decrypt data (cipher_mode: "DECRYPT").
    • Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
    • Determine which secret key to use from the key identifier embedded in each encrypted field's metadata, so no explicit cipher_data_key_identifier needs to be configured for decryption.
    • Process only the street field (field_config: "[{\"name\":\"street\"}]").
    • Ignore any tombstone records (see predicate definitions).

    With this configuration in place, Kafka Connect processes all CDC events by first applying the custom CipherField SMT to decrypt the selected fields, and then handing them over to the sink connector itself, which writes the plaintext documents into a collection called mysqlhost.inventory.addresses in the MongoDB database kryptonite. An example document for the address with _id=13 is shown here in its JSON representation:

    { "_id": 13,
      "zip": "39701",
      "city": "Columbus",
      "street": "3787 Brownton Road",
      "state": "Mississippi",
      "customer_id": 1003,
      "type": "SHIPPING" }
    

    This example has shown how end-to-end change data capture pipelines between heterogeneous databases can be secured by explicitly protecting sensitive CDC payload fields. Proper configuration of a custom SMT is sufficient to achieve client-side field-level encryption and decryption of Kafka Connect records on their way into and out of Kafka topics. A fully working example of this database integration scenario can be found in the accompanying demo scenario repository.

    The second article in this series will show the use of Kryptonite for Kafka with files, introduce more features, and discuss plans for the future of the project.

    Last updated: August 14, 2023
