Capture Oracle database events in Apache Kafka with Debezium

Capture Oracle database events in Apache Kafka with Debezium

One of the most requested connector plug-ins is coming to Red Hat Integration. You can now stream your data from Oracle databases with the Debezium connector for Oracle in developer preview.

With this new connector, developers can leverage the power of the 100% open source Debezium project to stream their Oracle data to Red Hat AMQ Streams Apache Kafka clusters with Red Hat Integration.

This article gives an overview of the Debezium connector for Oracle. It takes you through the steps to get started streaming your Oracle database, so you can achieve “liberation for your data.”

Note: Red Hat encourages you to use and provide feedback on the features included in developer preview offerings. However, please be advised you should not deploy them in production environments to run business-critical processes, as these releases require further stabilization and testing.

An easier way to access Oracle data

For many organizations, Oracle Database is the heart of their data assets. Over decades, developers have created system after system on top of this central piece of architecture. Almost every developer has faced the requirement to store enterprise information in the Oracle database. Getting the most out of the data stored there becomes critical for every business.

Systems distributed across hybrid clouds must process large volumes of data. The complexity of that data makes it difficult to work with and requires the creation of specialized data pipelines. These days, it’s common for business applications to require access to the latest updates to the data stored in an ORDERS or CUSTOMERS table as soon as they occur. In the past, there were few options to create this type of data pipeline. They included querying the data through Java Database Connectivity (JDBC) and using expensive proprietary technologies to capture those changes.

Streaming your database with the Debezium connector for Oracle

Debezium is a set of distributed services that captures row-level database changes so that applications can see and respond to them. Debezium connectors record all events to a Red Hat AMQ Streams Kafka cluster. Applications use AMQ Streams to consume change events. The Debezium approach to change data capture (CDC) uses the transaction log to avoid dual writes from applications, which might lead to inconsistent data. Debezium is designed to use the database transaction log so that it will emit a record of every change event. By contrast, with a polling-based approach, it’s possible to miss changes between queries.

The connector can monitor and record the row-level changes in databases on Oracle server version 12R2 and later. Debezium ingests change events using Oracle’s native LogMiner database package. Oracle LogMiner is part of the Oracle Database utilities and provides a well-defined, easy-to-use, and comprehensive interface for querying online and archived redo log files.

The first time that the Debezium Oracle connector starts, it performs an initial consistent snapshot of the database to see its entire history. You can change this behavior by setting the snapshot.mode. After the connector completes its initial snapshot, the Debezium connector continues streaming from the position that it read from the current system change number (SCN) position in the redo log. The initial snapshot ensures that the connector has a complete and consistent set of data.

Consider a customer’s table defined in the inventory database schema:

CREATE TABLE customers (

  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,

  first_name VARCHAR2(255) NOT NULL,

  last_name VARCHAR2(255) NOT NULL,

  email VARCHAR2(255) NOT NULL UNIQUE

);

All data change events produced by the connector have a key and a value, although the key and value structure depends on the source table. The event’s key has a schema that contains a field for each column in the primary key (or unique key constraint) of the table. The event’s value contains the row’s ID, FIRST_NAME, LAST_NAME, and EMAIL columns with a representation that depends on the column’s Oracle data type. For deleted rows, an event provides the consumer with information to process the row’s removal.

Setting up your Oracle database

Using a multi-tenancy configuration with a container database, prepare the database by completing the following steps:

  1. Prepare the database and replace the size with your expected value; for example, 10G:
    ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog
    
     
    
    CONNECT sys/top_secret AS SYSDBA
    
    alter system set db_recovery_file_dest_size = <REPLACE_WITH_YOUR_SIZE>;
    
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    
    shutdown immediate
    
    startup mount
    
    alter database archivelog;
    
    alter database open;
    
    -- Should now "Database log mode: Archive Mode"
    
    archive log list
    
     
    
    exit;
    
    
  2. Enable supplemental logging for captured tables, or for the database so that you can capture the before state of changed database rows:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    
    
  3. Set up the user account with the specific permissions so that the connector can capture change events:
    sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
    
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    
        SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    
      exit;
    
     
    
    sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
    
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    
        SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    
      exit;
    
     
    
    sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
    
     
    
      CREATE USER c##dbzuser IDENTIFIED BY dbz
    
        DEFAULT TABLESPACE logminer_tbs
    
        QUOTA UNLIMITED ON logminer_tbs
    
        CONTAINER=ALL;
    
     
    
      GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
    
      GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
    
      GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
    
      GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
    
     
    
      GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
    
      GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
    
     
    
      GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
    
      GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
    
     
    
      GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
    
     
    
      exit;
    
    

Deploying the Debezium Oracle connector

Currently, Red Hat is not shipping the Oracle JDBC driver, but you can download the Oracle Instant Client to get it.

The following example shows a JSON request for registering an instance of the Debezium Oracle connector:

{

    "name": "inventory-connector",

    "config": {

        "connector.class" : "io.debezium.connector.oracle.OracleConnector",

        "tasks.max" : "1",

        "database.server.name" : "server1",

        "database.user" : "c##dbzuser",

        "database.password" : "dbz",

        "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",

        "database.dbname" : "ORCLCDB",

        "database.pdb.name" : "ORCLPDB1",

        "database.history.kafka.bootstrap.servers" : "kafka:9092",

        "database.history.kafka.topic": "schema-changes.inventory"

    }

}

For more information about the connector properties, see the Debezium documentation.

Get started with Red Hat Integration Debezium connectors for Apache Kafka

Debezium Apache Kafka connectors are available through Red Hat Integration, which offers a comprehensive set of integration and messaging technologies that connect applications and data across hybrid infrastructures. This agile, distributed, containerized, and API-centric product provides service composition and orchestration, application connectivity and data transformation, real-time message streaming, change-data capture, and API management. All combined with a cloud-native platform and toolchain to support the full spectrum of modern application development.

Get started by downloading the Red Hat Integration Debezium CDC connectors from Red Hat Developer.

Share