
Build a simple cloud-native change data capture pipeline

July 2, 2020
Federico Valeri
Related topics: Java, Event-Driven, Kubernetes


    Change data capture (CDC) is a well-established software design pattern for a system that monitors and captures data changes so that other software can respond to those events. Using KafkaConnect, along with Debezium Connectors and the Apache Camel Kafka Connector, we can build a configuration-driven data pipeline to bridge traditional data stores and new event-driven architectures.

    This article walks through a simple example.

    Why use change data capture?

    The advantages of CDC compared to a simple poll-based or query-based process are:

    • All changes are captured: Intermediate changes (updates and deletes) between two runs of the poll loop might otherwise be missed.
    • Low overhead: Near real-time reaction to data changes avoids increased CPU load due to frequent polling.
    • No data model impact: Timestamp columns are no longer needed to determine the last data update.

    The example

    In this example, we build a simple cloud-native CDC pipeline from scratch. The goal is to send every change from a simple customers table to a message queue for further processing. Once the infrastructure is provisioned, we will implement the data pipeline using configuration files, without writing any code. This is the high-level overview of the architecture:

    MySQL --> KafkaConnect [Worker0JVM(TaskA0, TaskB0, TaskB1),...] --> AMQ
                                    |
                        Kafka (offsets, config, status)
    

    Even though this is a simple use case, it involves deploying and setting up several integration products on top of OpenShift 4: AMQ 7.7 (the message broker), AMQ Streams 1.5 (Kafka and KafkaConnect), and Debezium (the CDC engine). This infrastructure pays off when you expand this simple example into a group of microservices that communicate by exchanging events. The nice thing about using Debezium is that you don't need to change your application's logic.

    You can find all of the required configuration files here.

    Note: For the sake of simplicity, we use unsecured components that are not suitable for production. For any serious use, you should add TLS encryption and increase the allocated resources.

    Setting up

    To provision the infrastructure for our CDC pipeline, we have to set up the OpenShift project, MySQL source system, AMQ sink system, AMQ Streams, and the Debezium connector. You can also use Red Hat CodeReady Containers (CRC), but make sure to reserve eight cores and at least 14 GB of memory.

    We will do all of this from the command line, so open your favorite shell. No prompt is added so that you can easily copy and paste the code you find here. First, set your environment variables:

    API_ENDPOINT="https://api.crc.testing:6443"
    ADMIN_NAME="kubeadmin"
    ADMIN_PASS="7z6T5-qmTth-oxaoD-p3xQF"
    USER_NAME="developer"
    USER_PASS="developer"
    PROJECT_NAME="cdc"
    

    Next, create a new project:

    TMP="/tmp/ocp" && rm -rf $TMP && mkdir -p $TMP
    oc login -u $ADMIN_NAME -p $ADMIN_PASS $API_ENDPOINT
    oc new-project $PROJECT_NAME
    oc adm policy add-role-to-user admin $USER_NAME
    

    Then set up Red Hat registry authentication (use your own credentials here):

    REG_SECRET="registry-secret"
    oc create secret docker-registry $REG_SECRET \
        --docker-server="registry.redhat.io" \
        --docker-username="my-user" \
        --docker-password="my-pass"
    oc secrets link default $REG_SECRET --for=pull
    oc secrets link builder $REG_SECRET --for=pull
    oc secrets link deployer $REG_SECRET --for=pull
    

    MySQL setup (source system)

    Our source system is MySQL. Here, we use a custom DeploymentConfig that contains a post lifecycle hook to initialize our database and enable binary log access for the Debezium user. To start, create the ConfigMaps and the Secret:

    oc create configmap db-config --from-file=./mysql/my.cnf
    oc create configmap db-init --from-file=./mysql/initdb.sql
    oc create secret generic db-creds \
        --from-literal=database-name=cdcdb \
        --from-literal=database-user=cdcadmin \
        --from-literal=database-password=cdcadmin \
        --from-literal=database-admin-password=cdcadmin
    

    Next, deploy your resources:

    oc create -f ./mysql/my-mysql.yaml
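
    The my-mysql.yaml file lives in the example repository linked above. As a rough sketch of its shape (assumed for illustration, not the actual file), it is an OpenShift DeploymentConfig whose post lifecycle hook runs the initdb.sql script against the new database and grants the Debezium user the binary log privileges it needs:

    # Sketch only: the real my-mysql.yaml is in the example repository.
    apiVersion: apps.openshift.io/v1
    kind: DeploymentConfig
    metadata:
      name: my-mysql
    spec:
      replicas: 1
      selector:
        app: my-mysql
      strategy:
        type: Recreate
        recreateParams:
          post:
            failurePolicy: Abort
            execNewPod:
              containerName: mysql
              # Run the init script mounted from the db-init ConfigMap.
              # For Debezium, initdb.sql must grant the connector user
              # SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and
              # REPLICATION CLIENT privileges.
              command: ["/bin/sh", "-c", "MYSQL_PWD=cdcadmin mysql -h my-mysql -u root cdcdb < /opt/init/initdb.sql"]
              volumes:
                - db-init
      template:
        metadata:
          labels:
            app: my-mysql
        spec:
          containers:
            - name: mysql
              image: registry.redhat.io/rhscl/mysql-80-rhel7   # placeholder image reference
              ports:
                - containerPort: 3306
              env:
                - name: MYSQL_USER
                  valueFrom:
                    secretKeyRef:
                      name: db-creds
                      key: database-user
                # ...the other credentials from db-creds are wired up the same way
              volumeMounts:
                # my.cnf (from the db-config ConfigMap) must keep the binary log
                # enabled in row format (log_bin, binlog_format=ROW), which Debezium requires.
                - name: db-config
                  mountPath: /etc/my.cnf.d
                - name: db-init
                  mountPath: /opt/init
          volumes:
            - name: db-config
              configMap:
                name: db-config
            - name: db-init
              configMap:
                name: db-init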
    

    Check the status:

    MYSQL_POD=$(oc get pods | grep my-mysql | grep Running | cut -d " " -f1)
    oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e "SELECT * FROM customers"'
    

    Make some data changes:

    oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
        "INSERT INTO customers (first_name, last_name, email) VALUES (\"John\", \"Doe\", \"jdoe@example.com\")"'
    oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
        "UPDATE customers SET first_name = \"Jane\" WHERE id = 1"'
    oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
        "INSERT INTO customers (first_name, last_name, email) VALUES (\"Chuck\", \"Norris\", \"cnorris@example.com\")"'
    

    AMQ Broker setup (sink system)

    Our destination system is the AMQ message broker, which you can download from the Red Hat developer portal at no cost. Here, we simply create a single-instance broker and our destination queue:

    mkdir $TMP/amq
    unzip -qq /path/to/amq-broker-operator-7.7.0-ocp-install-examples.zip -d $TMP/amq
    AMQ_DEPLOY="$(find $TMP/amq -name "deploy" -type d)"
    

    Deploy the Operator:

    oc create -f $AMQ_DEPLOY/service_account.yaml
    oc create -f $AMQ_DEPLOY/role.yaml
    oc create -f $AMQ_DEPLOY/role_binding.yaml
    sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemis_crd.yaml
    sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemisaddress_crd.yaml
    oc apply -f $AMQ_DEPLOY/crds
    oc secrets link amq-broker-operator $REG_SECRET --for=pull
    oc create -f $AMQ_DEPLOY/operator.yaml
    

    Deploy the resources:

    oc create -f ./amq/my-broker.yaml
    

    Create the address, but only once the broker pod is running:

    oc create -f ./amq/my-address.yaml
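
    Both my-broker.yaml and my-address.yaml are in the example repository. As a rough idea of their shape (a sketch against the v2alpha2 API, not the actual files; the destination name is a placeholder), the broker is an ActiveMQArtemis resource and the queue is an ActiveMQArtemisAddress:

    # Sketch only: see the example repository for the real my-broker.yaml.
    apiVersion: broker.amq.io/v2alpha2
    kind: ActiveMQArtemis
    metadata:
      name: my-broker
    spec:
      deploymentPlan:
        size: 1                    # single-instance broker
        persistenceEnabled: false
      console:
        expose: true               # exposes the web console route used later

    # Sketch only: see the example repository for the real my-address.yaml.
    apiVersion: broker.amq.io/v2alpha2
    kind: ActiveMQArtemisAddress
    metadata:
      name: my-address
    spec:
      addressName: customers       # placeholder destination name
      queueName: customers
      routingType: anycast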
    

    Check the status:

    oc get activemqartemises
    oc get activemqartemisaddresses
    

    AMQ Streams setup (Kafka)

    You can download Red Hat AMQ Streams, which is required to run KafkaConnect, from the same portal page. Here we create a simple cluster with just one node. There is no need to create any topics, because Debezium takes care of this, creating its internal topics and our destination topic with the naming pattern serverName.databaseName.tableName.

    mkdir $TMP/streams
    unzip -qq /path/to/amq-streams-1.5.0-ocp-install-examples.zip -d $TMP/streams
    STREAMS_DEPLOY="$(find $TMP/streams -name "install" -type d)"
    

    Deploy the Operator:

    sed -i -e "s/namespace: .*/namespace: $PROJECT_NAME/g" $STREAMS_DEPLOY/cluster-operator/*RoleBinding*.yaml
    oc apply -f $STREAMS_DEPLOY/cluster-operator
    oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=$PROJECT_NAME
    oc secrets link builder $REG_SECRET --for=pull
    oc secrets link strimzi-cluster-operator $REG_SECRET --for=pull
    oc set env deploy/strimzi-cluster-operator STRIMZI_IMAGE_PULL_SECRETS=$REG_SECRET
    oc apply -f $STREAMS_DEPLOY/strimzi-admin
    oc adm policy add-cluster-role-to-user strimzi-admin $USER_NAME
    

    Deploy the resources:

    oc apply -f ./streams/my-kafka.yaml
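
    The my-kafka.yaml file defines the Kafka cluster. A minimal single-node sketch (an assumed shape matching the cluster name used throughout this article, not the actual file) looks like this:

    # Sketch only: see the example repository for the real my-kafka.yaml.
    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-kafka-cluster
    spec:
      kafka:
        replicas: 1
        listeners:
          plain: {}                # plaintext listener on port 9092
        config:
          # Single-node cluster: keep replication factors at 1.
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}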
    

    Check the status:

    oc get kafkas
    

    AMQ Streams setup (KafkaConnect)

    From the same AMQ Streams package, we can also deploy our custom KafkaConnect image. We will build it on top of the official one, adding our specific connector plugins. In this case, we add the Debezium MySQL connector and the Camel Kafka SJMS2 connector (I'm using the latest upstream releases for convenience, but you can download the Red Hat releases from the portal).

    Note: The nice thing about this new Camel sub-project is that you can use all 300+ components as Kafka connectors, to integrate with almost any external system.

    Download the connector plugins and unpack them into a local folder:

    KAFKA_CLUSTER="my-kafka-cluster"
    CONNECTOR_URLS=(
        "https://repo.maven.apache.org/maven2/io/debezium/debezium-connector-mysql/1.1.2.Final/debezium-connector-mysql-1.1.2.Final-plugin.zip"
        "https://repository.apache.org/content/groups/public/org/apache/camel/kafkaconnector/camel-sjms2-kafka-connector/0.3.0/camel-sjms2-kafka-connector-0.3.0-package.zip"
    )
    
    CONNECTORS="$TMP/connectors" && mkdir -p $CONNECTORS
    for url in "${CONNECTOR_URLS[@]}"; do
        curl -sL $url -o $CONNECTORS/file.zip && unzip -qq $CONNECTORS/file.zip -d $CONNECTORS
    done
    sleep 2
    rm -rf $CONNECTORS/file.zip
    

    Deploy the resources:

    oc create secret generic debezium-config --from-file=./streams/connectors/mysql.properties
    oc create secret generic camel-config --from-file=./streams/connectors/amq.properties
    oc apply -f ./streams/my-connect-s2i.yaml
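
    The my-connect-s2i.yaml file is in the repository as well. Roughly (a sketch, not the actual file), it is a KafkaConnectS2I resource with connector resources enabled and the two secrets mounted as external configuration, so the connectors can read their properties files at runtime:

    # Sketch only: see the example repository for the real my-connect-s2i.yaml.
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnectS2I
    metadata:
      name: my-connect-cluster
      annotations:
        # Lets us manage connectors through KafkaConnector custom resources.
        strimzi.io/use-connector-resources: "true"
    spec:
      replicas: 1
      bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
      config:
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        # Single-node Kafka: keep replication factors at 1.
        offset.storage.replication.factor: 1
        config.storage.replication.factor: 1
        status.storage.replication.factor: 1
      externalConfiguration:
        volumes:
          # Mounted under /opt/kafka/external-configuration/<name>/
          - name: debezium-config
            secret:
              secretName: debezium-config
          - name: camel-config
            secret:
              secretName: camel-config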
    

    Once the Connect cluster is running, start the custom image build:

    oc start-build my-connect-cluster-connect --from-dir $CONNECTORS --follow
    

    Check the status:

    oc get kafkaconnects2i
    

    These are all of the pods running up to this point:

    amq-broker-operator-5d4559677-dpzf5                 1/1     Running     0          21m
    my-broker-ss-0                                      1/1     Running     0          19m
    my-connect-cluster-connect-2-xr58q                  1/1     Running     0          6m28s
    my-kafka-cluster-entity-operator-56c9868474-kfdx2   3/3     Running     1          14m
    my-kafka-cluster-kafka-0                            2/2     Running     0          14m
    my-kafka-cluster-zookeeper-0                        1/1     Running     0          15m
    my-mysql-1-vmbbw                                    1/1     Running     0          25m
    strimzi-cluster-operator-666fcd8b96-q8thc           1/1     Running     0          15m
    

    Now that we have our infrastructure ready, we can finally configure the CDC pipeline:

    oc apply -f ./streams/connectors/mysql-source.yaml
    oc apply -f ./streams/connectors/amq-sink.yaml
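
    The two KafkaConnector resources are in the repository. As a rough sketch (connection details are placeholders; in the real files they come from the mounted properties files), the source side uses the Debezium MySQL connector class and the sink side uses the Camel SJMS2 sink connector class:

    # Sketch only: see the example repository for the real mysql-source.yaml.
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: mysql-source
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      tasksMax: 1
      config:
        database.hostname: my-mysql
        database.port: 3306
        database.user: cdcadmin              # placeholder credentials
        database.password: cdcadmin
        database.server.id: 184054           # placeholder numeric client id
        database.server.name: my-mysql       # topic prefix: my-mysql.cdcdb.customers
        database.whitelist: cdcdb
        database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
        database.history.kafka.topic: my-mysql.history

    # Sketch only: see the example repository for the real amq-sink.yaml.
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: amq-sink
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnector
      tasksMax: 1
      config:
        topics: my-mysql.cdcdb.customers
        camel.sink.path.destinationName: customers   # placeholder queue name
        # The broker connection factory settings come from amq.properties.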
    

    Check the status:

    oc get kafkaconnectors
    oc describe kafkaconnector mysql-source
    oc describe kafkaconnector amq-sink
    CONNECT_POD=$(oc get pods | grep my-connect-cluster | grep Running | cut -d " " -f1)
    oc logs $CONNECT_POD
    
    oc get kafkatopics
    oc exec -i $KAFKA_CLUSTER-kafka-0 -c kafka -- bin/kafka-console-consumer.sh \
        --bootstrap-server my-kafka-cluster-kafka-bootstrap:9092 --topic my-mysql.cdcdb.customers --from-beginning
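
    Each record on the destination topic is a Debezium change event. Abridged, and with the exact envelope depending on converter settings, the insert event for the first row looks roughly like this:

    {
      "payload": {
        "before": null,
        "after": { "id": 1, "first_name": "John", "last_name": "Doe", "email": "jdoe@example.com" },
        "source": { "connector": "mysql", "db": "cdcdb", "table": "customers" },
        "op": "c"
      }
    }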
    

    Once your connectors are up and running (check the describe output above), make further changes to the customers table and verify that they are streamed to the queue. Open the AMQ web console route to inspect the queue's contents:

    echo http://$(oc get routes my-broker-wconsj-0-svc-rte -o=jsonpath='{.status.ingress[0].host}{"\n"}')/console
    

    Cleanup

    When you have finished experimenting with your new cloud-native CDC pipeline, you can delete the whole project and free up some resources with the following commands:

    rm -rf $TMP
    oc delete project $PROJECT_NAME
    oc delete crd/activemqartemises.broker.amq.io
    oc delete crd/activemqartemisaddresses.broker.amq.io
    oc delete crd/activemqartemisscaledowns.broker.amq.io
    oc delete crd -l app=strimzi
    oc delete clusterrolebinding -l app=strimzi
    oc delete clusterrole -l app=strimzi
    

    Considerations

    No matter what technology you use, the change data capture process must run as a single thread to maintain ordering. Since Debezium records the log offset asynchronously, any final consumer of these changes must be idempotent.

    A benefit of running on top of KafkaConnect in distributed mode is that you get a fault-tolerant CDC process. Debezium offers great performance because it reads the data source's internal transaction log, but there is no standard for that log, so a change in the implementation may require a plug-in rewrite. This also means that every data source has its own procedure for enabling access to its transaction log.

    The connector configuration allows you to transform message payloads by using Single Message Transformations (SMTs). These can be chained and extended with custom implementations, but they are designed for simple modifications: long chains of SMTs are hard to maintain and reason about. Moreover, transformations are synchronous and applied to each message, so heavy processing or external service calls can slow down the streaming pipeline.
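
    For example, a common SMT in Debezium pipelines is ExtractNewRecordState, which flattens the change-event envelope down to the plain row state before it reaches the sink. A sketch of adding it to the mysql-source connector configuration (a fragment of the KafkaConnector spec):

    spec:
      config:
        transforms: unwrap
        transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
        # Keep tombstone records for deletes instead of dropping them.
        transforms.unwrap.drop.tombstones: false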

    When you need to do heavy processing, split, enrich, or aggregate records, or call external services, add a stream processing layer between the connectors, such as Kafka Streams or plain Camel. Just remember that Kafka Streams creates internal topics and forces you to write the transformed data back into Kafka (data duplication), whereas with Camel this is only an option.

    Last updated: June 26, 2020
