Configure Attunity Replicate to Perform CDC from Oracle to Kafka

In the last blog, Install Attunity Replicate on Linux, I discussed how to install Attunity Replicate on a Linux VM. In today's blog, I am going to continue the topic and discuss how to configure Attunity Replicate to perform CDC from an Oracle source database to a Kafka target.

1. Enable Source Database in Archivelog Mode
CDC requires the source database to be in archivelog mode. You can check this by running the following command:

SQL> archive log list
Database log mode	       Archive Mode
Automatic archival	       Enabled
Archive destination	       USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     32
Next log sequence to archive   34
Current log sequence	       34

If the database is not in archivelog mode, enable it with the following commands, then re-run archive log list to confirm:

shutdown immediate;
startup nomount;
alter database mount;
alter database archivelog;
alter database open;

2. Enable Supplemental Logging
Check whether supplemental logging is enabled. If not, enable it:

SQL> SELECT supplemental_log_data_min FROM v$database;
SUPPLEME
--------
NO

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Database altered.

SQL>  SELECT supplemental_log_data_min FROM v$database;
SUPPLEME
--------
YES
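
Note that minimal database-level supplemental logging (the setting above) is only the baseline; Replicate also needs supplemental logging at the table level for the tables it captures. Replicate can usually add this automatically when the task starts, but you can also add it manually. A sketch for the test table created in step 5, which has no primary key, so ALL-columns logging is the safe choice:

-- Table-level supplemental logging; (ALL) logs every column,
-- appropriate for a table without a primary key.
ALTER TABLE wztest.employee ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;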

3. Create a DB User for CDC

CREATE USER cdc_user
  IDENTIFIED BY cdc_user
  DEFAULT TABLESPACE USERS
  TEMPORARY TABLESPACE TEMP
  PROFILE DEFAULT
  ACCOUNT UNLOCK;

4. Grant Permissions to the User

GRANT CREATE SESSION TO CDC_USER;
GRANT SELECT ANY TRANSACTION TO CDC_USER;
GRANT SELECT ON V_$ARCHIVED_LOG TO CDC_USER;
GRANT SELECT ON V_$LOG TO CDC_USER;
GRANT SELECT ON V_$LOGFILE TO CDC_USER;
GRANT SELECT ON V_$DATABASE TO CDC_USER;
GRANT SELECT ON V_$THREAD TO CDC_USER;
GRANT SELECT ON V_$PARAMETER TO CDC_USER;
GRANT SELECT ON V_$NLS_PARAMETERS TO CDC_USER;
GRANT SELECT ON V_$TIMEZONE_NAMES TO CDC_USER;
GRANT SELECT ON V_$TRANSACTION TO CDC_USER;
GRANT SELECT ON ALL_INDEXES TO CDC_USER;
GRANT SELECT ON ALL_OBJECTS TO CDC_USER;
GRANT SELECT ON DBA_OBJECTS TO CDC_USER;
GRANT SELECT ON ALL_TABLES TO CDC_USER;
GRANT SELECT ON ALL_USERS TO CDC_USER;
GRANT SELECT ON ALL_CATALOG TO CDC_USER;
GRANT SELECT ON ALL_CONSTRAINTS TO CDC_USER;
GRANT SELECT ON ALL_CONS_COLUMNS TO CDC_USER;
GRANT SELECT ON ALL_TAB_COLS TO CDC_USER;
GRANT SELECT ON ALL_IND_COLUMNS TO CDC_USER;
GRANT SELECT ON ALL_LOG_GROUPS TO CDC_USER;
GRANT SELECT ON SYS.DBA_REGISTRY TO CDC_USER;
GRANT SELECT ON SYS.OBJ$ TO CDC_USER;
GRANT SELECT ON SYS.ENC$ TO CDC_USER;
GRANT SELECT ON DBA_TABLESPACES TO CDC_USER;
GRANT SELECT ON ALL_TAB_PARTITIONS TO CDC_USER;
GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO CDC_USER;
GRANT SELECT ON ALL_VIEWS TO CDC_USER;
GRANT SELECT ANY TABLE TO CDC_USER;
GRANT CREATE ANY DIRECTORY TO CDC_USER;
GRANT ALTER ANY TABLE TO CDC_USER;
GRANT SELECT ON V_$TRANSPORTABLE_PLATFORM TO CDC_USER;
GRANT EXECUTE ON DBMS_FILE_TRANSFER TO CDC_USER;
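
To sanity-check the grants, connect as CDC_USER and confirm they took effect. SESSION_PRIVS shows the system privileges, and a quick count against one of the granted V$ views confirms the object grants:

-- Run as CDC_USER after the grants above.
SELECT * FROM session_privs ORDER BY privilege;
-- Spot-check one of the granted V$ views:
SELECT count(*) FROM v$archived_log;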

5. Create Test Data

drop user wztest cascade;
create user wztest identified by wztest default tablespace users;
grant connect, resource, unlimited tablespace to wztest;

CREATE TABLE wztest.EMPLOYEE
(
 EMPLOYEE_ID NUMBER (6)  NOT NULL ,
 FIRST_NAME VARCHAR2 (20)  NOT NULL ,
 LAST_NAME VARCHAR2 (20)  NOT NULL ,
 JOIN_DATE date not null,
 ADDRESS_LINE1 varchar2(50),
 ADDRESS_LINE2 varchar2(50),
 ADDRESS_LINE3 varchar2(50),
 POSTCODE varchar2(10),
 PHONE_NUMBER varchar2(30)
) tablespace USERS;

CREATE SEQUENCE employee_id_seq start with 1 increment by 1 cache 20;

begin
    for i in 1..1000 loop
        insert into wztest.employee (employee_id, first_name, last_name, join_date)
        values (employee_id_seq.nextval,
                dbms_random.string('U', 1),
                dbms_random.string('U', dbms_random.value(5,10)),
                to_date(trunc(DBMS_RANDOM.VALUE(to_char(DATE '2000-01-01','J'),to_char(DATE '2016-12-31','J') ) ),'J' ));
    end loop;
    commit;
end;
/

select count(*) from wztest.employee;
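
Besides the row count, a quick look at a few rows confirms the random generator produced sensible data:

select employee_id, first_name, last_name, join_date
  from wztest.employee
 where rownum <= 5;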

6. Install Single Node Kafka
For a quick demo, I just created a single-node Kafka instance on the same Attunity Replicate server.

[root@cdc-att-vm1 ~]# mkdir /opt/kafka
[root@cdc-att-vm1 ~]# cd /opt/kafka
[root@cdc-att-vm1 kafka]# wget http://apache.mirrors.lucidnetworks.net/kafka/2.2.0/kafka_2.12-2.2.0.tgz
--2019-05-27 08:38:41--  http://apache.mirrors.lucidnetworks.net/kafka/2.2.0/kafka_2.12-2.2.0.tgz
Resolving apache.mirrors.lucidnetworks.net (apache.mirrors.lucidnetworks.net)... 108.166.161.136
Connecting to apache.mirrors.lucidnetworks.net (apache.mirrors.lucidnetworks.net)|108.166.161.136|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 57028557 (54M) [application/x-gzip]
Saving to: ‘kafka_2.12-2.2.0.tgz’

100%[===============================================================================================================================================>] 57,028,557  1011KB/s   in 55s    

2019-05-27 08:39:36 (1007 KB/s) - ‘kafka_2.12-2.2.0.tgz’ saved [57028557/57028557]

[root@cdc-att-vm1 kafka]# tar -xzf kafka_2.12-2.2.0.tgz
[root@cdc-att-vm1 kafka]# cd kafka_2.12-2.2.0
[root@cdc-att-vm1 kafka_2.12-2.2.0]# ls -l
total 52
drwxr-xr-x. 3 root root  4096 Mar  9 19:46 bin
drwxr-xr-x. 2 root root  4096 Mar  9 19:46 config
drwxr-xr-x. 2 root root  4096 May 27 08:39 libs
-rw-r--r--. 1 root root 32216 Mar  9 19:44 LICENSE
-rw-r--r--. 1 root root   336 Mar  9 19:44 NOTICE
drwxr-xr-x. 2 root root    44 Mar  9 19:46 site-docs

7. Start Kafka
Run the following two commands to start ZooKeeper and then the Kafka broker.

bin/zookeeper-server-start.sh config/zookeeper.properties  &
bin/kafka-server-start.sh config/server.properties &
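
ZooKeeper has to be up before the broker starts. To verify that both are listening (2181 for ZooKeeper and 9092 for the broker, assuming the default properties files):

# Ports assume the stock zookeeper.properties and server.properties.
ss -ltn | grep -E ':(2181|9092)'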

8. Create a Test Topic
Run the following to create a topic.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic wztest1
Verify the result.

[root@cdc-att-vm1 kafka_2.12-2.2.0]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
wztest1
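
You can also describe the topic to confirm the partition count and replication factor:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic wztest1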

9. Add Oracle as Source Endpoint
Now we can do the real work of configuring Attunity Replicate. The first step is to configure the endpoints; I need two: one for Oracle and one for Kafka. Click Manage Endpoint Connections.

Then choose Source as the Role and Oracle as the Type, and fill in the rest of the required information.

Click the Advanced tab and fill in the ASM user information. The ASM user should have the SYSASM privilege.

When done, click Save, then click Test Connection. If the parameters are entered correctly, you should see a Test Connection Succeeded message.
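
If the test fails, it helps to rule out basic connectivity first. Assuming an Oracle client is installed on the Replicate server, a quick SQL*Plus login with the CDC user works as a check (host and service name below are placeholders for your environment):

# Hypothetical host and service name; substitute your own.
sqlplus cdc_user/cdc_user@//oracle-host:1521/orclpdb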

10. Configure Kafka Endpoint
Click + New Endpoint Connection. This time, choose Target as the Role and Kafka as the Type, then input the parameters shown below.

Click Save and then Test Connection. It should work.

11. Create a New CDC Task
Make sure you contact Attunity sales to get a license file and import the trial license before creating a new CDC task.
Click + New Task.
Give the task a name and keep the rest of the default values.

Drag the Oracle source from the Endpoint Connections panel to the source location, and do the same with the Kafka target to the target location.


Next, I need to add the source table. Click Table Selection and add my test table there.

Then click Save to save the task information.

12. Run the Task of Initial Load
Click the Run button to start the initial load of the tables in the task.

After a short while, you should see the initial load finish successfully.
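
One way to double-check that the load actually landed in Kafka is to sum the end offsets of the topic partitions; with 1,000 source rows the total should be roughly 1,000 messages. GetOffsetShell ships with Kafka:

# Prints topic:partition:end-offset per partition; sum the offsets.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wztest1 --time -1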

13. Test CDC
Next, let me test whether CDC is working. Run some DML statements on the test table:

update wztest.employee set last_name='wztest1' where employee_id=265;
update wztest.employee set last_name='wztest2' where employee_id=268;
update wztest.employee set last_name='wztest3' where employee_id=311;
update wztest.employee set last_name='wztest4' where employee_id=580;
delete from wztest.employee where employee_id=581;
commit;

Now, let me check the CDC monitoring screen. Nice, it shows the 4 updates and 1 delete.
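
You can also watch the change records arrive on the Kafka side with the console consumer (with the default endpoint settings these come through as JSON change records):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wztest1 --from-beginning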

Nice UI from Attunity.