Use incremental import in sqoop to load data from Oracle (Part I)

This is a two parts series in discussing incremental import job in sqoop. This post is the first part of the series.

In the last few posts, I discussed the following:
1. Install Cloudera Hadoop Cluster using Cloudera Manager
2. Configurations after CDH Installation
3. Load Data to Hive Table.
4. Import Data to Hive from Oracle Database
5. Export data from Hive table to Oracle Database.
6. Use Impala to query a Hive table
When using sqoop to load data to hive table from an Oracle table, it’s not always loading a full table to hive in one shot, just like taking many days’ work to build the house below. In other words, it is common to load partial data from Oracle table to an existing Hive table. This is where we need to use sqoop incremental job to do the work.

sqoop_incremental_1

First, I create a simple table to illustrate the process for incremental import.
1. Create the source table.
Run the following query to create a new table on Oracle database.

<b>create table wzhou.student
(
student_id number(8) not null,
student_name varchar2(20) not null,
major varchar2(20),
CONSTRAINT student_pk PRIMARY KEY (student_id)
);
insert into wzhou.student values ( 1, 'student1', 'math' );
insert into wzhou.student values ( 2, 'student2', 'computer' );
insert into wzhou.student values ( 3, 'student3', 'math' );
insert into wzhou.student values ( 4, 'student4', 'accounting' );

commit;
select * from wzhou.student;</b>

2. Create the import command.

sqoop import \
--connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
--username wzhou \
--password wzhou \
--table STUDENT \
--incremental append \
--check-column student_id \
-m 4 \
--split-by major

check-column argument specifies which column to be checked during the import operation. The column can not be *CHAR type, like VARCHAR2 or CHAR.

incremental argument can have two modes: append and lastmodified. Lasmodified argument is usually used with a lastmodified column defined as timestamp.
Last-value argument is used to specify a value that new rows with greater than this value will be insert.
or use other ways, suggested by one internet article:
–last-value $($HIVE_HOME/bin/hive -e “select max(idcolumn) from tablename”)

Note 1:
If seeing the following error, make sure the tablename specified in –table argument in UPPERCASE. If it is lowercase, you could see the error below.
ERROR tool.ImportTool: Imported Failed: There is no column found in the target table all_objects_inc_test. Please ensure that your table name is correct.

Note 2:
If using –hive-import argument, you could see the following error. It is not supported yet. So have to remove it and build Hive External table after complete the import data to hdfs.
ERROR Append mode for hive imports is not yet supported. Please remove the parameter –append-mode.

3. Execute the Table Import to Hive.
[wzhou@vmhost1 ~]$ sqoop import \
> –connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
> –username wzhou \
> –password wzhou \
> –table STUDENT \
> –incremental append \
> –check-column student_id \
> -m 4 \
> –split-by major

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 05:14:22 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
15/09/25 05:14:22 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
. . . .
15/09/25 05:17:33 INFO mapreduce.ImportJobBase: Transferred 74 bytes in 184.558 seconds (0.401 bytes/sec)
15/09/25 05:17:33 INFO mapreduce.ImportJobBase: Retrieved 4 records.
15/09/25 05:17:33 INFO util.AppendUtils: Creating missing output directory – STUDENT
15/09/25 05:17:33 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
15/09/25 05:17:33 INFO tool.ImportTool: –incremental append
15/09/25 05:17:33 INFO tool.ImportTool: –check-column student_id
15/09/25 05:17:33 INFO tool.ImportTool: –last-value 4
15/09/25 05:17:33 INFO tool.ImportTool: (Consider saving this with ‘sqoop job –create’)

Notice there is a line of –last-value 4 at the end of execution. This is correct as I imported only 4 rows. The result file is under /user/wzhou/STUDENT. Let’s verify it.

