The first article in this series explained the need for client-side, end-to-end encryption for data passing through Apache Kafka. The article also introduced the Kryptonite for Kafka project, which integrates with Apache Kafka Connect to achieve automatic encryption and decryption with no changes to application code.
This article, the second and last in this series, contains more advanced examples that cover a common scenario: File-based data integration between different kinds of storage. I'll show Kryptonite for Kafka features that support fine-grained, field-level encryption in this use case.
To keep things simple in the examples, a local file system serves as the source while the sink is set up in MinIO (Figure 1), an open source object storage service compatible with AWS S3.
As in the previous article, the aim is to secure selected fields of records, this time in JSON files that are transferred between the two storage systems using Kafka Connect. Along the way, the article introduces new features of the custom single message transform (SMT).
A complex data structure
The local file system holds a sample text file containing a bunch of JSON objects that store different kinds of personal data for fictional characters. The JSON objects are serialized line by line into this file. In contrast to the flat, tabular data in the previous article, the data in this article contains complex field types. One such JSON object, with its nested personal object and its knownResidences array, looks like this:
{
"guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
"personal": {
"firstname": "Judy",
"lastname": "Hayes",
"age": 38,
"gender": "female",
"height": 167,
"weight": 50,
"eyeColor": "brown"
},
"knownResidences": [
"529 Glenmore Avenue, Waumandee, Connecticut, 8220",
"489 Lake Street, Glasgow, Tennessee, 1469",
"927 Rutledge Street, Fresno, Pennsylvania, 5610",
"728 Debevoise Street, Gerton, Federated States Of Micronesia, 7007"
],
"isActive": true,
"profilePic": "https://picsum.photos/128",
"registered": "2018-08-12T01:58:44 -02:00"
}
/* ... */
To store all these JSON objects from the file into a Kafka topic, the example adopts a simple-but-good-enough approach, employing the FileStreamSourceConnector that ships with Apache Kafka Connect.
Suppose that both complex fields, personal (a field with nested objects) and knownResidences (an array), must be protected by encrypting their contents. I'll show two ways to process such complex fields:
- Encrypt each field as a whole.
- Individually encrypt each value of every nested field in the personal object and each individual array element in the knownResidences field.
Encryption as a whole with the OBJECT field mode
Let's first investigate the encryption of complex field types as a whole. For this, the source connector configuration could be something like this:
{
"name": "filesource-enc-001",
"config": {
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file":"/tmp/kafka-connect/data/sample_data.txt",
"topic":"personal-data-enc",
"transforms":"string2json,cipher",
"transforms.string2json.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.util.JsonStringReader$Value",
"transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"transforms.cipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
"transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.cipher.field_config": "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]",
"transforms.cipher.field_mode": "OBJECT"
}
}
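To deploy this source connector, you submit the configuration to the Kafka Connect worker's REST API. As a minimal sketch, assuming the worker's REST endpoint is reachable at http://localhost:8083 and the configuration above is saved as filesource-enc-001.json (both are assumptions for a local demo setup), a small Java client could post it like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class RegisterSourceConnector {
    public static void main(String[] args) throws Exception {
        // POST the connector configuration to the Connect worker's REST API
        // (endpoint URL and file name are assumptions for this sketch).
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("filesource-enc-001.json")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 201 Created indicates the connector was registered successfully.
        System.out.println(response.statusCode() + " " + response.body());
    }
}

Any plain HTTP client such as curl achieves the same thing; the call is simply a POST of the JSON document to the worker's /connectors endpoint.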
The SMT's configuration is defined by the transforms.cipher.* properties as follows:
- Operate on the records' values (type: "CipherField$Value").
- Encrypt data (cipher_mode: "ENCRYPT").
- Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
- Use a specific secret key based on its ID (cipher_data_key_identifier: "my-demo-secret-key-123").
- Process only selected fields, for which different keyIds can be provided. If no keyId is given for a specific field, use the general keyId defined in the cipher_data_key_identifier property (field_config: "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]").
- Encrypt complex fields (with nested objects or arrays) as a whole (field_mode: "OBJECT").
At runtime, all configured keyId values specified in the field_config settings need to be resolved. For the configuration just shown, each field is encrypted with either the general key (keyId=my-demo-secret-key-123) or a specially chosen one (keyId=my-demo-secret-key-987), both of which must be given in the external properties file named classified.properties:
cipher_data_keys=[{ "identifier": "my-demo-secret-key-123", "material": { "primaryKeyId": 1000000001, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GhDRulECKAC8/19NMXDjeCjK", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000001, "outputPrefixType": "TINK" } ] } } ,{ "identifier": "my-demo-secret-key-987", "material": { "primaryKeyId": 1000000002, "key": [ {"keyData":{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GiBIZWxsbyFXb3JsZEZVQ0sxYWJjZGprbCQxMjM0NTY3OA==", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000002, "outputPrefixType": "TINK" } ] } }]
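The key material itself follows Google Tink's JSON keyset format, which Kryptonite for Kafka builds on (note the Tink type URL and output prefix type in the entries above). If you want to generate fresh key material instead of reusing the demo keys, a minimal sketch with Tink's Java API might look as follows; the AES256_GCM template and the cleartext output are assumptions for a local demo, and the exact helper classes differ slightly between Tink versions:

import com.google.crypto.tink.CleartextKeysetHandle;
import com.google.crypto.tink.JsonKeysetWriter;
import com.google.crypto.tink.KeyTemplates;
import com.google.crypto.tink.KeysetHandle;
import com.google.crypto.tink.aead.AeadConfig;

public class GenerateKeyset {
    public static void main(String[] args) throws Exception {
        // Register Tink's AEAD primitives (AES-GCM among them).
        AeadConfig.register();

        // Generate a fresh AES-256-GCM keyset (the demo keys also use AesGcmKey).
        KeysetHandle handle = KeysetHandle.generateNew(KeyTemplates.get("AES256_GCM"));

        // Print the keyset as cleartext JSON. Writing keys in cleartext is
        // acceptable for a local demo only; protect real keys properly.
        CleartextKeysetHandle.write(handle, JsonKeysetWriter.withOutputStream(System.out));
    }
}

The printed JSON keyset would then be wrapped as the material value of an entry in cipher_data_keys, together with a freely chosen identifier.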
Running a connector instance with this configuration in place produces partially encrypted records such as this one in a Kafka topic called personal-data-enc:
{
"guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
"personal": "jwHqA4krnTk6CymRAVovFI+pJsUF0P3QfrRul0AWVMThdDgBBvlyVxs5mnN7Ma1bAUlZThimuxT3QhoXL+YxPhfByae2UiJSMFDDRXJJyWH9mBEIdt4I+82Jin49EdNo5/Cqbv9g2Qf028mm1KsPPTybJZF04gmLMaXntg2aEBo+EKSwgPCrTLm29a+R2dDFDDCxa7E=",
"knownResidences": "iQKmeFp7i7CJxPW0wc2XEajabWpr8lgDa0TrwQBZXADo1KfKs2Xjes/fM+bGTm7PEYw1v7J1io1Ab0Xyv40MYV1e/L2RXrJHDmkkQnMkdJ+MSZb9Gr5vd/8u6ndrrer6ucshPB/y1mloAlG1J48F4j7bQ1vrEXK9NjYRZEC5w0VPQrTPehMcyRD4CXt8+RGnL3uvhDuBEMCY7zIz31QwOXc5HSHyHuIUf0b4W74oCk1+gVvO0rT1ywgKKxeEJVKK0gLwz4+Hv5o0XEV6jWmyrnqvqJtp0dXJXH8oV9eEIBNeEp6nBL5X+zivMXtEpeYuoNY+sPYelpkltRJnTq1EfzwHOflkgtTjbgUMMLFrsQ==",
"isActive": true,
"profilePic": "https://picsum.photos/128",
"registered": "2018-08-12T01:58:44 -02:00"
}
The contents of the complex fields have been successfully encrypted. And because encryption was done with field_mode=OBJECT, the original structure of the personal and knownResidences fields is opaque. So in addition to protecting sensitive values, this encryption prevents a viewer from even seeing the shape of the information in these complex fields. For instance, there's no way to know:
- That personal is represented by a nested object, or what and how many fields it holds.
- That knownResidences is an array, or how many elements it contains.
Encryption with the ELEMENT field mode
In contrast to the OBJECT mode, the ELEMENT mode enables element-wise processing for complex field types. This mode acts upon the individual fields of nested objects and the individual elements of arrays. If you change the field_mode configuration property to ELEMENT, partially encrypted records look like this:
{
"guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
"personal": {
"firstname": "JkEEW6yDzw4iOkiQ3IAeir1hX6Z9gt1pi/1thQJngcH7ORgWAm4MMLFrsQ==",
"lastname": "J2rv4Z1FTcRUByA/67QQ/im4Gt4YqlKuv5D+m9POUcPUDlpqCUDLDDCxa7E=",
"age": "IxqTxrGKcBFs3M6+i7S863zywNAniwalRkyd6fI7ENuWmLgMMLFrsQ==",
"gender": "KCIFdVYWHkmYxJRjNKpulGzy2hVapoSfzPVnhxgHISz031dG7dKuswwwsWux",
"height": "JAMFXn+hc+8BltkwZeHGza5mcbIW5NK/cuSAHnAPcz6IycrsDDCxa7E=",
"weight": "I8vbJQjCBbYhh2nOLOxRcF/Ti9n8g/d4cFQlItbnETcpZtAMMLFrsQ==",
"eyeColor": "J7HqSejwGkgJbqFZw0xXPCX9VqDaGubKgp2+1Esgfcm+c6Xt3TW4DDCxa7E="
},
"knownResidences": [
"VNdFbsuo/3duROXZyGbBnfmzw7HH/lHGbY4x3aDhXyfunQVGZZKQjgTDLA8DrRAHxIRH6MYxPowz+hGUM5S4Dygndss4fp6Mgj6uhJKMgsOVhBt8DDCxa7E=",
"TIfIG3IumUccz4GhgZ5F6QbzHockvsoq9vH+BQVh9ttN/uYJDFYPOd1fLbhZutJKhwm8SJsMujz2L/AnAr7PenSReFsGrhfWwI2kBgwwsWux",
"UpUiXKBsXWLPJ6Kl6mNbfzPBHVhXqryzzozCOh78kQm7u7GwtHwqrBoWBEEqhhe/M1HVBMqwjt+jJDZUdmfu7LqaME1Z2dPLhs3sd8vTKsJE9gwwsWux",
"ZhwsRs5tNAId/mHL6hLwFDOEZ5+C0DBfsvpTVHuMu/c4fUNRUR3YAIXKsTBJF833QFVklsVffyf+WlDhGd53IoI0c1U5YSmm7kJcBBrocewnXpRfOOlWDNlNCzKuW9HJoi+Nbah0DDCxa7E="
],
"isActive": true,
"profilePic": "https://picsum.photos/128",
"registered": "2018-08-12T01:58:44 -02:00"
}
Again, all the sensitive data parts have been encrypted. This time, however, the structural information is not hidden. For instance, viewers can see that the knownResidences field contains an array of four elements.
Having access to such structural information may be beneficial in certain scenarios. Imagine a target system that receives all partially encrypted records from a sink connector that has no secret key access and thus cannot decrypt the data. This target system might still be able to work with the encrypted data to calculate some simple statistics, such as the minimum, maximum, and average number of known residences across all data records, as the sketch below illustrates.
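As a minimal sketch of that idea, assuming the target system reads the partially encrypted records as plain JSON strings (for example, from the topic or from the stored bucket objects), it could derive those statistics with Jackson without ever touching a secret key; the class name and the inlined sample record are purely illustrative:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.IntSummaryStatistics;
import java.util.List;

public class ResidenceStats {
    public static void main(String[] args) throws Exception {
        // Partially encrypted records (ELEMENT mode): knownResidences is still an
        // array, only its elements are ciphertext, so its length remains visible.
        // The strings below are shortened placeholder ciphertexts.
        List<String> encryptedRecords = List.of(
                "{\"knownResidences\":[\"VNdF...\",\"TIfI...\",\"UpUi...\",\"Zhws...\"]}"
        );

        ObjectMapper mapper = new ObjectMapper();
        IntSummaryStatistics stats = new IntSummaryStatistics();
        for (String json : encryptedRecords) {
            JsonNode record = mapper.readTree(json);
            stats.accept(record.get("knownResidences").size());
        }

        System.out.printf("min=%d max=%d avg=%.2f%n",
                stats.getMin(), stats.getMax(), stats.getAverage());
    }
}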
Encryption with individual secret keys per field
We have seen that you can specify an individual keyId for selected fields. In our example, personal and knownResidences have been encrypted with different keys. The primary benefit of doing so is that the same Kafka topic data can be used to feed different sink connectors while enforcing any combination of field-level access policies. The following cases can be implicitly differentiated with such a setup:
- Sink connectors that are allowed to decrypt both fields, personal and knownResidences (access to both secret keys must be granted)
- Sink connectors that are allowed to decrypt just one of the encrypted fields, either personal or knownResidences (access to one of the corresponding secret keys must be granted)
- Sink connectors that are not allowed to decrypt either of the fields (no access to the secret keys is granted)
Without individual keys, achieving the same flexible level of field-access policies would be hard. For instance, you could take care to store only a specific subset of the fields into separate Kafka topics, depending on what each sink connector and target system is allowed to see. You would then have to define individual role-based access control settings for these Kafka topics, so that you can grant specific sink connectors access to specific topics. However, this workaround results in a lot of unnecessary redundancy in storage.
The more payload fields that need to be individually encrypted and decrypted to support various field-level access policies per sink connector and target system, the more unusable this workaround becomes. Supporting every possible combination of permitted fields would, in the worst case, require one topic per field subset, which for n protected fields means up to 2^n topics. That combinatorial explosion of separate Kafka topics is simply impractical.
Decryption according to field mode with individual keys
Irrespective of the chosen field mode (OBJECT or ELEMENT) or the granularity of key settings, you can decrypt all fields and store the records in the desired target system by creating a sink connector configuration that matches the encrypting configuration.
In this particular example, all decrypted records should ultimately be written to the S3-compatible MinIO object storage. A properly configured Camel-MinIO-Kafka-Connector serves this purpose well:
{
"name": "minio-s3-sink-dec-001",
"config": {
"connector.class":"org.apache.camel.kafkaconnector.minio.CamelMinioSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"topics":"personal-data-enc-<objects | elements>",
"camel.sink.path.bucketName":"kafka-connect-kryptonite-<objects | elements>",
"camel.sink.endpoint.endpoint":"http://minio:9000",
"camel.sink.endpoint.autoCreateBucket":true,
"camel.sink.endpoint.keyName":"${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.json",
"transforms": "decipher,json2string",
"transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.decipher.cipher_mode": "DECRYPT",
"transforms.decipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
"transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.decipher.field_config": "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]",
"transforms.decipher.field_mode": "<OBJECT | ELEMENT>",
"transforms.json2string.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.util.JsonStringWriter$Value"
}
}
The SMT's configuration is defined by the transforms.decipher.* properties as follows:
- Operate on the records' values (type: "CipherField$Value").
- Decrypt data (cipher_mode: "DECRYPT").
- Load the secret key material from an external key file (cipher_data_keys: "${file:/secrets/classified.properties:cipher_data_keys}").
- Use a specific secret key based on its ID (cipher_data_key_identifier: "my-demo-secret-key-123").
- Process only selected payload fields, for which different keyIds are supported. If no keyId is given for a specific field, the general keyId defined in the cipher_data_key_identifier property is used (field_config: "[{\"name\":\"personal\"},{\"name\":\"knownResidences\",\"keyId\":\"my-demo-secret-key-987\"}]").
- Decrypt complex fields (with nested objects or arrays) either as a whole or element by element, matching the mode used during encryption (field_mode: "<OBJECT | ELEMENT>").
With this configuration in place, the custom SMT preprocesses and decrypts all records before handing the original plaintext records to the Camel MinIO sink connector for storage in the specified MinIO bucket, named kafka-connect-kryptonite-objects or kafka-connect-kryptonite-elements. Inspecting one of the bucket objects holding the contents of the example record used earlier shows the successfully decrypted data:
{
"profilePic": "https://picsum.photos/128",
"guid": "837abb22-3e56-426b-8748-90d2ce4b1e5c",
"registered": "2018-08-12T01:58:44 -02:00",
"personal": {
"firstname": "Judy",
"gender": "female",
"eyeColor": "brown",
"weight": 50,
"age": 38,
"lastname": "Hayes",
"height": 167
},
"isActive": true,
"knownResidences": [
"529 Glenmore Avenue, Waumandee, Connecticut, 8220",
"489 Lake Street, Glasgow, Tennessee, 1469",
"927 Rutledge Street, Fresno, Pennsylvania, 5610",
"728 Debevoise Street, Gerton, Federated States Of Micronesia, 7007"
]
}
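If you want to verify the results programmatically rather than through the MinIO console, the MinIO Java SDK can list and read the bucket objects. The following is only a rough sketch; the endpoint, the default minioadmin credentials, and the bucket name are assumptions that depend on how the demo environment is set up:

import io.minio.GetObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;

public class InspectBucket {
    public static void main(String[] args) throws Exception {
        // Connection details are assumptions for a local demo setup.
        MinioClient minio = MinioClient.builder()
                .endpoint("http://localhost:9000")
                .credentials("minioadmin", "minioadmin")
                .build();

        String bucket = "kafka-connect-kryptonite-elements";
        for (Result<Item> result : minio.listObjects(
                ListObjectsArgs.builder().bucket(bucket).build())) {
            String objectName = result.get().objectName();
            // Read each stored JSON object and print its (decrypted) content.
            try (var stream = minio.getObject(
                    GetObjectArgs.builder().bucket(bucket).object(objectName).build())) {
                System.out.println(objectName + ": " + new String(stream.readAllBytes()));
            }
        }
    }
}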
This concludes our overview of end-to-end data encryption for file-based flows across different kinds of storage systems. A fully working example of this file-based integration scenario can be found in the accompanying demo scenario repository.
Limitations of Kryptonite for Kafka and upcoming enhancements
Currently, the custom CipherField SMT has the following limitations, which we hope to overcome in the near future:
- Instead of assigning the Base64-encoded ciphertext to a field, it could be desirable to preserve the original data format of addressed fields. In cryptography, this is known as format-preserving encryption (FPE). An illustrative example is a credit card, which is identified by a 16-digit number. Applying FPE would result in a different 16-digit number as the ciphertext. Other examples include phone numbers, social security numbers, and the like, for which the encryption is usually based on permutations over finite sets of numeric, alphabetic, or alphanumeric characters.
- At the moment, the field-level encryption discussed in this article is implemented as an SMT on top of Kafka Connect's intermediary record format. The SMT's configuration-only approach is tied to Kafka Connect. Encrypting and decrypting fields in other Kafka APIs would need additional, custom effort.
We hope to eliminate these limitations. Fairly soon, we plan to introduce FPE algorithms. Over a longer period of time, we would like to extend Kryptonite for Kafka so it does not require Kafka Connect. For instance, it would be helpful to support client-side field-level cryptography within stream processing applications written to use Kafka's Streams API or SQL abstractions.
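Kryptonite for Kafka does not provide such a Streams integration yet. Purely to illustrate what field-level encryption inside a stream processing application involves today, here is a rough, hand-rolled sketch that encrypts one field with Tink directly in a Kafka Streams topology; the topic names, the keyset path, and the choice to encrypt the personal field as a whole are all assumptions, not Kryptonite functionality:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.crypto.tink.Aead;
import com.google.crypto.tink.CleartextKeysetHandle;
import com.google.crypto.tink.JsonKeysetReader;
import com.google.crypto.tink.KeysetHandle;
import com.google.crypto.tink.aead.AeadConfig;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class HandRolledFieldEncryption {
    public static void main(String[] args) throws Exception {
        // Load a Tink AEAD keyset; every client application has to repeat this plumbing.
        AeadConfig.register();
        KeysetHandle handle = CleartextKeysetHandle.read(
                JsonKeysetReader.withString(Files.readString(Paths.get("/secrets/keyset.json"))));
        Aead aead = handle.getPrimitive(Aead.class);
        ObjectMapper mapper = new ObjectMapper();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> records = builder.stream("personal-data");
        records.mapValues(json -> {
            try {
                ObjectNode node = (ObjectNode) mapper.readTree(json);
                // Replace the personal sub-document with Base64-encoded ciphertext.
                byte[] ciphertext = aead.encrypt(node.get("personal").toString().getBytes(), new byte[0]);
                node.put("personal", Base64.getEncoder().encodeToString(ciphertext));
                return mapper.writeValueAsString(node);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).to("personal-data-enc");
        // builder.build() would then be passed to a KafkaStreams instance with the usual properties.
    }
}

The point of the planned enhancements is precisely to remove the need for this kind of per-application boilerplate.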
Future versions of Kryptonite for Kafka might also target applications written with the lower-level Kafka producer and consumer APIs. All the cryptography-related code could even be externalized, such as into a separate C library, so that client applications written in non-JVM languages might directly build upon it without the need to reimplement the same cryptography functions across different languages.
Kryptonite for Kafka strengthens security in Kafka
This two-part series has explained the need to go beyond the usual data-at-rest protection when building data pipelines on top of Apache Kafka. The Kryptonite for Kafka community project provides a configurable CipherField SMT to perform field-level encryption and decryption for Kafka Connect records on their way into and out of Kafka topics.
Kryptonite for Kafka helps users safeguard their most precious data by keeping secret keys under their own control and performing all cryptography operations on the client side. Kafka brokers never get to see any of the plaintext of sensitive record fields.
The examples in the previous article and this one applied the SMT to achieve client-side, end-to-end encryption and decryption across heterogeneous data sources and sinks by means of configuration only. The examples also illustrated the choice between encrypting complex fields in an opaque manner or one that shows their structure, and how to apply individual secret keys for different payload fields.