Build a simple cloud-native change data capture pipeline

Change data capture (CDC) is a well-established software design pattern for a system that monitors and captures data changes so that other software can respond to those events. Using KafkaConnect, along with Debezium Connectors and the Apache Camel Kafka Connector, we can build a configuration-driven data pipeline to bridge traditional data stores and new event-driven architectures.

This article walks through a simple example.

Why use change data capture?

The advantages of CDC compared to a simple poll-based or query-based process are:

  • All changes are captured: Intermediate changes (updates and deletes) that happen between two runs of a poll loop would otherwise be missed.
  • Low overhead: Near real-time reaction to data changes avoids increased CPU load due to frequent polling.
  • No data model impact: Timestamp columns are no longer needed to determine the last data update.

The example

In this example, we build a simple cloud-native CDC pipeline from scratch. The goal is to send every change from a simple customers table to a message queue for further processing. Once the infrastructure is provisioned, we will implement the data pipeline using configuration files, without writing any code. This is the high-level overview of the architecture:

MySQL --> KafkaConnect [Worker0 JVM (TaskA0, TaskB0, TaskB1), ...] --> AMQ
               |
               Kafka (offsets, config, status)

Even though this is a simple use case, it includes the deployment and setup of several integration products on top of OpenShift 4: AMQ 7.7 (message broker), AMQ Streams 1.5 (Kafka and KafkaConnect), and Debezium (CDC engine). This infrastructure pays off when you expand the example into a group of microservices communicating with each other by sending events. The nice thing about using Debezium is that you don't need to change your application's logic.

You can find all of the required configuration files here.

Note: For the sake of simplicity, we use unsecured components that are not suitable for production. For any serious use, you would want to add TLS encryption and increase the allocated resources.

Setting up

To provision the infrastructure for our CDC pipeline, we have to set up the OpenShift project, MySQL source system, AMQ sink system, AMQ Streams, and the Debezium connector. You can also use Red Hat CodeReady Containers (CRC), but make sure to reserve eight cores and at least 14 GB of memory.

We will do all of this from the command line, so open your favorite shell. No prompt is added so that you can easily copy and paste the code you find here. First, set your environment variables:

API_ENDPOINT="https://api.crc.testing:6443"
ADMIN_NAME="kubeadmin"
ADMIN_PASS="7z6T5-qmTth-oxaoD-p3xQF"
USER_NAME="developer"
USER_PASS="developer"
PROJECT_NAME="cdc"

Next, create a new project:

TMP="/tmp/ocp" && rm -rf $TMP && mkdir -p $TMP
oc login -u $ADMIN_NAME -p $ADMIN_PASS $API_ENDPOINT
oc new-project $PROJECT_NAME
oc adm policy add-role-to-user admin $USER_NAME

Then set up Red Hat registry authentication (use your own credentials here):

REG_SECRET="registry-secret"
oc create secret docker-registry $REG_SECRET \
    --docker-server="registry.redhat.io" \
    --docker-username="my-user" \
    --docker-password="my-pass"
oc secrets link default $REG_SECRET --for=pull
oc secrets link builder $REG_SECRET --for=pull
oc secrets link deployer $REG_SECRET --for=pull

MySQL setup (source system)

Our source system is MySQL. Here, we use a custom DeploymentConfig that contains a post lifecycle hook to initialize our database and enable binary log access for the Debezium user. To start, create your ConfigMap and your secret:

oc create configmap db-config --from-file=./mysql/my.cnf
oc create configmap db-init --from-file=./mysql/initdb.sql
oc create secret generic db-creds \
    --from-literal=database-name=cdcdb \
    --from-literal=database-user=cdcadmin \
    --from-literal=database-password=cdcadmin \
    --from-literal=database-admin-password=cdcadmin
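
Creating db-config from my.cnf is equivalent to applying a ConfigMap like the one sketched below. The part that matters for Debezium is a unique server ID and row-based binary logging; the exact values here are illustrative, not necessarily those in the repository file:

apiVersion: v1
kind: ConfigMap
metadata:
  name: db-config
data:
  my.cnf: |
    [mysqld]
    # Debezium requires row-based binary logging and a unique server ID
    server-id        = 223344
    log_bin          = mysql-bin
    binlog_format    = ROW
    binlog_row_image = FULL
    expire_logs_days = 10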

Next, deploy your resources:

oc create -f ./mysql/my-mysql.yaml
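
The interesting part of my-mysql.yaml is the post deployment lifecycle hook, which runs initdb.sql (creating the customers table and granting the Debezium user binlog access) in a throwaway pod after each rollout. A rough sketch of that fragment follows; the command, paths, and volume names are illustrative, so refer to the repository for the real file:

apiVersion: apps.openshift.io/v1
kind: DeploymentConfig
metadata:
  name: my-mysql
spec:
  strategy:
    type: Recreate
    recreateParams:
      post:
        failurePolicy: Abort
        execNewPod:
          containerName: mysql
          # run the init script against the freshly started database (illustrative command and path)
          command:
            - /bin/sh
            - -c
            - MYSQL_PWD="$MYSQL_ROOT_PASSWORD" $MYSQL_PREFIX/bin/mysql -h my-mysql -u root < /tmp/init/initdb.sql
          volumes:
            - db-init
  # ...the pod template mounts the db-config and db-init ConfigMaps and reads credentials from db-creds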

Check the status:

MYSQL_POD=$(oc get pods | grep my-mysql | grep Running | cut -d " " -f1)
oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e "SELECT * FROM customers"'

Make some data changes:

oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
    "INSERT INTO customers (first_name, last_name, email) VALUES (\"John\", \"Doe\", \"jdoe@example.com\")"'
oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
    "UPDATE customers SET first_name = \"Jane\" WHERE id = 1"'
oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
    "INSERT INTO customers (first_name, last_name, email) VALUES (\"Chuck\", \"Norris\", \"cnorris@example.com\")"'

AMQ Broker setup (sink system)

Our destination system is the AMQ message broker; you can download Red Hat AMQ Broker for free from the developer portal. Here, we simply create a single-instance broker and our destination queue:

mkdir $TMP/amq
unzip -qq /path/to/amq-broker-operator-7.7.0-ocp-install-examples.zip -d $TMP/amq
AMQ_DEPLOY="$(find $TMP/amq -name "deploy" -type d)"

Deploy the Operator:

oc create -f $AMQ_DEPLOY/service_account.yaml
oc create -f $AMQ_DEPLOY/role.yaml
oc create -f $AMQ_DEPLOY/role_binding.yaml
sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemis_crd.yaml
sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemisaddress_crd.yaml
oc apply -f $AMQ_DEPLOY/crds
oc secrets link amq-broker-operator $REG_SECRET --for=pull
oc create -f $AMQ_DEPLOY/operator.yaml

Deploy the resources:

oc create -f ./amq/my-broker.yaml

Once the broker pod is running, create the address:

oc create -f ./amq/my-address.yaml
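
For reference, my-broker.yaml and my-address.yaml are small custom resources handled by the AMQ Broker Operator. A hedged sketch of their shape (image, names, and queue name are illustrative, not the repository's exact values):

apiVersion: broker.amq.io/v2alpha2
kind: ActiveMQArtemis
metadata:
  name: my-broker
spec:
  deploymentPlan:
    size: 1                 # single-instance broker
    image: registry.redhat.io/amq7/amq-broker:7.7
  console:
    expose: true            # exposes the web console route used later
---
apiVersion: broker.amq.io/v2alpha2
kind: ActiveMQArtemisAddress
metadata:
  name: my-address
spec:
  addressName: sinkQueue    # illustrative destination name
  queueName: sinkQueue
  routingType: anycast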

Check the status:

oc get activemqartemises
oc get activemqartemisaddresses

AMQ Streams setup (Kafka)

You can download Red Hat AMQ Streams, which provides Kafka and KafkaConnect, from the same portal page. Here we create a simple cluster with just one node. There is no need to create any topics: Debezium takes care of that, creating its internal topics as well as our destination topic, which follows the pattern serverName.databaseName.tableName.

mkdir $TMP/streams
unzip -qq /path/to/amq-streams-1.5.0-ocp-install-examples.zip -d $TMP/streams
STREAMS_DEPLOY="$(find $TMP/streams -name "install" -type d)"

Deploy the Operator:

sed -i -e "s/namespace: .*/namespace: $PROJECT_NAME/g" $STREAMS_DEPLOY/cluster-operator/*RoleBinding*.yaml
oc apply -f $STREAMS_DEPLOY/cluster-operator
oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=$PROJECT_NAME
oc secrets link builder $REG_SECRET --for=pull
oc secrets link strimzi-cluster-operator $REG_SECRET --for=pull
oc set env deploy/strimzi-cluster-operator STRIMZI_IMAGE_PULL_SECRETS=$REG_SECRET
oc apply -f $STREAMS_DEPLOY/strimzi-admin
oc adm policy add-cluster-role-to-user strimzi-admin $USER_NAME

Deploy the resources:

oc apply -f ./streams/my-kafka.yaml
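
my-kafka.yaml is a standard AMQ Streams (Strimzi) Kafka resource. A minimal single-node sketch looks roughly like this; the replication settings are lowered because there is only one broker, and the exact file in the repository may differ:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-kafka-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}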

Check the status:

oc get kafkas

AMQ Streams setup (KafkaConnect)

From the same AMQ Streams package, we can also deploy a custom KafkaConnect image. We build it on top of the official one, adding the connector plugins we need: the Debezium MySQL connector and the Camel Kafka SJMS2 connector (I'm using the latest upstream releases for convenience, but you can download the Red Hat releases from the portal).

Note: The nice thing about this new Camel sub-project is that you can use all 300+ components as Kafka connectors, to integrate with almost any external system.

Once the connectors are up and running (check their status with the describe commands shown later), you will be able to make further changes to the customers table and watch them stream to the queue through the broker web console. First, set the Kafka cluster name and download the connector archives that we will add to the KafkaConnect image:

KAFKA_CLUSTER="my-kafka-cluster"
CONNECTOR_URLS=(
    "https://repo.maven.apache.org/maven2/io/debezium/debezium-connector-mysql/1.1.2.Final/debezium-connector-mysql-1.1.2.Final-plugin.zip"
    "https://repository.apache.org/content/groups/public/org/apache/camel/kafkaconnector/camel-sjms2-kafka-connector/0.3.0/camel-sjms2-kafka-connector-0.3.0-package.zip"
)

CONNECTORS="$TMP/connectors" && mkdir -p $CONNECTORS
for url in "${CONNECTOR_URLS[@]}"; do
    curl -sL $url -o $CONNECTORS/file.zip && unzip -qq $CONNECTORS/file.zip -d $CONNECTORS
done
sleep 2
rm -rf $CONNECTORS/file.zip

Deploy the resources:

oc create secret generic debezium-config --from-file=./streams/connectors/mysql.properties
oc create secret generic camel-config --from-file=./streams/connectors/amq.properties
oc apply -f ./streams/my-connect-s2i.yaml
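
For context, my-connect-s2i.yaml declares a single-node KafkaConnectS2I cluster. The strimzi.io/use-connector-resources annotation lets us manage connectors as KafkaConnector custom resources later on, and externalConfiguration mounts the two secrets so the connector configuration can reference the properties files. A hedged sketch follows; the exact keys in the repository file may differ:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    # allow connector configs to read values from the mounted properties files
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: debezium-config
        secret:
          secretName: debezium-config
      - name: camel-config
        secret:
          secretName: camel-config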

Once the Connect cluster is running, start the custom image build:

oc start-build my-connect-cluster-connect --from-dir $CONNECTORS --follow

Check the status:

oc get kafkaconnects2i

These are all running pods up to this point:

amq-broker-operator-5d4559677-dpzf5                 1/1     Running     0          21m
my-broker-ss-0                                      1/1     Running     0          19m
my-connect-cluster-connect-2-xr58q                  1/1     Running     0          6m28s
my-kafka-cluster-entity-operator-56c9868474-kfdx2   3/3     Running     1          14m
my-kafka-cluster-kafka-0                            2/2     Running     0          14m
my-kafka-cluster-zookeeper-0                        1/1     Running     0          15m
my-mysql-1-vmbbw                                    1/1     Running     0          25m
strimzi-cluster-operator-666fcd8b96-q8thc           1/1     Running     0          15m

Now that we have our infrastructure ready, we can finally configure the CDC pipeline:

oc apply -f ./streams/connectors/mysql-source.yaml
oc apply -f ./streams/connectors/amq-sink.yaml
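
mysql-source.yaml and amq-sink.yaml are KafkaConnector resources that the Connect cluster picks up automatically. Here is a hedged sketch of what the source connector might look like, with credentials read from the mounted mysql.properties file; the property keys and volume path are illustrative, not necessarily the repository's exact values:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mysql-source
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: my-mysql
    database.port: 3306
    database.user: "${file:/opt/kafka/external-configuration/debezium-config/mysql.properties:user}"
    database.password: "${file:/opt/kafka/external-configuration/debezium-config/mysql.properties:password}"
    database.server.id: 184054
    # serverName.databaseName.tableName => my-mysql.cdcdb.customers
    database.server.name: my-mysql
    database.whitelist: cdcdb
    table.whitelist: cdcdb.customers
    database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.cdcdb

The amq-sink resource has the same shape: it points spec.class at the Camel SJMS2 sink connector class (org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnector), subscribes to the my-mysql.cdcdb.customers topic, and takes the JMS destination and connection settings from the mounted amq.properties file.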

Check the status:

oc get kafkaconnectors
oc describe kafkaconnector mysql-source
oc describe kafkaconnector amq-sink
CONNECT_POD=$(oc get pods | grep my-connect-cluster | grep Running | cut -d " " -f1)
oc logs $CONNECT_POD

oc get kafkatopics
oc exec -i $KAFKA_CLUSTER-kafka-0 -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server my-kafka-cluster-kafka-bootstrap:9092 --topic my-mysql.cdcdb.customers --from-beginning

Open the AMQ web console route to check the queue's contents:

echo http://$(oc get routes my-broker-wconsj-0-svc-rte -o=jsonpath='{.status.ingress[0].host}{"\n"}')/console

Cleanup

When you have finished experimenting with your new cloud-native CDC pipeline, you can free resources by deleting the project along with the cluster-scoped CRDs and roles that the operators installed:

rm -rf $TMP
oc delete project $PROJECT_NAME
oc delete crd/activemqartemises.broker.amq.io
oc delete crd/activemqartemisaddresses.broker.amq.io
oc delete crd/activemqartemisscaledowns.broker.amq.io
oc delete crd -l app=strimzi
oc delete clusterrolebinding -l app=strimzi
oc delete clusterrole -l app=strimzi

Considerations

No matter what technology you use, the change data capture process must run as a single thread to maintain ordering. Since Debezium records the log offset asynchronously, any final consumer of these changes must be idempotent.

A benefit of running on top of KafkaConnect in distributed mode is that you get a fault-tolerant CDC process. Debezium offers great performance because it reads the data source's internal transaction log directly, but there is no standard for these logs, so a change in their implementation may require a plug-in rewrite. It also means that every data source has its own procedure for enabling access to its transaction log.

Connector configuration allows you to transform message payloads by using single message transformations (SMTs). These can be chained and extended with custom implementations, but they are designed for simple modifications: long chains of SMTs are hard to maintain and reason about. Moreover, transformations are applied synchronously on every message, so heavy processing or external service calls can slow down the whole streaming pipeline.
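
As a hypothetical example, a two-step SMT chain could be declared directly inside a connector's configuration (here as spec.config entries of a KafkaConnector): the first transformation flattens the Debezium change-event envelope, and the second renames the destination topic:

transforms: unwrap,route
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: "false"
transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex: "my-mysql.cdcdb.(.*)"
transforms.route.replacement: "cdc.$1"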

In cases where you need to do heavy processing, split, enrich, or aggregate records, or call external services, you should add a stream-processing layer between the connectors, such as Kafka Streams or plain Camel. Just remember that Kafka Streams creates internal topics and forces you to write the transformed data back into Kafka (data duplication), whereas with Camel that is optional.
