File-based Kafka Connect scenarios with end-to-end encryption

October 11, 2022
Hans-Peter Grahsl
Related topics: Kafka, Security
Related products:
Red Hat Enterprise Linux


    The first article in this series explained the need for client-side, end-to-end encryption for data passing through Apache Kafka. The article also introduced the Kryptonite for Kafka project, which integrates with Apache Kafka Connect to achieve automatic encryption and decryption with no changes to application code.

    This article, the second and last in this series, contains more advanced examples that cover a common scenario: File-based data integration between different kinds of storage. I'll show Kryptonite for Kafka features that support fine-grained, field-level encryption in this use case.

    To keep things simple in the examples, a local file system serves as the source while the sink is set up in MinIO (Figure 1), an open source object storage service compatible with AWS S3.

    Figure 1: Kafka Connect encrypts data read from a file and decrypts it just before passing it to MinIO.

    As in the previous article, the aim is to secure selected fields of records, this time in JSON files that are transferred between the two storage systems using Kafka Connect. Along the way, the article introduces new features of the custom single message transform (SMT).

    A complex data structure

    The local file system holds a sample text file containing a bunch of JSON objects that store different kinds of personal data for fictional characters. The JSON objects are serialized line by line into this file. In contrast to the flat structure of the tabular data in the previous article, the data in this article contains complex field types. One such JSON object, with its nested personal object and its knownResidences array, looks like this:

    {
        "guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
        "personal": {
            "firstname": "Judy",
            "lastname": "Hayes",
            "age": 38,
            "gender": "female",
            "height": 167,
            "weight": 50,
            "eyeColor": "brown"
        },
        "knownResidences": [
            "529 Glenmore Avenue, Waumandee, Connecticut, 8220",
            "489 Lake Street, Glasgow, Tennessee, 1469",
            "927 Rutledge Street, Fresno, Pennsylvania, 5610",
            "728 Debevoise Street, Gerton, Federated States Of Micronesia, 7007"
        ],
        "isActive": true,
        "profilePic": "https://picsum.photos/128",
        "registered": "2018-08-12T01:58:44 -02:00"
    }
    /* ... */
    

    To store all these JSON objects from the file into a Kafka topic, the example adopts a simple-but-good-enough approach, employing the FileStreamSourceConnector that ships with Apache Kafka Connect.

    Suppose that both complex fields, personal (a nested object) and knownResidences (an array), must be protected by encrypting their contents. I'll show two ways to process such complex fields:

    • Encrypt the field as a whole.
    • Individually encrypt each value for each nested field in the personal field and each individual array element in the knownResidences field.

    Encryption as a whole with the OBJECT field mode

    Let's first investigate the encryption of complex field types as a whole. For this, the source connector configuration could be something like this:

    {
        "name": "filesource-enc-001",
        "config": {
            "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "file":"/tmp/kafka-connect/data/sample_data.txt",
            "topic":"personal-data-enc",
            "transforms":"string2json,cipher",
            "transforms.string2json.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.util.JsonStringReader$Value",
            "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.cipher.cipher_mode": "ENCRYPT",
            "transforms.cipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
            "transforms.cipher.field_config": "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]",
            "transforms.cipher.field_mode": "OBJECT"
        }
    }
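
    To try this yourself, you could save the configuration to a file, for instance filesource-enc-001.json, and register it with a running Kafka Connect cluster through Connect's REST API. The hostname and port below are assumptions for a typical local setup:

    # submit the source connector configuration to the Kafka Connect REST API
    curl -X POST \
      -H "Content-Type: application/json" \
      --data @filesource-enc-001.json \
      http://localhost:8083/connectors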
    

    The SMT's configuration is defined by the transforms.cipher.* properties as follows:

    • Operate on the records' values (type: "CipherField$Value").
    • Encrypt data (cipher_mode: "ENCRYPT").
    • Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
    • Use a specific secret key based on its ID (cipher_data_key_identifier: "my-demo-secret-key-123").
    • Process only selected fields, for which different keyIds can be provided. If no keyId is given for a specific field, use the general keyId defined in the cipher_data_key_identifier property (field_config: "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]").
    • Encrypt complex fields (with nested objects or arrays) as a whole ("transforms.cipher.field_mode": "OBJECT").

    At runtime, all configured keyId instances specified in the field_config settings need to be resolved. For the configuration just shown, each field is encrypted with either the general key (keyId=my-demo-secret-key-123) or a specially chosen one (keyId=my-demo-secret-key-987), both of which must be given in the external properties file named classified.properties:

    cipher_data_keys=[{ "identifier": "my-demo-secret-key-123", "material": { "primaryKeyId": 1000000001, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GhDRulECKAC8/19NMXDjeCjK", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000001, "outputPrefixType": "TINK" } ] } } ,{ "identifier": "my-demo-secret-key-987", "material": { "primaryKeyId": 1000000002, "key": [ {"keyData":{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GiBIZWxsbyFXb3JsZEZVQ0sxYWJjZGprbCQxMjM0NTY3OA==", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000002, "outputPrefixType": "TINK" } ] } }]
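
    The material entries in this properties file are plain Tink keysets in cleartext JSON format (note the AesGcmKey typeUrl and the TINK output prefix type). One way to generate fresh key material is therefore Google Tink's tinkey CLI; the template and file names below are only examples, and the generated keyset still has to be wrapped into the identifier/material structure shown above:

    # generate a new AES-GCM keyset in Tink's cleartext JSON format
    tinkey create-keyset --key-template AES128_GCM \
      --out-format json --out my-demo-secret-key.json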
    

    Running a connector instance with this configuration in place produces partially encrypted records such as this one in a Kafka topic called personal-data-enc:

    {
        "guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
        "personal": "jwHqA4krnTk6CymRAVovFI+pJsUF0P3QfrRul0AWVMThdDgBBvlyVxs5mnN7Ma1bAUlZThimuxT3QhoXL+YxPhfByae2UiJSMFDDRXJJyWH9mBEIdt4I+82Jin49EdNo5/Cqbv9g2Qf028mm1KsPPTybJZF04gmLMaXntg2aEBo+EKSwgPCrTLm29a+R2dDFDDCxa7E=",
        "knownResidences": "iQKmeFp7i7CJxPW0wc2XEajabWpr8lgDa0TrwQBZXADo1KfKs2Xjes/fM+bGTm7PEYw1v7J1io1Ab0Xyv40MYV1e/L2RXrJHDmkkQnMkdJ+MSZb9Gr5vd/8u6ndrrer6ucshPB/y1mloAlG1J48F4j7bQ1vrEXK9NjYRZEC5w0VPQrTPehMcyRD4CXt8+RGnL3uvhDuBEMCY7zIz31QwOXc5HSHyHuIUf0b4W74oCk1+gVvO0rT1ywgKKxeEJVKK0gLwz4+Hv5o0XEV6jWmyrnqvqJtp0dXJXH8oV9eEIBNeEp6nBL5X+zivMXtEpeYuoNY+sPYelpkltRJnTq1EfzwHOflkgtTjbgUMMLFrsQ==",
        "isActive": true,
        "profilePic": "https://picsum.photos/128",
        "registered": "2018-08-12T01:58:44 -02:00"
    }
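
    If you want to look at these records yourself, the console consumer that ships with Apache Kafka is sufficient; the broker address below is an assumption for a local setup:

    # read the partially encrypted records from the target topic
    bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --topic personal-data-enc \
      --from-beginning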
    

    The contents of the complex fields have been successfully encrypted. And because encryption was done with field_mode=OBJECT, the original structure of both the personal and knownResidences fields is completely opaque. So in addition to protecting sensitive values, this encryption prevents a viewer from even seeing the shape of the information in those complex fields. For instance, there's no way to know:

    • That personal is represented by a nested object, and what and how many fields it holds.
    • That knownResidences is an array, and how many elements it contains.

    Encryption with the ELEMENT field mode

    In contrast to the OBJECT mode, the ELEMENT mode enables element-wise processing for complex field types. This mode acts upon the individual fields of nested objects and individual elements in arrays. If you change the field_mode configuration property to ELEMENT, partially encrypted records look like this:

    {
        "guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
        "personal": {
            "firstname": "JkEEW6yDzw4iOkiQ3IAeir1hX6Z9gt1pi/1thQJngcH7ORgWAm4MMLFrsQ==",
            "lastname": "J2rv4Z1FTcRUByA/67QQ/im4Gt4YqlKuv5D+m9POUcPUDlpqCUDLDDCxa7E=",
            "age": "IxqTxrGKcBFs3M6+i7S863zywNAniwalRkyd6fI7ENuWmLgMMLFrsQ==",
            "gender": "KCIFdVYWHkmYxJRjNKpulGzy2hVapoSfzPVnhxgHISz031dG7dKuswwwsWux",
            "height": "JAMFXn+hc+8BltkwZeHGza5mcbIW5NK/cuSAHnAPcz6IycrsDDCxa7E=",
            "weight": "I8vbJQjCBbYhh2nOLOxRcF/Ti9n8g/d4cFQlItbnETcpZtAMMLFrsQ==",
            "eyeColor": "J7HqSejwGkgJbqFZw0xXPCX9VqDaGubKgp2+1Esgfcm+c6Xt3TW4DDCxa7E="
        },
        "knownResidences": [
            "VNdFbsuo/3duROXZyGbBnfmzw7HH/lHGbY4x3aDhXyfunQVGZZKQjgTDLA8DrRAHxIRH6MYxPowz+hGUM5S4Dygndss4fp6Mgj6uhJKMgsOVhBt8DDCxa7E=",
            "TIfIG3IumUccz4GhgZ5F6QbzHockvsoq9vH+BQVh9ttN/uYJDFYPOd1fLbhZutJKhwm8SJsMujz2L/AnAr7PenSReFsGrhfWwI2kBgwwsWux",
            "UpUiXKBsXWLPJ6Kl6mNbfzPBHVhXqryzzozCOh78kQm7u7GwtHwqrBoWBEEqhhe/M1HVBMqwjt+jJDZUdmfu7LqaME1Z2dPLhs3sd8vTKsJE9gwwsWux",
            "ZhwsRs5tNAId/mHL6hLwFDOEZ5+C0DBfsvpTVHuMu/c4fUNRUR3YAIXKsTBJF833QFVklsVffyf+WlDhGd53IoI0c1U5YSmm7kJcBBrocewnXpRfOOlWDNlNCzKuW9HJoi+Nbah0DDCxa7E="
        ],
        "isActive": true,
        "profilePic": "https://picsum.photos/128",
        "registered": "2018-08-12T01:58:44 -02:00"
    }
    

    Again, all the sensitive data parts have been encrypted. This time, however, the structural information is not hidden. For instance, viewers can see that the knownResidences field contains an array of four elements.

    Having access to such structural information may be beneficial in certain scenarios. Imagine a target system that receives all partially encrypted records from a sink connector that had no secret key access and thus can't decrypt the data. This target system might still be able to work with the encrypted data to calculate some simple statistics, such as the minimum, maximum, and average number of known residences across all data records.
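
    As a rough sketch of what such a consumer-side computation could look like, the following jq invocation derives those statistics from a dump of the partially encrypted records, assuming they have been exported one JSON object per line into a file named records.jsonl (both the export step and the file name are assumptions):

    # min, max, and average number of known residences across all records,
    # computed without ever decrypting the individual array elements
    jq -s 'map(.knownResidences | length)
           | {min: min, max: max, avg: (add / length)}' records.jsonl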

    Encryption with individual secret keys per field

    We have seen that you can specify an individual keyId for selected fields. In our example, personal and knownResidences have been encrypted with different keys. The primary benefit of doing so is that the same Kafka topic data can be used to feed different sink connectors while enforcing any combination of field-level access policies. The following cases can be implicitly differentiated with such a setup:

    • Sink connectors that are allowed to decrypt both fields, personal and knownResidences (access to both secret keys must be granted)
    • Sink connectors that are allowed to decrypt just one of the encrypted fields, either personal or knownResidences (access to one of the corresponding secret keys must be granted)
    • Sink connectors that are not allowed to decrypt either of the fields (no access to the secret keys is granted)

    Without individual keys, achieving the same flexible level of field-access policies would be hard. For instance, you could carefully split the data so that only specific subsets of fields are stored in separate Kafka topics, depending on what each sink connector and target system is allowed to see. You would then have to define individual role-based access control settings for these Kafka topics in order to grant specific sink connectors access to specific topics. However, this workaround results in a lot of unnecessary redundancy in storage.

    The more payload fields need to be individually encrypted and decrypted to support various field-level access policies per sink connector and target system, the more unusable this workaround becomes. It would effectively lead to a combinatorial explosion of separate Kafka topics, which is just impractical.

    Decryption according to field mode with individual keys

    Irrespective of the chosen field mode (OBJECT or ELEMENT) or the granularity of key settings, you can decrypt all fields and store the records into the desired target system by creating a sink connector configuration that matches the encrypting configuration.

    In this particular example, all decrypted records should ultimately be written to the S3-compatible MinIO object storage. A properly configured Camel-MinIO-Kafka-Connector serves this purpose well:

    {
        "name": "minio-s3-sink-dec-001",
        "config": {
            "connector.class":"org.apache.camel.kafkaconnector.minio.CamelMinioSinkConnector",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "topics":"personal-data-enc-<objects | elements>",
            "camel.sink.path.bucketName":"kafka-connect-kryptonite-<objects | elements>",
            "camel.sink.endpoint.endpoint":"http://minio:9000",
            "camel.sink.endpoint.autoCreateBucket":true,
            "camel.sink.endpoint.keyName":"${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.json",
            "transforms": "decipher,json2string",
            "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.decipher.cipher_mode": "DECRYPT",
            "transforms.decipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
            "transforms.decipher.field_config": "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]",
            "transforms.decipher.field_mode": "<OBJECT | ELEMENT>",
            "transforms.json2string.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.util.JsonStringWriter$Value"
        }
    }
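
    As on the source side, this configuration could be submitted to the Connect REST API, and the connector's health checked afterwards; the file name, hostname, and port are again assumptions:

    # submit the sink connector configuration
    curl -X POST \
      -H "Content-Type: application/json" \
      --data @minio-s3-sink-dec-001.json \
      http://localhost:8083/connectors

    # verify that the connector and its task are RUNNING
    curl http://localhost:8083/connectors/minio-s3-sink-dec-001/status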
    

    The SMT's configuration is defined by the transforms.decipher.* properties as follows:

    • Operate on the records' values (type: "CipherField$Value").
    • Decrypt data (cipher_mode: "DECRYPT").
    • Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
    • Use a specific secret key based on its ID (cipher_data_key_identifier: "my-demo-secret-key-123").
    • Process only selected payload fields, for which different keyIds are supported. If no keyId is given for a specific field, use the general keyId defined in the cipher_data_key_identifier property (field_config: "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]").
    • Decrypt complex fields (with nested objects or arrays) either as a whole or element by element, matching the mode used during encryption ("transforms.decipher.field_mode": "<OBJECT | ELEMENT>").

    With this configuration in place, the custom SMT preprocesses and decrypts all records before handing the original plaintext records to the Camel MinIO sink connector for storage into the specified MinIO bucket named kafka-connect-kryptonite-objects or kafka-connect-kryptonite-elements. Inspecting one of the bucket objects with the contents of the example record used earlier shows the successfully decrypted data:

    {
        "profilePic": "https://picsum.photos/128",
        "guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
        "registered": "2018-08-12T01:58:44 -02:00",
        "personal": {
            "firstname": "Judy",
            "gender": "female",
            "eyeColor": "brown",
            "weight": 50,
            "age": 38,
            "lastname": "Hayes",
            "height": 167
        },
        "isActive": true,
        "knownResidences": [
            "529 Glenmore Avenue, Waumandee, Connecticut, 8220",
            "489 Lake Street, Glasgow, Tennessee, 1469",
            "927 Rutledge Street, Fresno, Pennsylvania, 5610",
            "728 Debevoise Street, Gerton, Federated States Of Micronesia, 7007"
        ]
    }
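
    To verify the results directly in MinIO, the mc command-line client can list and print the stored objects; the alias name, credentials, and object key below are placeholders:

    # register the MinIO endpoint under a local alias (credentials are placeholders)
    mc alias set demo http://minio:9000 <ACCESS_KEY> <SECRET_KEY>

    # list the JSON objects written by the sink connector
    mc ls demo/kafka-connect-kryptonite-objects

    # print the contents of one stored record
    mc cat demo/kafka-connect-kryptonite-objects/<object-key>.json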
    

    This concludes our overview of end-to-end data encryption for file-based flows across different kinds of storage systems. A fully working example of this file-based integration scenario can be found in the accompanying demo scenario repository.

    Limitations of Kryptonite for Kafka and upcoming enhancements

    Currently, the custom CipherField SMT has the following limitations, which we hope to overcome in the near future:

    • Instead of assigning the Base64-encoded ciphertext to a field, it could be desirable to preserve the original data format of addressed fields. In cryptography, this is known as format-preserving encryption (FPE). An illustrative example is a credit card, which is identified by a 16-digit number. Applying FPE would result in a different 16-digit number as the ciphertext. Other examples include phone numbers, social security numbers, and the like, for which the encryption is usually based on permutations over finite sets of numeric, alphabetic, or alphanumeric characters.
    • At the moment, the field-level encryption discussed in this article is implemented as an SMT on top of Kafka Connect's intermediary record format. The SMT's configuration-only approach is tied to Kafka Connect. Encrypting and decrypting fields in other Kafka APIs would need additional, custom effort.

    We hope to eliminate these limitations. Fairly soon, we plan to introduce FPE algorithms. Over a longer period of time, we would like to extend Kryptonite for Kafka so it does not require Kafka Connect. For instance, it would be helpful to support client-side field-level cryptography within stream processing applications written to use Kafka's Streams API or SQL abstractions.

    Future versions of Kryptonite for Kafka might also target applications written with the lower-level Kafka producer and consumer APIs. All the cryptography-related code could even be externalized, such as into a separate C library, so that client applications written in non-JVM languages might directly build upon it without the need to reimplement the same cryptography functions across different languages.

    Kryptonite for Kafka strengthens security in Kafka

    This two-part series has explained the need to go beyond the usual data-at-rest protection when building data pipelines on top of Apache Kafka. The Kryptonite for Kafka community project provides a configurable CipherField SMT to perform field-level encryption and decryption for Kafka Connect records on their way into and out of Kafka topics.

    Kryptonite for Kafka helps users safeguard their most precious data by keeping secret keys under their own control and performing all cryptography operations on the client side. Kafka brokers never get to see any of the plaintext of sensitive record fields.

    The examples in the previous article and this one applied the SMT to achieve client-side, end-to-end encryption and decryption across heterogeneous data sources and sinks by means of configuration only. The examples also illustrated the choice between encrypting complex fields in an opaque manner or one that shows their structure, and how to apply individual secret keys for different payload fields.

    Last updated: October 8, 2024
