Import Data to Hive from Oracle Database

In the last three posts, I discussed the following:
1. Install Cloudera Hadoop Cluster using Cloudera Manager
2. Configurations after CDH Installation
3. Load Data to Hive Table.
import_export
This post will discuss a basic scenario in Hive: dump some data from an Oracle database, load it to HDFS, and query the data using Hive. Sqoop is a tool designed for transferring bulk data between Hadoop and an RDBMS, such as Oracle. It can be used to populate data into Hive or HBase on Hadoop. The import can be an entire table, the result of a query, or an incremental load. OK, here is an example showing how to use Sqoop to load data from Oracle.

1. Add the user to the hive group on all hosts in the cluster. Otherwise, you could see the following error:
chgrp: changing ownership of 'hdfs://vmhost1.local:8020/user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00004_copy_1': User does not belong to hive

[root@vmhost1 mnt]# usermod -a -G hive wzhou
[root@vmhost1 mnt]# id wzhou
uid=502(wzhou) gid=502(wzhou) groups=502(wzhou),481(hive),501(bigdata)

2. Create the target directory, /tmp/sqoopimport, for the sqoop job.
[wzhou@vmhost1 data]$ hdfs dfs -mkdir /tmp/sqoopimport

3. Install the Oracle JDBC driver. Check whether the Oracle JDBC driver already exists; if not, do the following to install it. The driver file can be downloaded from the Oracle web site at
http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html
The file name is ojdbc6.jar. Copy this file to /usr/lib/sqoop/lib/ if using packages, or /var/lib/sqoop/ if using parcels. Make sure to change the file permission to 755 afterwards.

[root@vmhost1 mnt]# cp ojdbc6.jar /usr/lib/sqoop/lib/
[root@vmhost1 mnt]# ls -l /usr/lib/sqoop/lib/oj*.jar
-rw-r----- 1 root root 2739670 Sep 21 15:24 /usr/lib/sqoop/lib/ojdbc6.jar
[root@vmhost1 mnt]# chmod 755 /usr/lib/sqoop/lib/ojdbc6.jar
[root@vmhost1 mnt]# ls -l /usr/lib/sqoop/lib/oj*.jar
-rwxr-xr-x 1 root root 2739670 Sep 21 15:24 /usr/lib/sqoop/lib/ojdbc6.jar

4. Verify that the JDBC connection is working by running sqoop with the list-databases argument. It connects to the Oracle database and lists the schemas in the dbm database. If you don't want to show the password on the command line, use the -P parameter to enter the database password interactively.

sqoop list-databases \
--connect jdbc:oracle:thin:@enkx3-scan:1521:dbm1 \
--username wzhou \
-P

Here is the execution result:
[wzhou@vmhost1 data]$ sqoop list-databases \
> --connect jdbc:oracle:thin:@enkx3-scan:1521:dbm1 \
> --username wzhou \
> -P

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/21 18:55:58 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
Enter password:
15/09/21 18:56:02 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
15/09/21 18:56:02 INFO manager.SqlManager: Using default fetchSize of 1000
15/09/21 18:56:03 INFO manager.OracleManager: Time zone has been set to GMT
ORACLE
JBARBA
TANEL
WZHOU
MBH
MPAGANO
GSHEPPARD
ENKITEC
ACOLVIN
KARLARAO
KSO
MARTIN
KOSBORNE
ODI
RJOHNSON
ORACLE_OCM
. . . .
SYSMAN
DBSNMP
WMSYS
XDB
MGMT_VIEW
SYS
SYSTEM

5. Create the Hive table. The new Hive table is called my_all_objects_sqoop and lives in the test_oracle database.

First, check that no my_all_objects_sqoop table exists on HDFS:
hdfs dfs -ls /user/hive/warehouse/test_oracle.db

Run the following commands to create the Hive table in the test_oracle database.
hive
USE test_oracle;

CREATE TABLE my_all_objects_sqoop (
owner string,
object_name string,
object_id int,
object_type string,
create_date string
)
STORED AS TEXTFILE;
SHOW TABLES;

Check out the folders on HDFS.
hdfs dfs -ls /user/hive/warehouse/test_oracle.db
hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop

Execution result.
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db
Found 1 items
drwxrwxrwt – wzhou hive 0 2015-09-21 11:55 /user/hive/warehouse/test_oracle.db/my_all_objects

hive> USE test_oracle;
OK
Time taken: 0.221 seconds

hive> CREATE TABLE my_all_objects_sqoop (
> owner string,
> object_name string,
> object_id int,
> object_type string,
> create_date string
> )
> STORED AS TEXTFILE;

OK
Time taken: 0.247 seconds

hive> SHOW TABLES;
OK
my_all_objects
my_all_objects_sqoop
Time taken: 0.154 seconds, Fetched: 2 row(s)

[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db
Found 2 items
drwxrwxrwt – wzhou hive 0 2015-09-21 11:55 /user/hive/warehouse/test_oracle.db/my_all_objects
drwxrwxrwt – wzhou hive 0 2015-09-21 17:15 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop

[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop
[wzhou@vmhost1 data]$

6. Run sqoop to import the data directly into a Hive table. Please note, Sqoop requires the $CONDITIONS token in the WHERE clause when importing the result of a free-form query, even though I am going to retrieve all rows from all_objects. The -m (or --num-mappers) argument enables parallel import: it specifies the number of map tasks (parallel processes) used to perform the import. When running in parallel, Sqoop needs a way to split the workload, and that is what the --split-by argument is for: it specifies the column to use as the split column. By default, the primary key of the table is used as the split column, but you can specify a different column with --split-by.

sqoop import \
--connect jdbc:oracle:thin:@enkx3-scan:1521:dbm1 \
--username wzhou \
--password wzhou \
--query "select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where \$CONDITIONS" \
-m 4 \
--split-by object_type \
--hive-import \
--target-dir '/tmp/sqoopimport' \
--hive-table test_oracle.my_all_objects_sqoop

select * from my_all_objects_sqoop limit 3;
select count(*) from my_all_objects_sqoop;

hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop
hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects
hdfs dfs -cat /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00000 | head -n 10

Execution result.

[wzhou@vmhost1 data]$ sqoop import \
>     --connect jdbc:oracle:thin:@enkx3-scan:1521:dbm1 \
>     --username wzhou \
>     --password wzhou \
>     --query "select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where \$CONDITIONS" \
>     -m 4 \
>     --split-by object_type \
>     --hive-import \
>     --target-dir '/tmp/sqoopimport' \
>     --hive-table test_oracle.my_all_objects_sqoop
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/21 17:53:15 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
15/09/21 17:53:15 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
15/09/21 17:53:15 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
15/09/21 17:53:15 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
15/09/21 17:53:15 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
15/09/21 17:53:15 INFO manager.SqlManager: Using default fetchSize of 1000
15/09/21 17:53:15 INFO tool.CodeGenTool: Beginning code generation
15/09/21 17:53:18 INFO manager.OracleManager: Time zone has been set to GMT
15/09/21 17:53:18 INFO manager.SqlManager: Executing SQL statement: select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where  (1 = 0) 
15/09/21 17:53:19 INFO manager.SqlManager: Executing SQL statement: select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where  (1 = 0) 
15/09/21 17:53:19 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-wzhou/compile/d96bcc9fb5354b998f0db547e0ed17bc/QueryResult.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/09/21 17:53:21 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-wzhou/compile/d96bcc9fb5354b998f0db547e0ed17bc/QueryResult.jar
15/09/21 17:53:21 INFO mapreduce.ImportJobBase: Beginning query import.
15/09/21 17:53:21 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
15/09/21 17:53:22 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/09/21 17:53:22 INFO client.RMProxy: Connecting to ResourceManager at vmhost1.local/192.168.56.71:8032
15/09/21 17:53:26 INFO db.DBInputFormat: Using read commited transaction isolation
15/09/21 17:53:26 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(object_type), MAX(object_type) FROM (select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where  (1 = 1) ) t1
15/09/21 17:53:26 WARN db.TextSplitter: Generating splits for a textual index column.
15/09/21 17:53:26 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
15/09/21 17:53:26 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
15/09/21 17:53:26 INFO mapreduce.JobSubmitter: number of splits:5
15/09/21 17:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1442870712675_0012
15/09/21 17:53:27 INFO impl.YarnClientImpl: Submitted application application_1442870712675_0012
15/09/21 17:53:27 INFO mapreduce.Job: The url to track the job: http://vmhost1.local:8088/proxy/application_1442870712675_0012/
15/09/21 17:53:27 INFO mapreduce.Job: Running job: job_1442870712675_0012
15/09/21 17:53:40 INFO mapreduce.Job: Job job_1442870712675_0012 running in uber mode : false
15/09/21 17:53:40 INFO mapreduce.Job:  map 0% reduce 0%
15/09/21 17:53:53 INFO mapreduce.Job:  map 20% reduce 0%
15/09/21 17:54:25 INFO mapreduce.Job:  map 40% reduce 0%
15/09/21 17:55:19 INFO mapreduce.Job:  map 60% reduce 0%
15/09/21 17:55:49 INFO mapreduce.Job:  map 80% reduce 0%
15/09/21 17:56:29 INFO mapreduce.Job:  map 100% reduce 0%
15/09/21 17:56:29 INFO mapreduce.Job: Job job_1442870712675_0012 completed successfully
15/09/21 17:56:29 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=680385
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=728
		HDFS: Number of bytes written=1089568
		HDFS: Number of read operations=20
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=10
	Job Counters 
		Launched map tasks=5
		Other local map tasks=5
		Total time spent by all maps in occupied slots (ms)=160538
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=160538
		Total vcore-seconds taken by all map tasks=160538
		Total megabyte-seconds taken by all map tasks=164390912
	Map-Reduce Framework
		Map input records=22519
		Map output records=22519
		Input split bytes=728
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=641
		CPU time spent (ms)=10540
		Physical memory (bytes) snapshot=614903808
		Virtual memory (bytes) snapshot=8024203264
		Total committed heap usage (bytes)=159252480
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=1089568
15/09/21 17:56:29 INFO mapreduce.ImportJobBase: Transferred 1.0391 MB in 187.358 seconds (5.6791 KB/sec)
15/09/21 17:56:29 INFO mapreduce.ImportJobBase: Retrieved 22519 records.
15/09/21 17:56:29 INFO manager.OracleManager: Time zone has been set to GMT
15/09/21 17:56:29 INFO manager.SqlManager: Executing SQL statement: select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where  (1 = 0) 
15/09/21 17:56:30 INFO manager.SqlManager: Executing SQL statement: select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date from all_objects where  (1 = 0) 
15/09/21 17:56:30 WARN hive.TableDefWriter: Column OBJECT_ID had to be cast to a less precise type in Hive
15/09/21 17:56:30 INFO hive.HiveImport: Loading uploaded data into Hive

Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-1.1.0-cdh5.4.3.jar!/hive-log4j.properties
OK
Time taken: 1.98 seconds
Loading data to table test_oracle.my_all_objects_sqoop
Table test_oracle.my_all_objects_sqoop stats: [numFiles=5, totalSize=1089568]
OK
Time taken: 0.886 seconds

Verify the result from Hive. You might need to exit Hive and re-enter it to see the rows from the newly populated table; otherwise, you could see 0 rows.

hive> select * from my_all_objects_sqoop limit 3;
OK
SYS	C_COBJ#	29	CLUSTER	2013-03-12
SYS	C_TS#	6	CLUSTER	2013-03-12
SYS	C_FILE#_BLOCK#	8	CLUSTER	2013-03-12
Time taken: 0.684 seconds, Fetched: 3 row(s)

hive> select count(*) from my_all_objects_sqoop;
Query ID = wzhou_20150921175858_c67bac9f-9cf0-437a-a90c-d3a891455f33
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1442870712675_0013, Tracking URL = http://vmhost1.local:8088/proxy/application_1442870712675_0013/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1442870712675_0013
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-09-21 17:58:57,446 Stage-1 map = 0%,  reduce = 0%
2015-09-21 17:59:09,067 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.68 sec
2015-09-21 17:59:19,506 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.41 sec
MapReduce Total cumulative CPU time: 4 seconds 410 msec
Ended Job = job_1442870712675_0013
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 4.41 sec   HDFS Read: 1096958 HDFS Write: 6 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 410 msec
OK
22519
Time taken: 37.941 seconds, Fetched: 1 row(s)

Check the Hive files on HDFS. In the cat output below the fields appear to run together because the Hive import uses Hive's default ^A (Ctrl-A) field delimiter, which is invisible in the terminal.

[wzhou@vmhost1 ~]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop
Found 5 items
-rwxrwxrwt   2 wzhou hive       9927 2015-09-21 17:53 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00000
-rwxrwxrwt   2 wzhou hive     248584 2015-09-21 17:54 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00001
-rwxrwxrwt   2 wzhou hive      93915 2015-09-21 17:55 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00002
-rwxrwxrwt   2 wzhou hive     735608 2015-09-21 17:55 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00003
-rwxrwxrwt   2 wzhou hive       1534 2015-09-21 17:56 /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00004
[wzhou@vmhost1 ~]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects
Found 1 items
-rwxrwxrwt   2 wzhou bigdata    2050380 2015-09-21 11:28 /user/hive/warehouse/test_oracle.db/my_all_objects/all_objects_data.txt

[wzhou@vmhost1 ~]$ hdfs dfs -cat /user/hive/warehouse/test_oracle.db/my_all_objects_sqoop/part-m-00000 | head -n 10
SYSC_COBJ#29CLUSTER2013-03-12
SYSC_TS#6CLUSTER2013-03-12
SYSC_FILE#_BLOCK#8CLUSTER2013-03-12
SYSC_USER#10CLUSTER2013-03-12
SYSC_OBJ#2CLUSTER2013-03-12
SYSORA$BASE100EDITION2013-03-12
SYSSMON_SCN_TO_TIME_AUX268CLUSTER2013-03-12
SYSC_OBJ#_INTCOL#421CLUSTER2013-03-12
SYSC_TOID_VERSION#495CLUSTER2013-03-12
SYSC_MLOG#625CLUSTER2013-03-12
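
As mentioned at the top of this post, Sqoop can also do incremental loads instead of re-importing everything each time. Below is only a minimal sketch of what an incremental append against the same database could look like; the check column, the last value (100000), and the target directory are illustrative assumptions, not commands run for this post, and the new rows land in an HDFS directory rather than going through --hive-import.

# sketch only: append rows whose OBJECT_ID is greater than the last imported value
sqoop import \
--connect jdbc:oracle:thin:@enkx3-scan:1521:dbm1 \
--username wzhou \
-P \
--table ALL_OBJECTS \
--columns "OWNER,OBJECT_NAME,OBJECT_ID,OBJECT_TYPE" \
--incremental append \
--check-column OBJECT_ID \
--last-value 100000 \
--target-dir '/tmp/sqoopimport_incr' \
-m 1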

OK, I am done with the import to Hive here. In the next post, I am going to discuss exporting from Hive directly to an Oracle database.


Load Data to Hive Table

In the last few posts, I discussed:
1. Install Cloudera Hadoop Cluster using Cloudera Manager
2. Configurations after CDH Installation
This post will discuss a basic scenario in Hive: Dump some data from Oracle database, load to HDFS, and query the data using Hive.
hive
Now let's build the first Hive table. I am going to dump data from the ALL_OBJECTS view in the dbm database on our X3 Exadata in the lab, transfer the file to my Hadoop cluster, and load the file to HDFS. Then I will create a Hive table and load the data into it.

1. Create the flat file from ALL_OBJECTS on an Oracle database.
Run the following script to generate a text file from the all_objects view.
$ vi download_objects_data.sql

set echo off
set feedback off
set colsep '|'
set pagesize 0   
set trimspool on 
set headsep off  
set head off
--set sqlprompt ''
set linesize 150
set numw 10

col object_type for a20

spool all_objects_data.txt
select owner, object_name, object_id, object_type, to_char(created, 'yyyy-mm-dd') created_date
from all_objects;
spool off

set echo on
set feedback on

2. Run the above script in the dbm database. Then copy the all_objects_data.txt file to the Hadoop cluster.

3. Upload the file to /user/wzhou/test2 directory on HDFS.
[wzhou@vmhost1 data]$ pwd
/home/wzhou/test/data
[wzhou@vmhost1 data]$ cd ~/test/data
[wzhou@vmhost1 data]$ ls -l all_objects_data.txt
-rw-r--r-- 1 wzhou wzhou 2050380 Sep 21 11:24 all_objects_data.txt
[wzhou@vmhost1 data]$ hdfs dfs -mkdir /user/wzhou/test2
[wzhou@vmhost1 data]$ hdfs dfs -copyFromLocal all_objects_data.txt /user/wzhou/test2
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/wzhou/test2
Found 1 items
-rw-r--r-- 2 wzhou bigdata 2050380 2015-09-21 11:28 /user/wzhou/test2/all_objects_data.txt

4. Create a Hive table.
hive
CREATE DATABASE test_oracle;
SHOW DATABASES;

USE test_oracle;
SHOW TABLES;

CREATE TABLE my_all_objects (
owner string,
object_name string,
object_id int,
object_type string,
create_date string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

SHOW TABLES;
DESC my_all_objects;

Here is the output from the execution.

[wzhou@vmhost1 data]$ hive
Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-1.1.0-cdh5.4.3.jar!/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.

hive> CREATE DATABASE test_oracle;
OK
Time taken: 0.65 seconds

hive> SHOW DATABASES;
OK
default
test1
test_oracle
Time taken: 0.217 seconds, Fetched: 3 row(s)

hive> USE test_oracle;
OK
Time taken: 0.03 seconds

hive> SHOW TABLES;
OK
Time taken: 0.046 seconds

hive> CREATE TABLE my_all_objects (
    > owner string,
    > object_name string,
    > object_id int,
    > object_type string,
    > create_date string 
    > ) 
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    > LINES TERMINATED BY '\n'
    > STORED AS TEXTFILE;
OK
Time taken: 0.228 seconds

hive> SHOW TABLES;
OK
my_all_objects
Time taken: 0.018 seconds, Fetched: 1 row(s)

hive> DESC my_all_objects;
OK
owner               	string              	                    
object_name         	string              	                    
object_id           	int                 	                    
object_type         	string              	                    
create_date         	string              	                    
Time taken: 0.128 seconds, Fetched: 5 row(s)

If you want to know the script used to create the table, use the SHOW CREATE TABLE command to generate the DDL of the table.

hive> SHOW CREATE TABLE my_all_objects;
OK
CREATE TABLE `my_all_objects`(
  `owner` string, 
  `object_name` string, 
  `object_id` int, 
  `object_type` string, 
  `create_date` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://vmhost1.local:8020/user/hive/warehouse/test_oracle.db/my_all_objects'
TBLPROPERTIES (
  'transient_lastDdlTime'='1442853571')
Time taken: 0.209 seconds, Fetched: 17 row(s)

5. Load the data into the table from all_objects_data.txt.
LOAD DATA INPATH '/user/wzhou/test2/all_objects_data.txt' OVERWRITE INTO TABLE my_all_objects;

6. Verify the result.
select * from my_all_objects limit 10;
select count(*) from my_all_objects;

Here is the output:

hive> LOAD DATA INPATH '/user/wzhou/test2/all_objects_data.txt' OVERWRITE INTO TABLE my_all_objects;
Loading data to table test_oracle.my_all_objects
chgrp: changing ownership of 'hdfs://vmhost1.local:8020/user/hive/warehouse/test_oracle.db/my_all_objects/all_objects_data.txt': User does not belong to hive
Table test_oracle.my_all_objects stats: [numFiles=1, numRows=0, totalSize=2050380, rawDataSize=0]
OK
Time taken: 0.71 seconds

Let's look at a few rows from the Hive table.

hive> select * from my_all_objects limit 10;
OK
SYS            	ICOL$                         	NULL	TABLE               	2013-03-12
SYS            	I_USER1                       	NULL	INDEX               	2013-03-12
SYS            	CON$                          	NULL	TABLE               	2013-03-12
SYS            	UNDO$                         	NULL	TABLE               	2013-03-12
SYS            	C_COBJ#                       	NULL	CLUSTER             	2013-03-12
SYS            	I_OBJ#                        	NULL	INDEX               	2013-03-12
SYS            	PROXY_ROLE_DATA$              	NULL	TABLE               	2013-03-12
SYS            	I_IND1                        	NULL	INDEX               	2013-03-12
SYS            	I_CDEF2                       	NULL	INDEX               	2013-03-12
SYS            	I_OBJ5                        	NULL	INDEX               	2013-03-12
Time taken: 0.339 seconds, Fetched: 10 row(s)

Notice that object_id shows up as NULL above. Most likely this is because the spooled file pads each column with spaces, and a padded value such as ' 20' does not parse cleanly as an int, so Hive returns NULL; the external table later in this post declares object_id as a string and keeps the value. Getting the total row count is actually a MapReduce job.

hive> select count(*) from my_all_objects;
Query ID = wzhou_20150921115656_f9fe088d-149b-4436-9974-0265d8fb676a
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1442853303881_0001, Tracking URL = http://vmhost1.local:8088/proxy/application_1442853303881_0001/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1442853303881_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-09-21 11:57:00,160 Stage-1 map = 0%,  reduce = 0%
2015-09-21 11:57:12,692 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.63 sec
2015-09-21 11:57:23,135 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.21 sec
MapReduce Total cumulative CPU time: 4 seconds 210 msec
Ended Job = job_1442853303881_0001
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 4.21 sec   HDFS Read: 2057365 HDFS Write: 6 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 210 msec
OK
22782
Time taken: 40.221 seconds, Fetched: 1 row(s)

You might notice that I have two copies of the data:
1. One is the Hive table under /user/hive/warehouse/test_oracle.db/my_all_objects.
2. The other is my original data file at /user/wzhou/test2/all_objects_data.txt.

 
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive
Found 1 items
drwxrwxrwt   - hive hive          0 2015-09-21 11:38 /user/hive/warehouse
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse
Found 3 items
drwxrwxrwt   - wzhou hive          0 2015-09-20 19:05 /user/hive/warehouse/my_test1
drwxrwxrwt   - wzhou hive          0 2015-09-20 18:56 /user/hive/warehouse/test1.db
drwxrwxrwt   - wzhou hive          0 2015-09-21 11:39 /user/hive/warehouse/test_oracle.db
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db
Found 1 items
drwxrwxrwt   - wzhou hive          0 2015-09-21 11:55 /user/hive/warehouse/test_oracle.db/my_all_objects
[wzhou@vmhost1 data]$ hdfs dfs -ls /user/hive/warehouse/test_oracle.db/my_all_objects
Found 1 items
-rwxrwxrwt   2 wzhou bigdata    2050380 2015-09-21 11:28 /user/hive/warehouse/test_oracle.db/my_all_objects/all_objects_data.txt

For this test, it's OK that I have two copies of the same data. But for a big dataset, it does not make sense to keep two identical copies on HDFS. If I want to keep just one copy, this is where a Hive external table comes in. Let me assume /user/wzhou/test2 is the folder where I keep all of the data for Hive operations. The table creation is a little different.

CREATE EXTERNAL TABLE my_all_objects_ext (
owner string,
object_name string,
object_id string,
object_type string,
create_date string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
LOCATION '/user/wzhou/test2';

Note: LOCATION specifies the directory name, not the file name of the data files. You could have thousands of data files under the same directory; the sketch below shows how new files are picked up.
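
As a minimal sketch of that point (the file name more_objects_data.txt is hypothetical, not part of this test): once the external table exists, any additional file copied into the LOCATION directory is picked up automatically on the next query.

# hypothetical second data file with the same '|'-delimited layout
hdfs dfs -copyFromLocal more_objects_data.txt /user/wzhou/test2
# the external table now reads both files; no ALTER TABLE or reload is needed
hive -e "USE test_oracle; SELECT COUNT(*) FROM my_all_objects_ext;"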

After running the above script to create the new Hive external table, verify the table.

 
hive> select * from my_all_objects_ext limit 10;
OK
SYS            	ICOL$               20	TABLE               	2013-03-12
SYS            	I_USER1             46	INDEX               	2013-03-12
SYS            	CON$                28	TABLE               	2013-03-12
SYS            	UNDO$               15	TABLE               	2013-03-12
SYS            	C_COBJ#             29	CLUSTER             	2013-03-12
SYS            	I_OBJ#              3	INDEX               	2013-03-12
SYS            	PROXY_ROLE_DATA$    25	TABLE               	2013-03-12
SYS            	I_IND1              41	INDEX               	2013-03-12
SYS            	I_CDEF2             54	INDEX               	2013-03-12
SYS            	I_OBJ5              40	INDEX               	2013-03-12
Time taken: 0.094 seconds, Fetched: 10 row(s)

hive> select count(*) from my_all_objects_ext;
Query ID = wzhou_20150921125858_48330cd5-220e-46ad-960d-67ca1315bc6c
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1442853303881_0002, Tracking URL = http://vmhost1.local:8088/proxy/application_1442853303881_0002/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1442853303881_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-09-21 12:58:17,560 Stage-1 map = 0%,  reduce = 0%
2015-09-21 12:58:28,124 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.71 sec
2015-09-21 12:58:38,570 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.31 sec
MapReduce Total cumulative CPU time: 3 seconds 310 msec
Ended Job = job_1442853303881_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 3.31 sec   HDFS Read: 2057301 HDFS Write: 6 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 310 msec
OK
22782
Time taken: 36.329 seconds, Fetched: 1 row(s)

OK, we now have a basic understanding of Hive tables, and it's quite simple to create a Hive table on HDFS. This approach is good when you don't care where the data comes from, whether it is from an RDBMS, web logs, a mainframe, or other sources. As long as you can put the file on HDFS, you can use Hive to query it. But you might have a scenario where you want to pull data directly from an RDBMS, such as Oracle. In that case, one popular tool is Sqoop. In the next post, I am going to discuss how to use Sqoop to import data to HDFS from an Oracle database and export the content of a Hive table back to Oracle.

Configurations after CDH Installation

In the last post, I discussed the steps to install a 3-node Hadoop cluster using Cloudera Manager. In the next few posts, I am going to discuss some frequently used technologies, such as Hive, Sqoop, Impala and Spark.

There are a few things that need to be configured after the CDH Installation.

1. Configure NTPD. Start the ntpd process on every host. Otherwise, Cloudera Manager could display a health check failure: The host's NTP service did not respond to a request for the clock offset.
# service ntpd status
# service ntpd start
# chkconfig ntpd on
# chkconfig --list ntpd
# ntpdc -np

2. Configure the replication factor. As my little cluster has only 2 data nodes, I need to reduce the replication factor from the default of 3 to 2 to avoid the annoying under-replicated blocks type of error. First, run the following command to change the replication factor of the existing files to 2.

hadoop fs -setrep -R 2 /

Then go to the HDFS configuration in Cloudera Manager and change Replication Factor (dfs.replication) to 2 so that newly created files also use the lower value.
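
To check whether any under-replicated blocks remain after the change, one option (a sketch for reference; I did not run this as part of the post) is hdfs fsck:

# report files and blocks that are still under-replicated
sudo -u hdfs hdfs fsck / | grep -i "under.replicated"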

3. Change the message logging level from INFO to WARN. I cannot believe how many INFO messages are logged; there is no way I can read a message for more than 3 seconds before it is refreshed away by a flood of new INFO messages. In my opinion, the majority of the INFO messages are useless and should not be logged in the first place; they look more like DEBUG messages to me. So before my little cluster goes crazy logging tons of useless messages, I need to quickly change the logging level from INFO to WARN. Another painful thing is that there are many log files from the various Hadoop components, located in many different places. I feel like I am sitting in a space shuttle cockpit and need to turn off many switches that are not in a central location.
space_shuttle_cockpit
I could track down each logging configuration file and fix the parameters one by one, but that would take time and be too painful. The easiest way I found is to use Cloudera Manager to make the change. Basically, type logging level as the search term; it pops up a long list of components with their logging levels, and you change them one by one. You will not believe how many logging level parameters are in the system. After the change, it is recommended to restart the cluster, as certain parameters become stale.
CM_change_INO_WARN

4. Configure Hue's superuser and password. From the Cloudera Manager screen, click Hue to open the Hue screen. The weird part about Hue is that there is no pre-set superuser for administration: whoever logs on to Hue first becomes the Hue superuser. I don't understand why Hue doesn't just take whatever user and password Cloudera Manager uses. Anyway, to make my life easier, I just use the same login user and password as Cloudera Manager, admin.
hue_initial_screen

5. Add a new user.
By default the hdfs user, not root, is the superuser for HDFS. So before doing any work on Hadoop, it is a good idea to create a separate OS user instead of using the hdfs user to execute Hadoop commands. Run the following commands on EVERY host in the cluster.
a. Logon as root user.
b. Create bigdata group.
# groupadd bigdata
# grep bigdata /etc/group

c. Add the new user, wzhou.
# useradd -G bigdata -m wzhou

If the user existed before the bigdata group was created, do the following:
# usermod -a -G bigdata wzhou

d. Change password
# passwd wzhou

e. Verify the user.
# id wzhou

f. Create the user home directory on HDFS.
# sudo -u hdfs hdfs dfs -mkdir /user/wzhou
# sudo -u hdfs hdfs dfs -ls /user

[root@vmhost1 ~]# sudo -u hdfs hdfs dfs -ls /user
Found 8 items
drwxrwxrwx   - mapred hadoop              0 2015-09-15 05:40 /user/history
drwxrwxr-t   - hive   hive                0 2015-09-15 05:44 /user/hive
drwxrwxr-x   - hue    hue                 0 2015-09-15 10:12 /user/hue
drwxrwxr-x   - impala impala              0 2015-09-15 05:46 /user/impala
drwxrwxr-x   - oozie  oozie               0 2015-09-15 05:47 /user/oozie
drwxr-x--x   - spark  spark               0 2015-09-15 05:41 /user/spark
drwxrwxr-x   - sqoop2 sqoop               0 2015-09-15 05:42 /user/sqoop2
drwxr-xr-x   - hdfs   supergroup          0 2015-09-20 11:23 /user/wzhou

g. Change the ownership of the directory.
# sudo -u hdfs hdfs dfs -chown wzhou:bigdata /user/wzhou
# hdfs dfs -ls /user

[root@vmhost1 ~]# sudo -u hdfs hdfs dfs -chown wzhou:bigdata /user/wzhou
[root@vmhost1 ~]# sudo -u hdfs hdfs dfs -ls /user
Found 8 items
drwxrwxrwx   - mapred hadoop           0 2015-09-15 05:40 /user/history
drwxrwxr-t   - hive   hive             0 2015-09-15 05:44 /user/hive
drwxrwxr-x   - hue    hue              0 2015-09-15 10:12 /user/hue
drwxrwxr-x   - impala impala           0 2015-09-15 05:46 /user/impala
drwxrwxr-x   - oozie  oozie            0 2015-09-15 05:47 /user/oozie
drwxr-x--x   - spark  spark            0 2015-09-15 05:41 /user/spark
drwxrwxr-x   - sqoop2 sqoop            0 2015-09-15 05:42 /user/sqoop2
drwxr-xr-x   - wzhou  bigdata          0 2015-09-20 11:23 /user/wzhou

h. Run a sample test.
Log on as the wzhou user and verify that the user can run a sample MapReduce job from hadoop-mapreduce-examples.jar.

[wzhou@vmhost1 hadoop-mapreduce]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 10 1000000
Number of Maps  = 10
Samples per Map = 1000000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
15/09/20 11:32:28 INFO client.RMProxy: Connecting to ResourceManager at vmhost1.local/192.168.56.71:8032
15/09/20 11:32:29 INFO input.FileInputFormat: Total input paths to process : 10
15/09/20 11:32:29 INFO mapreduce.JobSubmitter: number of splits:10
15/09/20 11:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1442764085933_0001
15/09/20 11:32:30 INFO impl.YarnClientImpl: Submitted application application_1442764085933_0001
15/09/20 11:32:30 INFO mapreduce.Job: The url to track the job: http://vmhost1.local:8088/proxy/application_1442764085933_0001/
15/09/20 11:32:30 INFO mapreduce.Job: Running job: job_1442764085933_0001
15/09/20 11:32:44 INFO mapreduce.Job: Job job_1442764085933_0001 running in uber mode : false
15/09/20 11:32:44 INFO mapreduce.Job:  map 0% reduce 0%
15/09/20 11:32:55 INFO mapreduce.Job:  map 10% reduce 0%
15/09/20 11:33:03 INFO mapreduce.Job:  map 20% reduce 0%
15/09/20 11:33:11 INFO mapreduce.Job:  map 30% reduce 0%
15/09/20 11:33:18 INFO mapreduce.Job:  map 40% reduce 0%
15/09/20 11:33:26 INFO mapreduce.Job:  map 50% reduce 0%
15/09/20 11:33:34 INFO mapreduce.Job:  map 60% reduce 0%
15/09/20 11:33:42 INFO mapreduce.Job:  map 70% reduce 0%
15/09/20 11:33:50 INFO mapreduce.Job:  map 80% reduce 0%
15/09/20 11:33:58 INFO mapreduce.Job:  map 90% reduce 0%
15/09/20 11:34:06 INFO mapreduce.Job:  map 100% reduce 0%
15/09/20 11:34:14 INFO mapreduce.Job:  map 100% reduce 100%
15/09/20 11:34:14 INFO mapreduce.Job: Job job_1442764085933_0001 completed successfully
15/09/20 11:34:15 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=124
		FILE: Number of bytes written=1258521
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2680
		HDFS: Number of bytes written=215
		HDFS: Number of read operations=43
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
	Job Counters 
		Launched map tasks=10
		Launched reduce tasks=1
		Data-local map tasks=10
		Total time spent by all maps in occupied slots (ms)=65668
		Total time spent by all reduces in occupied slots (ms)=6387
		Total time spent by all map tasks (ms)=65668
		Total time spent by all reduce tasks (ms)=6387
		Total vcore-seconds taken by all map tasks=65668
		Total vcore-seconds taken by all reduce tasks=6387
		Total megabyte-seconds taken by all map tasks=67244032
		Total megabyte-seconds taken by all reduce tasks=6540288
	Map-Reduce Framework
		Map input records=10
		Map output records=20
		Map output bytes=180
		Map output materialized bytes=360
		Input split bytes=1500
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=360
		Reduce input records=20
		Reduce output records=0
		Spilled Records=40
		Shuffled Maps =10
		Failed Shuffles=0
		Merged Map outputs=10
		GC time elapsed (ms)=1026
		CPU time spent (ms)=8090
		Physical memory (bytes) snapshot=3877482496
		Virtual memory (bytes) snapshot=17644212224
		Total committed heap usage (bytes)=3034685440
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1180
	File Output Format Counters 
		Bytes Written=97
Job Finished in 106.368 seconds
Estimated value of Pi is 3.14158440000000000000

To restart all services in the cluster, you can just click the Restart action on the cluster from the Cloudera Manager screen. However, if you want to start or stop a particular service, you need to know the dependencies between services. Here is the start/stop order for all services on CDH 5.

Startup Sequence
1. Cloudera Management service
2. ZooKeeper
3. HDFS
4. Solr
5. Flume
6. Hbase
7. Key-Value Store Indexer
8. MapReduce or YARN
9. Hive
10. Impala
11. Oozie
12. Sqoop
13. Hue

Stop Sequence
1. Hue
2. Sqoop
3. Oozie
4. Impala
5. Hive
6. MapReduce or YARN
7. Key-Value Store Indexer
8. Hbase
9. Flume
10. Solr
11. HDFS
12. ZooKeeper
13. Cloudera Management Service

OK, we are good here. In the next post, I am going to discuss loading data to Hive.

Install Cloudera Hadoop Cluster using Cloudera Manager

Three years ago I tried to build a Hadoop cluster using Cloudera Manager. The GUI looked nice, but the installation was painful and full of issues. I gave up after many failed tries and went with a manual installation instead. It worked fine, and I have built several clusters that way since then. After several years working on Oracle Exadata, I went back and retried the Hadoop installation using Cloudera Manager, this time installing a CDH 5 cluster. The installation experience was much better than three years ago. Not surprisingly, the installation still has some issues, and I could easily identify some bugs along the way. But at least I was able to successfully install a 3-node Hadoop cluster after several tries. The following are my steps during the installation.

First, let me give a little detail about my VM environment. I am using VirtualBox and built three VMs.
vmhost1: This is where the name node, Cloudera Manager, and many other roles are located.
vmhost2: Data Node
vmhost3: Data Node

Note: the default replication factor for Hadoop is 3. In my environment blocks would be under-replicated, so I adjusted the replication factor from 3 to 2 after installation, just to get rid of some annoying alerts.

  • OS: Oracle Linux 6.7, 64-bit
  • CPU: 1 CPU initially for all 3 VMs. Then I realized vmhost1 needs a lot of processing power, as the majority of the installation and configuration happens on node 1. I gave vmhost1 2 CPUs; that still proved not enough, and vmhost1 tended to freeze after installation. After I bumped it up to 4 CPUs, vmhost1 looked fine. 1 CPU is enough for a data node host.
  • Memory: Initially I gave 3G to all 3 VMs, then bumped node 1 up to 5G before installation. That still proved not enough; after bumping it to 7G, vmhost1 stopped freezing. I can see the memory usage is around 6.2G, so 7G is a good configuration. After installation, I reduced each data node's memory to 2G to free some memory. If not many jobs are running, memory usage is less than 1G on a data node. If just testing out the Hadoop configuration, I can further reduce the memory to 1.5G per data node.
  • Network: Although each VM has 3 network adapters, I actually use only two of them. One is configured as Internal Network, which the cluster VMs use to communicate with each other. The other is configured as NAT, just to get an internet connection to download packages from the Cloudera site.
  • Storage: 30G. The actual size after installation is about 10~12G and really depends on how many times you fail and retry the installation. A clean installation uses about 10G of space.

Pre-Steps Before the Installation

Before doing the installation, make sure configure the following in the VM:
1. Set the SELinux policy to disabled. Modify the following parameter in the /etc/selinux/config file.
SELINUX=disabled

2. Disable firewall.
chkconfig iptables off
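
Note that chkconfig only prevents the firewall from starting at the next boot. If iptables is already running, you would also stop the running service (an extra step I am adding here, not part of the original list):

# stop the currently running firewall; chkconfig only affects the next boot
service iptables stop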

3. Set swappiness to 0 in the /etc/sysctl.conf file. The latest Cloudera CDH releases actually recommend changing it to a non-zero value, like 10. But for my little test, I set it to 0 like many people do.
vm.swappiness=0

4. Disable IPV6 in /etc/sysctl.conf file.
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.all.disable_ipv6 = 1
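
The settings in items 3 and 4 only take effect after a reboot or an explicit reload. A minimal way to apply them immediately (not a step from the original write-up) is to reload /etc/sysctl.conf:

# re-read /etc/sysctl.conf and apply the swappiness and IPv6 settings now
sysctl -p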

5. Configure passwordless SSH for the root user. This is a common step for Oracle RAC installations, so I will not repeat the details here; a minimal sketch follows.
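
For reference, a minimal sketch of the passwordless SSH setup, assuming the vmhost[1-3].local host names used later in this post; run it on vmhost1 as root and adjust the host names to your environment.

# generate a key pair for root (accept the defaults, empty passphrase)
ssh-keygen -t rsa
# copy the public key to every host in the cluster, including vmhost1 itself
for h in vmhost1.local vmhost2.local vmhost3.local; do
  ssh-copy-id root@$h
done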

Ok, ready for the installation. Here are the steps.
1. Download and Run the Cloudera Manager Server Installer
Logon as root user on vmhost1. All of the installations are under root user.
Run the following commands.

   
wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin
chmod u+x cloudera-manager-installer.bin
./cloudera-manager-installer.bin

It pops up the following screen; just click Next or Yes on the rest of the screens.
cdh_install_installer_1

If successful, you will see the following screen.
cdh_install_installer_finish

After clicking Close, it pops up a browser window pointing to http://localhost:7180/. At this moment, you can click the Finish button on the installation GUI and close it. Then move to the browser and patiently wait for Cloudera Manager to start up. Note: it usually takes several minutes, so be patient.
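
If you are not sure whether the server is still starting or something went wrong, one way to check (a side note, not a step from the original installation) is to look at the Cloudera Manager server service and its log on vmhost1:

# confirm the Cloudera Manager server process is running
service cloudera-scm-server status
# watch the server log for startup progress and errors
tail -f /var/log/cloudera-scm-server/cloudera-scm-server.log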

2. Logon Screen
After the following screen shows up, log on as the admin user with admin as the password.
cdh_install_logon

3. Choose Version
The next screen is for choosing which edition to use. The default option is the Cloudera Enterprise Data Hub Edition Trial, which has a 60-day limit. Although Cloudera Express has no time limit, the Express edition is missing a lot of features I would like to test out. So I went with the Enterprise 60-day trial edition.
cdh_install_version

4. Thank You Screen
Click Continue for the next Thank You screen.
cdh_install_thanks

5. Host Screen
Input vmhost[1-3].local, then click New Search. Note: make sure to use FQDNs. I had a bad experience not using FQDNs with an old version of the CDH installation, and I am not going to waste my time finding out what happens without them.

After the following screen shows up, click New Search and the 3 hosts show up. Then click Continue.
cdh_install_search

6. Select Repository
For the Select Repository screen, the default option is to use Parcels. Unfortunately I had an issue using Parcels during the installation: it passed the installation step on all 3 hosts, but got stuck downloading the latest Parcel file. After looking around, it seems the issue was that the default release was the September version while the latest Parcel still pointed to the older August release, which looks like a version mismatch to me. Anyway, I am going to try the Parcels option again in the future, but for this installation I switched to the Packages option. I intentionally did not choose the latest CDH 5.4.5 version; I would rather go with a version that sat for a while before the next release. For example, there is about a one-month gap between CDH 5.4.3 and CDH 5.4.4. If 5.4.3 were not stable, Cloudera would have put out a new release a few days later rather than waiting a month. So I went with CDH 5.4.3.
Make sure to choose 5.4.3 for Navigator Key Trustee as well.
cdh_install_repos

7. Java Installation
For the Java installation, leave it unchecked (the default) and click Continue.
cdh_install_jdk

8. Single User
For Enable Single User Mode, I did NOT check Single User Mode, as I want a cluster installation.
cdh_install_singleUser

9. SSH Login Credentials
For SSH Login Credentials, input the root password. For Number of Simultaneous Installations, the default value is 10, which created a lot of headaches during my installation. Each host downloads its own copy from the Cloudera website. As the three VMs were fighting each other for internet bandwidth on my host machine, a given VM could wait several minutes to download its next package. If it waits more than 30 seconds, Cloudera Manager times out the installation for that host and marks it as failed. I am fine with the timeout, but not happy with what happens next: after clicking Retry Failed Hosts, it rolls back the installed packages on that VM and restarts from scratch on the next try. It could take hours to get back to that point. A more elegant way would be to download once on one host, distribute the packages to the other hosts, and, on failure, retry from the failing point. Although the total download is about a few GB per host, the failed retries can easily make it 10GB per host. So I set Number of Simultaneous Installations to 1 to limit the installation to one VM at a time and reduce the failure rate.
cdh_install_ssh

10. Installation
The majority of the installation time is spent here if going with the Package option. For the Parcel option, this step is very fast because the majority of the downloads happen on a different screen. The time spent in this step really depends on the following factors:
1. How fast your internet bandwidth is. The faster, the better.
2. The download speed from the Cloudera site. Although my internet download speed can easily reach 12M per second, the actual download speed from Cloudera varies depending on the time of day. Most of the time it is around 1~2M per second. Not great, but manageable. Sometimes it drops to 100K per second; that is when I have a higher chance of seeing the timeout failure and a failed installation. At one point I could not tolerate this anymore, so I woke up at 2am and began the installation process. It was much faster: I could get a 10M per second download speed, with about 4~7M on average, and only saw a few timeout failures on one host.
3. How many times the installation times out and has to retry.

If successful, the following screen shows.
cdh_install_success

11. Detect Version
After the installation succeeds, it shows the version detection screen.
cdh_install_detectVersion

12. Finish Screen
Finally, I can see this Finish screen. Life is good? Wrong! See my comment in the Cluster Setup step.
cdh_install_finish

13. Cluster Setup
When I reached this step, I knew I was almost done: just a few more steps, less than 30 minutes of work. After a long day, I went for dinner, planning to resume the configuration later. It proved to be the most expensive mistake I made during this installation. After dinner, I went back to the same screen and clicked Continue. It showed a Session Timeout error. Not a big deal, I thought, as the background process would know where I was in the installation. I opened the browser and typed in the URL, http://localhost:7180. Guess what: not the Cluster Setup screen, but the screen at step 4. I tried many things and could not find a workaround. Out of ideas, I had to reinstall from step 4. What a pain! Another 7~8 hours of work. On my next installation I did not waste any time on this step and completed it as quickly as possible.

OK, back to this screen. I want to use both Impala and Spark and could not find a predefined combination with just these two other than All Services. So I chose Custom Services and picked the services mainly from Core with Impala + Spark. Make sure to check Include Cloudera Navigator.
cdh_setup_service

14. Role Assignment
I chose the defaults and clicked Continue.
cdh_setup_role

15. Database Setup
Choose the default. Make sure to click Test Connection before clicking Continue.
cdh_setup_database_1
cdh_setup_database_2

16. Review
Click Continue.
cdh_setup_review

17. Completion
It shows the progress during the setup.
cdh_setup_progress

Finally it shows the real completion screen.
cdh_setup_complete

After clicking Finish, you should see a screen similar to the following.
cdh_cm_screen
Life is good right now. Cloudera Manager is powerful and has many more nice features than three years ago. It was really worth the effort to go through the installation.
life_is_good

Hadoop, HDFS, MapReduce and Spark on Big Data

For the past few years, more and more companies have been interested in starting big data projects. There are many technologies related to big data in the market right now, like Hadoop, Hadoop Distributed File System (HDFS), MapReduce, Spark, Hive, Pig and many more. Although Hadoop is the most popular in this space, Spark has gained a lot of attention since 2014. For people who are interested in implementing big data in the near future, one of the most common questions is: which one should we use, Hadoop or Spark? It is a great question, but it may not be an apples-to-apples comparison.
question_look
Hadoop refers to a full ecosystem that includes core projects like HDFS, MapReduce, Hive, Flume, and HBase. It has been around for about 10 years and has proven to be the solution of choice for processing large datasets. Hadoop has three major layers:

  • HDFS for storage
  • YARN for resource management
  • MapReduce for computation engine.

In other words, HDFS is Hadoop's storage layer, while MapReduce is Hadoop's processing framework, providing automatic task parallelization and distribution. The following diagram shows the components of Hadoop.
hadoop
Spark targets the computation engine layer and can solve similar problems as MapReduce, but with an in-memory approach. So MapReduce and Spark are the apples-to-apples comparison. The following are the major differences between the two:

Response Speed
One major issue with MapReduce is that it needs to persist intermediate data back to disk after each map or reduce action. A MapReduce job can write and read intermediate data on disk many times during execution, depending on the total number of mappers, reducers, and combiners in the job configuration. Spark avoids much of this I/O overhead by keeping intermediate data in memory for fast computation. But Spark does need significantly more memory than MapReduce for caching.

Simple Use
Another significant improvement of Spark over MapReduce is that Spark not only brings data into memory for computation, it is also easier to develop for. Spark supports Python and Scala in addition to Java, and has many more APIs than MapReduce.
Here is sample WordCount code in Hadoop MapReduce.

public static class WordCountMapClass extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value,
       OutputCollector<Text, IntWritable> output,
       Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
       word.set(itr.nextToken());
       output.collect(word, one);
    }
  }
}

public static class WordCountReduce extends MapReduceBase
  implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
       OutputCollector<Text, IntWritable> output,
       Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
       sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

The following is sample code that performs the same work in Spark. As I like writing code in Python, here is the Python implementation.

# sc is the SparkContext provided by the pyspark shell
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

Hardware Cost
Spark does require more investment in memory, which affects your hardware cost, while MapReduce can use regular commodity hardware.

Storage Model
MapReduce requires HDFS storage for job execution. Spark doesn't provide a storage system and runs on top of one; it decouples the computation model from the storage model. It offers an alternative computation model, abstracting data files with RDDs, while using different storage systems, like Hadoop HDFS, Amazon S3, Cassandra, or flat files. By the way, S3 is Amazon Web Services' solution for handling large files in the cloud. If running Spark on Amazon, you want your Amazon EC2 compute nodes in the same zone as your S3 files to improve speed and save cost.

Related Technologies
MapReduce is suitable for batch-oriented applications. If you want a real-time option, you need to use another platform, like Impala or Storm. For graph processing, you need Giraph. For machine learning, people used to use Apache Mahout, but now they tend to favor Spark.

Spark has a single core library, supporting not only data computation but also data streaming, interactive queries, graph processing, and machine learning. The following is a diagram of the Spark components:
spark_engine
Maturity
Hadoop MapReduce is a mature platform, is the primary framework on Hadoop for distributed processing, and has a large number of production deployments across many enterprises. It does have the limitations of high latency and batch orientation.
With a rich set of APIs and in-memory processing, Spark can support both batch and real-time workloads and is gradually gaining ground in distributed processing.

Distributors
There are several Apache Hadoop distributors in the big data space, like Cloudera, Hortonworks, and MapR. The Cloudera CDH distribution is one of the commonly used ones. It not only includes Hadoop, Hive, Spark, and YARN, but also has its own Impala for ad-hoc queries and Cloudera Manager for Hadoop configuration and management.

So what is the usual starting point for a company just getting into the big data space? There are two major types of big data projects:
1) Pure Big Data Project. The data comes from various sources: RDBMS, logging data, streaming data, and event-triggered data. The data can be structured or unstructured.
After the data is populated in Hadoop HDFS storage, all further processing happens in the Hadoop ecosystem, with no interaction between Hadoop and the RDBMS during the processing stage.
This type of project usually requires changes in the workflow and significant new code. It is typically used in scenarios where the project is working on something brand new and no work has been done in the space; in other words, writing a lot of new code is not an issue because there is no existing code anyway.

2) Hybrid Big Data Project. Some data, like historical data or certain large tables or table partitions, is offloaded from the RDBMS to HDFS. Further processing involves the RDBMS and the Hadoop ecosystem working together. This type of project usually requires keeping the majority of the current workflow and modifying certain existing code, especially keeping changes to the SQL to a minimal level. A hybrid big data project requires significantly more knowledge of both big data and the RDBMS, as tuning happens not only in the Hadoop ecosystem but also on the RDBMS side.

In this blog, I will focus on the first type, the Pure Big Data project, and will discuss the Hybrid Big Data project in a separate post in the future.

Of course, there are many approaches to a Pure Big Data project. I will just list the one approach I usually recommend here.
1) Build the Hadoop cluster using one of the major distributions: Cloudera CDH, Hortonworks, or MapR.
2) Define the process or workflow that regularly puts files on HDFS. It doesn't matter where the data comes from. If the data comes from an RDBMS, you can use Sqoop, or just flat CSV files output from the RDBMS. If the data is streaming data, you can use Flume or other tools to inject the data into HDFS.
3) Understand the data structure on HDFS, define your Hive table DDLs, and then point the locations of the Hive tables to the file locations on HDFS. Yes, Hive might be slow for certain processing, but it is usually a good start to verify that your big data ecosystem is performing normally.
If you're using Cloudera CDH, you can use Impala to perform ad-hoc queries; it has much better performance than Hive. For resource allocation and scheduling, you can use YARN.
4) If the business requirements for the big data project are machine learning, iterative data mining, or real-time stream processing, then Spark is usually the choice. If the dataset during the processing stage of a batch job is very large and cannot fit into memory, use traditional Hadoop MapReduce to process the data.
5) Analyze the results.

Conclusion
Finally, as a general rule, I recommend using Hadoop HDFS as the data storage framework; HDFS is a great solution for permanent storage. Use Spark for fast in-memory processing and its built-in machine learning capability. Use MapReduce if the data cannot fit into memory during batch processing. Use Hive and Impala for quick SQL access, and use Pig to save time writing MapReduce jobs. From a learning perspective, it makes more sense to me for beginners to start with Hadoop HDFS, MapReduce, Impala, and Hive, and then Spark.

Of course, it also depends on what kind of problems you need to solve and on your available budget and completion timeline. If the problem can be solved offline, response time is not critical, and the budget is very limited, Hive and Impala can do the work. If you want fast response times, you need to consider Spark, but the hardware could be expensive because of Spark's memory requirements. In the end, you want to identify what problem the big data effort is trying to solve and stay flexible about using the right tool for the solution.