Debezium connectors are easily deployable on Red Hat OpenShift as Kafka Connect custom resources managed by Red Hat AMQ Streams. However, in the past, developers had to create their own images to deploy using those custom resources. The Red Hat Integration 2021.Q4 release makes this easier: AMQ Streams can now build the container image for you from the custom resource itself.

This article shows you how to configure the resource to improve your container build process and describes the new features for the Debezium component as part of the latest release.

What is Kafka Connect?

Kafka Connect is a streaming data tool that lets you integrate Apache Kafka with external databases, storage, and messaging systems. It provides a framework for moving large amounts of data into and out of Kafka clusters while maintaining scalability and reliability. You can also use Kafka Connect to build connector plug-ins for your Kafka cluster and run connectors.

What is Debezium?

Debezium is a set of distributed services that capture changes in your databases. Your applications can consume and respond to those changes. Debezium captures each row-level change in each database table in a change event record and streams these records to Apache Kafka topics. Applications read these streams, which provide the change event records in the same order in which they were generated.

Change data capture

Change data capture is handy for situations such as:

  • Data replication to other databases in order to feed data to other teams, or as streams for analytics, data lakes, or data warehouses.
  • Data exchange between microservices to propagate the data between different services without coupling, for keeping optimized views locally, or for monolith-to-microservices evolutions.
  • Auditing.
  • Cache invalidation.
  • Indexing for full-text search.
  • Updating read models for Command and Query Responsibility Segregation (CQRS).

And much more. Debezium provides connectors to monitor the following databases:

  • MySQL
  • PostgreSQL
  • MongoDB
  • SQL Server
  • Db2
  • Oracle

For an overview of new features in the latest release, see What’s new in Debezium 1.6.

Options for building the Kafka Connect image

The latest AMQ Streams release deprecates the former option of using Kafka Connect with Source-to-Image (S2I) to create a Debezium image. With the introduction of the build configuration to the KafkaConnect resource, AMQ Streams can now automatically build a container image with the connector plugins required for your data connections. If you use OpenShift, the image is constructed by OpenShift builds.

If you need to, you can instead manually create your container image based on the AMQ Streams image for Apache Kafka. Creating a container image manually gives you complete control and flexibility during the build.

However, when all you need is to add some plugins to the base image, the new build process from AMQ Streams makes it easier to bootstrap an application. AMQ Streams spins up a build pod that builds the image based on the configuration from the custom resource. The final image is then pushed to the container registry or image stream that you specify. The Kafka Connect cluster defined by that custom resource then runs on the newly built image.
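As a rough illustration of the registry option mentioned above, the build output in a KafkaConnect resource might look like the following fragment. This is a minimal sketch: the image name and the Secret name are hypothetical placeholders, and the Secret is assumed to hold credentials for pushing to your registry.

```yaml
# Fragment of a KafkaConnect resource; only the build output is shown.
spec:
  build:
    output:
      # Push the built image to a container registry instead of an image stream
      type: docker
      # Hypothetical image name; replace with your own registry and repository
      image: quay.io/my-organization/kafka-connect-dbz-mysql:latest
      # Hypothetical Secret that contains the registry push credentials
      pushSecret: my-registry-credentials
```

The example later in this article uses the imagestream output type instead, which keeps the image inside the OpenShift cluster.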

Build a Debezium Kafka Connect image with a custom resource

Now that you have some background on how the new AMQ Streams build mechanism works, let's go through an example of how to create a Kafka Connect cluster with the Debezium connector for MySQL.

First, you need the AMQ Streams operator installed and a Kafka cluster up and running.
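If you do not have a cluster yet, a Kafka custom resource along these lines could serve as a starting point. It is only a sketch under assumptions: the name my-cluster is chosen so that the bootstrap address used later (my-cluster-kafka-bootstrap:9092) resolves, three brokers are used so that a replication factor of 3 can be satisfied, and ephemeral storage is suitable for experiments only.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  # Assumed name; it determines the bootstrap service name my-cluster-kafka-bootstrap
  name: my-cluster
spec:
  kafka:
    version: 2.8.0
    # Three brokers, so internal topics can use a replication factor of 3
    replicas: 3
    listeners:
      # Plain (non-TLS) internal listener on port 9092
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      # Ephemeral storage loses data on restart; use persistent storage beyond a demo
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
```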

Because we are deploying our Kafka Connect cluster on OpenShift, we will be using the imagestream build type. For that build type, you need to create the ImageStream to be used by the build:

apiVersion: image.openshift.io/v1
kind: ImageStream
metadata:
  name: kafka-connect-dbz-mysql
spec:
  lookupPolicy:
    local: false

Now create a KafkaConnect resource with a configuration similar to the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-mysql-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
  build:
    output:
      type: imagestream
      image: kafka-connect-dbz-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.zip
  config:
    status.storage.topic: debezium-mysql-cluster-status
    status.storage.replication.factor: 3
    offset.storage.topic: debezium-mysql-cluster-offsets
    value.converter: org.apache.kafka.connect.json.JsonConverter
    group.id: debezium-mysql-cluster
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    config.storage.topic: debezium-mysql-cluster-configs
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    offset.storage.replication.factor: 3
  replicas: 1
  version: 2.8.0

You can change the URL of the artifact to be downloaded, depending on the source for your connector. You can add more plugins if you plan to reuse the cluster for other connectors. You can also set the sha512sum property on an artifact so that the build validates the downloaded file.
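The following fragment sketches both ideas: a checksum on the MySQL artifact and a second plugin in the same image. The checksum value is a placeholder that you must replace with the real SHA-512 of the file, and the PostgreSQL plugin is an assumed example of an additional connector, not part of the original configuration.

```yaml
# Fragment of the KafkaConnect resource; only the plugins list is shown.
spec:
  build:
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.zip
            # Placeholder: replace with the actual SHA-512 checksum of the zip file
            sha512sum: <sha512-checksum-of-the-zip-file>
      # Assumed second plugin, added so the same cluster can also run PostgreSQL connectors
      - name: debezium-postgres-connector
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.zip
```

If the checksum does not match the downloaded file, the build should fail rather than produce an image with an unverified artifact.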

AMQ Streams spins up a Kafka Connect build pod that builds the image. When the build finishes, a regular Kafka Connect pod starts from the new image. The build takes a few minutes. When it is done, the status of the custom resource lists the connector plugins included in the image:

...
status:
  conditions:
    - lastTransitionTime: '2021-11-24T19:21:49.049109Z'
      status: 'True'
      type: Ready
  connectorPlugins:
    - class: io.debezium.connector.mysql.MySqlConnector
      type: source
      version: 1.6.0.Final
    - class: org.apache.kafka.connect.file.FileStreamSinkConnector
      type: sink
      version: 2.8.0.redhat-00002
    - class: org.apache.kafka.connect.file.FileStreamSourceConnector
      type: source
      version: 2.8.0.redhat-00002
    - class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      type: source
      version: '1'
    - class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      type: source
      version: '1'
    - class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      type: source
      version: '1'
  labelSelector: >-
    strimzi.io/cluster=debezium-mysql-connect-cluster,strimzi.io/name=debezium-mysql-connect-cluster-connect,strimzi.io/kind=KafkaConnect
  observedGeneration: 1
  replicas: 1
  url: 'http://debezium-mysql-connect-cluster-connect-api.debezium.svc:8083'

You can now use this cluster to configure your Debezium connector with your data access configuration and the tables to start capturing.

In the example configuration, the strimzi.io/use-connector-resources annotation enables connectors managed by AMQ Streams. Therefore, you need to create a KafkaConnector custom resource to create or update your connector. If you remove the annotation, you can instead use the Kafka Connect REST API or a command-line tool such as kcctl to interact with the cluster.
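A KafkaConnector resource for the MySQL connector might look like the following sketch. The label ties it to the Kafka Connect cluster created earlier. Everything under config is an assumption for illustration: the host name, credentials, server ID, server name, database, and table names are hypothetical and must match your own MySQL instance.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  # Hypothetical connector name
  name: inventory-connector
  labels:
    # Must match the name of the KafkaConnect resource
    strimzi.io/cluster: debezium-mysql-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    # Hypothetical connection details for the source database
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    # Unique numeric ID for this connector acting as a MySQL replication client
    database.server.id: 184054
    # Logical name used as the prefix of the Kafka topics
    database.server.name: dbserver1
    # Hypothetical database and table to capture
    database.include.list: inventory
    table.include.list: inventory.customers
    # Kafka topic where the connector stores the database schema history
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
```

The plain-text password is for illustration only; in a real deployment you would typically load credentials from a Secret.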

What’s new in Debezium 1.6

The 1.6 release offers great news in the areas of Oracle integration, incremental snapshotting, security, and—the topic of this article—a new build mechanism for AMQ Streams.

The highly awaited Oracle connector keeps moving forward and has reached the Technical Preview (TP) stage. This release moves it closer to general availability (GA) thanks to the feedback provided by users of the previous versions. With this connector, developers can exploit the power of Debezium to distribute data stored in Oracle databases. You should encounter fewer issues, and improved documentation is available in Red Hat's customer portal.

The next significant improvement is support for incremental snapshotting. This feature addresses several requirements around snapshotting that came up repeatedly:

  • Resume an ongoing snapshot after a connector restart.
  • Resnapshot selected tables during streaming, e.g., to re-bootstrap Kafka topics with change events for specific tables.
  • Snapshot any tables newly added to the list of captured tables after changing the filter configuration.
  • Begin to stream changes while an initial snapshot is running.

Incremental snapshotting in Debezium is available in the form of ad-hoc snapshots. The user does not configure the connector to execute the snapshot but instead uses the signaling mechanism to send a snapshot signal and thus trigger a snapshot of a set of tables. You can read more details about the implementation in Jiri Pechanec’s excellent blog post Incremental Snapshots in Debezium.
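To make the signaling mechanism a little more concrete, the fragment below sketches the connector side of it. It assumes a signaling table named inventory.debezium_signal that you create yourself in the source database; the table and the captured table name are hypothetical.

```yaml
# Fragment of a KafkaConnector resource; only the signaling option is shown.
spec:
  config:
    # Assumed name of the signaling table that the connector watches for signals
    signal.data.collection: inventory.debezium_signal
    # To trigger an ad-hoc incremental snapshot, insert a row into that table with:
    #   id:   an arbitrary unique string
    #   type: execute-snapshot
    #   data: {"data-collections": ["inventory.customers"]}
```

The data field lists the tables to snapshot, so you can target a single table without touching the rest of the connector configuration.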

In an earlier article, How to secure Apache Kafka schemas with Red Hat Integration Service Registry 2.0, I described the improvements in the Service Registry 2.0 release for Red Hat Integration. Debezium is now tested with the new version of the registry, so you can upgrade with confidence to get all the latest features.

You can find updated documentation on builds in Red Hat’s customer portal, and the Debezium artifacts will be available in the Red Hat Maven repository for effortless access.

Summary

This article explained how to build a container image using the AMQ Streams custom resource for your Debezium connector. The build process is part of the latest Debezium 1.6 release.

Don’t forget to check the documentation and the Red Hat Developer program site for more on Debezium, Apache Kafka Connectors, and Red Hat Integration.

Last updated: December 8, 2021
