In this article, you will learn how to use Debezium with Apache Avro and Apicurio Registry to efficiently monitor change events in a MySQL database. We will set up and run a demonstration using Apache Avro rather than the default JSON converter for Debezium serialization. We will use Apache Avro with the Apicurio service registry to externalize Debezium’s event data schema and reduce the payload of captured events.

What is Debezium?

Debezium is a set of distributed services that captures row-level database changes so that applications can view and respond to them. Debezium connectors record all events to a Red Hat AMQ Streams Kafka cluster. Applications use AMQ Streams to consume change events.

Debezium is built on the Apache Kafka Connect framework, so its connectors run as Kafka Connect source connectors. They can be deployed and managed using the Kafka Connect custom Kubernetes resources provided by AMQ Streams.

Debezium supports the following database connectors:

  • MySQL connector
  • PostgreSQL connector
  • MongoDB connector
  • SQL Server connector

We’ll use the MySQL connector for our example.

Debezium serialization with Apache Avro

By default, Debezium uses a JSON converter to serialize record keys and values into JSON documents. With its default settings, the JSON converter embeds the message schema in every record, which makes each record quite verbose. Another option is to use Apache Avro to serialize and deserialize each record’s keys and values. If you want to use Apache Avro for serialization, you must also deploy a schema registry, which manages Avro’s message schemas and their versions.
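To get a feel for that overhead, the sketch below compares the size of one record with and without an embedded schema. Both JSON strings are hypothetical, heavily trimmed stand-ins for a real Debezium event (a real schema block is much larger), so treat the byte counts as illustrative only.

```shell
# Hypothetical, trimmed record as the JSON converter emits it with schemas
# enabled: a "schema" block followed by the "payload".
with_schema='{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"customers.Value"},"payload":{"id":1001,"email":"sally.thomas@acme.com"}}'

# The same hypothetical record reduced to its payload.
payload_only='{"id":1001,"email":"sally.thomas@acme.com"}'

# Count the bytes in each form (tr strips the padding some wc versions add).
size_with_schema=$(printf '%s' "$with_schema" | wc -c | tr -d ' ')
size_payload_only=$(printf '%s' "$payload_only" | wc -c | tr -d ' ')

echo "with schema:  $size_with_schema bytes"
echo "payload only: $size_payload_only bytes"
```

Avro goes one step further than dropping the schema block: the payload itself is written in a compact binary form, and the schema is looked up in the registry when needed.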

Apicurio Registry is an open source project that works with Avro. It provides an Avro converter along with an API and schema registry. You can specify the Avro converter in your Debezium connector configuration. The converter then maps Kafka Connect schemas to Avro schemas. It uses the Avro schemas to serialize record keys and values into Avro’s compact binary form.

The Apicurio API and schema registry track the Avro schemas used in Kafka topics. Apicurio also tracks where the Avro converter sends the generated Avro schemas.

Note: The Apicurio service registry is fully supported and generally available as part of Red Hat Integration.

Demonstration: Debezium serialization with Apache Avro and Apicurio

For this demonstration, we will run an example application that uses Avro for serialization and the Apicurio service registry to track Debezium events. To successfully run the example application, you will need the following tools installed and running in your development environment:

  • Docker: The most recent version, set up to run Linux container images.
  • kafkacat: A generic non-JVM producer and consumer for Kafka.
  • jq: A command-line utility for JSON processing.

Step 1: Start the services

Once you have the required tools installed in your development environment, we can begin the demonstration by cloning the debezium-examples repository and starting the required service components:

  1. Clone the repository:
    $ git clone https://github.com/hguerrero/debezium-examples.git
  2. Change to the following directory:
    $ cd debezium-examples/debezium-registry-avro
  3. Start the environment:
    $ docker-compose up -d

The last command starts the following components:

  • A single-node Zookeeper and Kafka cluster
  • A single-node Kafka Connect cluster
  • An Apicurio service registry instance
  • A MySQL database that is ready for change data capture
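The containers can take a little while to become ready after docker-compose returns. As a minimal sketch, the helper below polls the Kafka Connect REST API (port 8083 in this environment) until it answers. The function name, the retry count, and the two-second pause are assumptions made for illustration, not part of the example repository.

```shell
# wait_for_connect [URL] [ATTEMPTS]
# Hypothetical helper: poll the Kafka Connect REST API until it responds.
wait_for_connect() {
  url="${1:-http://localhost:8083}"
  attempts="${2:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -f makes curl return non-zero on HTTP errors, -s hides the progress bar.
    if curl -fsS -o /dev/null "$url/connectors"; then
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  echo "Kafka Connect did not respond at $url" >&2
  return 1
}
```

Calling `wait_for_connect` with no arguments before moving on to the next steps avoids posting the connector configuration to a service that is still starting.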

Step 2: Configure the Debezium connector

Next, we configure the Debezium connector. The configuration file instructs the Debezium connector to use Avro for serialization and deserialization. It also specifies the location of the Apicurio registry.

The container image used in this environment includes all of the required libraries to access the connectors and converters. The following lines set the key and value converters and their respective registry configurations:

        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://registry:8080/api",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.as-confluent": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://registry:8080/api",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "value.converter.apicurio.registry.as-confluent": "true"

Note that the as-confluent setting turns on Apicurio’s compatibility mode, which lets us use tooling from another provider (kafkacat, in this example) to deserialize records with the schemas stored in the Apicurio service registry.
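For context, here is a sketch of how those converter lines might sit inside a complete connector request. Only the converter settings come from this article. The connector name, the MySQL host, the credentials, the server ID, the Kafka address, and the history topic are assumptions for illustration, and the file name is hypothetical so that it does not overwrite the file shipped in the repository. The property names follow the Debezium 1.x MySQL connector; newer Debezium versions rename some of them.

```shell
# Write a sketch of a full connector request to a scratch file.
# All values outside the converter settings are assumed, not taken
# from the example repository.
cat > connector-avro-sketch.json <<'EOF'
{
  "name": "inventory-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "avro",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://registry:8080/api",
    "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "key.converter.apicurio.registry.as-confluent": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://registry:8080/api",
    "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "value.converter.apicurio.registry.as-confluent": "true"
  }
}
EOF
```

The logical server name `avro` is the one value here that can be read off the article itself: Debezium names topics `<server>.<database>.<table>`, which matches the `avro.inventory.customers` topic used in the later steps.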

Step 3: Create the connector

Next, we create the Debezium connector to start capturing changes in the database. We’ll use the Kafka Connect cluster REST API to create the Debezium connector:

$ curl -X POST http://localhost:8083/connectors -H 'content-type:application/json' -d @dbz-mysql-connector-avro.json
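To confirm that the connector was accepted, you can query the same REST API. The sketch below wraps two standard Kafka Connect endpoints in small helper functions. The function names and the `CONNECT_URL` variable are hypothetical, and the connector name must be whatever the `name` field in the posted JSON file says.

```shell
# Base URL of the Kafka Connect REST API (port taken from the curl call above).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# List the names of all connectors registered with Kafka Connect.
list_connectors() {
  curl -s "$CONNECT_URL/connectors"
}

# Show the state of one connector and its tasks.
# $1 is the connector name from the "name" field of the posted JSON file.
connector_status() {
  curl -s "$CONNECT_URL/connectors/$1/status"
}
```

Run `list_connectors | jq .` to find the connector name, then `connector_status <name> | jq .`. A healthy connector should report the state RUNNING for the connector and for its task.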

Step 4: Check the data

We have created and started the connector. At this point, Debezium has captured the initial data in the database and sent it to Kafka as events. Use the following kafkacat command to review the data:

$ kafkacat -b localhost:9092 -t avro.inventory.customers -e

Step 5: Deserialize the record

When you review the data, you will notice that the information returned is not human-readable. This is expected, because the records are serialized in Avro’s binary form. To get a readable version of the data, we can instruct kafkacat to query the schema from the Apicurio service registry and use it to deserialize the records. Run the following command with the registry configuration:

$ kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e

To pretty-print the output, pipe the same command through the jq JSON utility:

$ kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e | jq .

You can see that the Kafka record information contains only the payload without the Debezium schema’s overhead. That overhead is now externalized in the Apicurio registry.
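To see how a consumer knows which schema to fetch, you can inspect the first bytes of a raw record. The sketch below assumes that, with as-confluent set to true, each serialized key and value starts with one zero "magic" byte followed by a 4-byte big-endian schema ID. The function name is hypothetical, and the byte layout should be treated as an assumption to verify against your registry version.

```shell
# schema_id_from_header
# Hypothetical helper: read a raw record from stdin and print the schema ID
# stored in its 5-byte header (1 magic byte + 4-byte big-endian ID).
schema_id_from_header() {
  # Turn the first five bytes into five unsigned decimal numbers.
  set -- $(head -c 5 | od -An -v -t u1)
  [ "$#" -eq 5 ] || { echo "need at least 5 bytes" >&2; return 1; }
  [ "$1" -eq 0 ] || { echo "unexpected magic byte: $1" >&2; return 1; }
  # Combine the four ID bytes, most significant byte first.
  echo $(( ($2 << 24) | ($3 << 16) | ($4 << 8) | $5 ))
}
```

For example, `kafkacat -b localhost:9092 -t avro.inventory.customers -C -c 1 -f '%s' | schema_id_from_header` reads one raw record value and prints its ID. The record carries only this small identifier, and the full schema stays in the registry.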

Step 6: Check the Apicurio schema registry

Finally, if you want to view a list of all of the schema artifacts for this example, you can check the Apicurio schema registry at http://localhost:8081/, shown in Figure 1.

The Apicurio registry lists six schema artifacts for this example.

Figure 1: Schema artifacts in the Apicurio schema registry.
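If you prefer the command line to the web console, the registry can also be queried over REST. The sketch below is a minimal example under two assumptions: that the registry's own API lists artifact IDs at `/api/artifacts`, and that the Confluent-compatible API under `/api/ccompat` (the endpoint kafkacat uses in Step 5) lists subjects at `/subjects`. The function names and the `REGISTRY_URL` variable are hypothetical.

```shell
# Base URL of the registry API as seen from the host (port from this step).
REGISTRY_URL="${REGISTRY_URL:-http://localhost:8081/api}"

# List artifact IDs through the registry's own REST API (assumed path).
list_artifacts() {
  curl -s "$REGISTRY_URL/artifacts"
}

# List subjects through the Confluent-compatible API (assumed path).
list_subjects() {
  curl -s "$REGISTRY_URL/ccompat/subjects"
}
```

Running `list_artifacts | jq .` should return the same artifacts that the console shows in Figure 1.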

Get started with Debezium and Kafka Connect

You can download the Red Hat Integration Debezium connectors from the Red Hat Developer portal. You can also check out Gunnar Morling’s webinar on Debezium and Kafka (February 2019) from the DevNation Tech Talks series. Also, see his more recent Kafka and Debezium presentation at QCon (January 2020).

Conclusion

Although Debezium makes it easy to capture database changes and record them in Kafka, developers still have to decide how to serialize the change events in Kafka. Debezium lets you choose the key and value converters from several options. Using the Avro converter with the Apicurio service registry, you can store externalized versions of the schema and minimize the payload you have to propagate.

Debezium Apache Kafka connectors are available from Red Hat Integration. Red Hat Integration is a comprehensive set of integration and messaging technologies that connect applications and data across hybrid infrastructures.

Last updated: March 18, 2024