[wzhou@vmhost1 ~]$ hdfs dfs -ls /user/wzhou/STUDENT
Found 5 items
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:14 /user/wzhou/STUDENT/part-m-00000
-rw-r--r--   2 wzhou bigdata         42 2015-09-25 05:15 /user/wzhou/STUDENT/part-m-00001
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:16 /user/wzhou/STUDENT/part-m-00002
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00003
-rw-r--r--   2 wzhou bigdata         32 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00004
[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*
2,student2,computer
4,student4,accounting
1,student1,math
3,student3,math
[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*1
2,student2,computer
4,student4,accounting
[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*4
1,student1,math
3,student3,math

Result looks good so far.

4. Create Hive external table. The new hive external table on Hadoop is still under test_oracle database.
hive
USE test_oracle;
CREATE EXTERNAL TABLE student_ext (
student_id string,
student_name string,
major string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
LINES TERMINATED BY ‘\n’
LOCATION ‘/user/wzhou/STUDENT’;

select * from student_ext;

Here are the result.
[wzhou@vmhost1 ~]$ hive
hive> USE test_oracle;
OK
Time taken: 1.547 seconds
hive> CREATE EXTERNAL TABLE student_ext (
> student_id string,
> student_name string,
> major string
> )
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
> LINES TERMINATED BY ‘\n’
> LOCATION ‘/user/wzhou/STUDENT’;

OK
Time taken: 1.266 seconds
hive> select * from student_ext;
OK
2 student2 computer
4 student4 accounting
1 student1 math
3 student3 math
Time taken: 0.679 seconds, Fetched: 4 row(s)

5. 2nd Round of Insert.
Let me to test out more insert to see how incremental import works. On oracle database, create a few more rows.

insert into wzhou.student values ( 5, ‘student3’, ‘computer’);
insert into wzhou.student values ( 6, ‘student4’, ‘math’);
insert into wzhou.student values ( 7, ‘student5’, ‘computer’ );
commit;

6. Rerun the incremental import command.
[wzhou@vmhost1 ~]$ sqoop import \
> –connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
> –username wzhou \
> –password wzhou \
> –table STUDENT \
> –incremental append \
> –check-column student_id \
> -m 4 \
> –split-by major

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 05:39:50 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
15/09/25 05:39:51 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
15/09/25 05:39:51 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
. . . .
15/09/25 05:42:54 INFO mapreduce.ImportJobBase: Transferred 130 bytes in 178.3882 seconds (0.7287 bytes/sec)
15/09/25 05:42:54 INFO mapreduce.ImportJobBase: Retrieved 7 records.
15/09/25 05:42:54 INFO util.AppendUtils: Appending to directory STUDENT
15/09/25 05:42:54 INFO util.AppendUtils: Using found partition 5
15/09/25 05:42:54 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
15/09/25 05:42:54 INFO tool.ImportTool: –incremental append
15/09/25 05:42:54 INFO tool.ImportTool: –check-column student_id
15/09/25 05:42:54 INFO tool.ImportTool: –last-value 7
15/09/25 05:42:54 INFO tool.ImportTool: (Consider saving this with ‘sqoop job –create’)

Interesting, the result is not what I expect. I expected only three rows will be inserted. But it seem all 7 rows are inserted. Let me verify the result.

[wzhou@vmhost1 ~]$ hdfs dfs -ls /user/wzhou/STUDENT
Found 10 items
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:14 /user/wzhou/STUDENT/part-m-00000
-rw-r--r--   2 wzhou bigdata         42 2015-09-25 05:15 /user/wzhou/STUDENT/part-m-00001
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:16 /user/wzhou/STUDENT/part-m-00002
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00003
-rw-r--r--   2 wzhou bigdata         32 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00004
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:40 /user/wzhou/STUDENT/part-m-00005
-rw-r--r--   2 wzhou bigdata         82 2015-09-25 05:41 /user/wzhou/STUDENT/part-m-00006
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:41 /user/wzhou/STUDENT/part-m-00007
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:42 /user/wzhou/STUDENT/part-m-00008
-rw-r--r--   2 wzhou bigdata         48 2015-09-25 05:42 /user/wzhou/STUDENT/part-m-00009

[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*6
2,student2,computer
4,student4,accounting
5,student3,computer
7,student5,computer

[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*9
1,student1,math
3,student3,math
6,student4,math

Hive result. 
hive> select * from student_ext;
OK
2	student2	computer
4	student4	accounting
1	student1	math
3	student3	math
2	student2	computer
4	student4	accounting
5	student3	computer
7	student5	computer
1	student1	math
3	student3	math
6	student4	math
Time taken: 0.085 seconds, Fetched: 11 row(s)

Ok, let me look at the result from Impala.

[vmhost3:21000] > select count(*) from student_ext;
Query: select count(*) from student_ext
+----------+
| count(*) |
+----------+
| 4        |
+----------+
Fetched 1 row(s) in 0.85s

Interesting. Impala shows 4 rows instead of 11 rows. The reason is that impala does not refresh metadata regularly. So need to do the invalidate metadata to get the last row count. Hive doesn’t seem have this issue.

[vmhost3:21000] > invalidate metadata;
Query: invalidate metadata

Fetched 0 row(s) in 4.48s
[vmhost3:21000] > select count(*) from student_ext;
Query: select count(*) from student_ext
+----------+
| count(*) |
+----------+
| 11       |
+----------+
Fetched 1 row(s) in 0.94s

7. Solution to fix this issue.
The solution is to add one more argument, last-value, to specify where the load stop last time. I redoed the step 1 to 5.
sqoop import \
–connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
–username wzhou \
–password wzhou \
–table STUDENT \
–incremental append \
–check-column student_id \
–last-value 4 \
-m 4 \
–split-by major

Here are the result

HDFS
[wzhou@vmhost1 ~]$ hdfs dfs -ls /user/wzhou/STUDENT
Found 9 items
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:14 /user/wzhou/STUDENT/part-m-00000
-rw-r--r--   2 wzhou bigdata         42 2015-09-25 05:15 /user/wzhou/STUDENT/part-m-00001
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:16 /user/wzhou/STUDENT/part-m-00002
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00003
-rw-r--r--   2 wzhou bigdata         32 2015-09-25 05:17 /user/wzhou/STUDENT/part-m-00004
-rw-r--r--   2 wzhou bigdata         40 2015-09-25 06:02 /user/wzhou/STUDENT/part-m-00005
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 06:02 /user/wzhou/STUDENT/part-m-00006
-rw-r--r--   2 wzhou bigdata          0 2015-09-25 06:03 /user/wzhou/STUDENT/part-m-00007
-rw-r--r--   2 wzhou bigdata         16 2015-09-25 06:04 /user/wzhou/STUDENT/part-m-00008
[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*5
5,student3,computer
7,student5,computer
[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/wzhou/STUDENT/part*8
6,student4,math

Hive
hive> select * from student_ext;
OK
2	student2	computer
4	student4	accounting
1	student1	math
3	student3	math
5	student3	computer
7	student5	computer
6	student4	math
Time taken: 0.095 seconds, Fetched: 7 row(s)

Impala
[vmhost3:21000] > select count(*) from student_ext;
Query: select count(*) from student_ext
+----------+
| count(*) |
+----------+
| 7        |
+----------+
Fetched 1 row(s) in 0.82s

Ok, everything looks good right now.