Create Hadoop Cluster on Google Cloud Platform

There are many ways to create a Hadoop cluster, and I am going to show a few of them on Google Cloud Platform (GCP). The first approach is the standard way to build a Hadoop cluster, whether in the cloud or on-premises: create a group of VM instances and manually install Hadoop on them. Many blogs and articles already cover this approach, so I am not going to repeat the steps here.

In this blog, I am going to discuss the approach using Google Cloud Dataproc, which can actually have a Hadoop cluster up and running within 2 minutes. Google Cloud Dataproc is a fully managed cloud service for running Apache Hadoop clusters in a simple and fast way. The following steps show how to create a Hadoop cluster and submit a Spark job to it.

Create a Hadoop Cluster
Click Dataproc -> Clusters

Then click Enable API

The Cloud Dataproc screen shows up. Click Create cluster.

Input the following parameters:
Name : cluster-test1
Region : Choose us-central1
Zone : Choose us-central1-c

1. Master Node
Machine Type : The default is n1-standard-4, but I chose n1-standard-1 for simple testing purposes.
Cluster Mode : There are 3 modes here: Single Node mode (1 master, 0 workers), Standard mode (1 master, N workers), and High Availability mode (3 masters, N workers). Choose Standard mode.
Primary disk size : For my testing, 10 GB is enough.

2. Worker Nodes
The configuration is similar to the Master node. I use 3 worker nodes with a disk size of 15 GB. You might notice that there is an option to use local SSD storage. You can attach up to 8 local SSD devices to a VM instance. Each disk is a fixed 375 GB in size, so you cannot specify a 10 GB disk size here. Local SSDs are physically attached to the host server and offer higher performance and lower latency than Google’s persistent disk storage. They are used for temporary data such as shuffle data in MapReduce, and the data on local SSD storage is not persistent. For more information, please visit https://cloud.google.com/compute/docs/disks/local-ssd.

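Another thing to mention is that Dataproc lets the Hadoop cluster use a Cloud Storage bucket in place of HDFS through its built-in Cloud Storage connector. The hadoop command still works and you won’t notice much difference, but the underlying storage is different. (HDFS is still running on the cluster disks, as the NameNode ports in the netstat output later in this post show.) In my opinion, Cloud Storage is actually the better place for data because a Cloud Storage bucket has much better reliability and scalability than HDFS on a small cluster.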

Click Create when everything is done. After a few minutes, the cluster is created.
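The console choices above correspond to flags of the `gcloud dataproc clusters create` command. As a rough sketch (not the steps I actually ran), the following Python helper assembles that command line from the same parameters. The helper name `dataproc_create_command` is made up for illustration, and the worker machine type is an assumption, since the post only says the worker configuration is similar to the master.

```python
import shlex


def dataproc_create_command(name, region, zone, master_type, master_disk_gb,
                            worker_type, num_workers, worker_disk_gb):
    """Build the argument list for `gcloud dataproc clusters create`."""
    return [
        "gcloud", "dataproc", "clusters", "create", name,
        f"--region={region}",
        f"--zone={zone}",
        f"--master-machine-type={master_type}",
        f"--master-boot-disk-size={master_disk_gb}GB",
        f"--num-workers={num_workers}",
        f"--worker-machine-type={worker_type}",
        f"--worker-boot-disk-size={worker_disk_gb}GB",
    ]


# Values taken from the walkthrough; the worker machine type is assumed.
cmd = dataproc_create_command(
    name="cluster-test1",
    region="us-central1",
    zone="us-central1-c",
    master_type="n1-standard-1",
    master_disk_gb=10,
    worker_type="n1-standard-1",  # assumption: same as the master
    num_workers=3,
    worker_disk_gb=15,
)
print(shlex.join(cmd))
```

Printing the command instead of running it keeps the sketch safe to try; paste the output into a shell with the Cloud SDK installed if you want to create the cluster this way.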

Click cluster-test1 and it should show the cluster information.

If you click the VM Instances tab, you can see that there is one master and 3 worker instances.

Click the Configuration tab. It shows all the configuration information.

Submit a Spark Job
Click Cloud Dataproc -> Jobs.

Once the Submit Job screen shows up, enter the following information, then click Submit.

After the job completes, you should see the following:

To verify the result, I need to SSH to the master node and find out which ports are listening for connections. Click the drop-down to the right of SSH for the master node, then click Open in browser window.

Then run the netstat command.

cluster-test1-m:~$ netstat -a |grep LISTEN |grep tcp
tcp        0      0 *:10033                 *:*                     LISTEN     
tcp        0      0 *:10002                 *:*                     LISTEN     
tcp        0      0 cluster-test1-m.c.:8020 *:*                     LISTEN     
tcp        0      0 *:33044                 *:*                     LISTEN     
tcp        0      0 *:ssh                   *:*                     LISTEN     
tcp        0      0 *:52888                 *:*                     LISTEN     
tcp        0      0 *:58266                 *:*                     LISTEN     
tcp        0      0 *:35738                 *:*                     LISTEN     
tcp        0      0 *:9083                  *:*                     LISTEN     
tcp        0      0 *:34238                 *:*                     LISTEN     
tcp        0      0 *:nfs                   *:*                     LISTEN     
tcp        0      0 cluster-test1-m.c:10020 *:*                     LISTEN     
tcp        0      0 localhost:mysql         *:*                     LISTEN     
tcp        0      0 *:9868                  *:*                     LISTEN     
tcp        0      0 *:9870                  *:*                     LISTEN     
tcp        0      0 *:sunrpc                *:*                     LISTEN     
tcp        0      0 *:webmin                *:*                     LISTEN     
tcp        0      0 cluster-test1-m.c:19888 *:*                     LISTEN     
tcp6       0      0 [::]:10001              [::]:*                  LISTEN     
tcp6       0      0 [::]:44884              [::]:*                  LISTEN     
tcp6       0      0 [::]:50965              [::]:*                  LISTEN     
tcp6       0      0 [::]:ssh                [::]:*                  LISTEN     
tcp6       0      0 cluster-test1-m:omniorb [::]:*                  LISTEN     
tcp6       0      0 [::]:46745              [::]:*                  LISTEN     
tcp6       0      0 cluster-test1-m.c.:8030 [::]:*                  LISTEN     
tcp6       0      0 cluster-test1-m.c.:8031 [::]:*                  LISTEN     
tcp6       0      0 [::]:18080              [::]:*                  LISTEN     
tcp6       0      0 cluster-test1-m.c.:8032 [::]:*                  LISTEN     
tcp6       0      0 cluster-test1-m.c.:8033 [::]:*                  LISTEN     
tcp6       0      0 [::]:nfs                [::]:*                  LISTEN     
tcp6       0      0 [::]:33615              [::]:*                  LISTEN     
tcp6       0      0 [::]:56911              [::]:*                  LISTEN     
tcp6       0      0 [::]:sunrpc             [::]:*                  LISTEN  
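The listing above is long, so here is a small sketch that pulls the numeric listening ports out of netstat output like this. The function name `listening_ports` is my own. Entries that netstat prints by service name (ssh, nfs, sunrpc and so on) are skipped; running netstat with the -n option would print them numerically instead.

```python
def listening_ports(netstat_output):
    """Return the sorted numeric TCP ports in LISTEN state."""
    ports = set()
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expect: proto, recv-q, send-q, local address, foreign address, state
        if len(fields) < 6 or not fields[0].startswith("tcp"):
            continue
        if fields[-1] != "LISTEN":
            continue
        # The port is whatever follows the last colon of the local address.
        port = fields[3].rsplit(":", 1)[-1]
        if port.isdigit():
            ports.add(int(port))
    return sorted(ports)


# A few lines copied from the netstat output above.
sample = """\
tcp        0      0 *:10033                 *:*                     LISTEN
tcp        0      0 cluster-test1-m.c.:8020 *:*                     LISTEN
tcp        0      0 *:ssh                   *:*                     LISTEN
tcp        0      0 *:9870                  *:*                     LISTEN
tcp6       0      0 [::]:18080              [::]:*                  LISTEN
tcp6       0      0 cluster-test1-m.c.:8032 [::]:*                  LISTEN
"""
print(listening_ports(sample))  # [8020, 8032, 9870, 10033, 18080]
```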

Check out the directories.

cluster-test1-m:~$ hdfs dfs -ls /
17/09/12 12:12:24 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.1-hadoop2
Found 2 items
drwxrwxrwt   - mapred hadoop          0 2017-09-12 11:56 /tmp
drwxrwxrwt   - hdfs   hadoop          0 2017-09-12 11:55 /user

There are a few UI screens available to check out the Hadoop cluster and job status.
HDFS NameNode (port 9870)

YARN Resource Manager (port 8088)

Spark Job History (port 18080)
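For convenience, the three ports above can be turned into URLs with a few lines of Python. This is only a sketch: the helper name `web_ui_urls` is made up, and it assumes the master host name resolves and is reachable from your browser (for example through an SSH tunnel), which is not the case by default.

```python
# Web UI ports listed in this post, keyed by service name.
WEB_UI_PORTS = {
    "HDFS NameNode": 9870,
    "YARN Resource Manager": 8088,
    "Spark Job History": 18080,
}


def web_ui_urls(master_host):
    """Map each service name to its web UI URL on the given master host."""
    return {name: f"http://{master_host}:{port}"
            for name, port in WEB_UI_PORTS.items()}


for name, url in web_ui_urls("cluster-test1-m").items():
    print(f"{name}: {url}")
```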

Dataproc is an easy way to deploy a Hadoop cluster. Although it is powerful, I miss a nice UI like Cloudera Manager. To install a Cloudera CDH cluster, I need to use a different approach, and I am going to discuss it in a future blog.

Missing Classpath Issue during Sqoop Import

Recently I ran into an issue when using Sqoop import, with the following error messages:

17/02/22 10:44:17 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1487780389745_0001/
17/02/22 10:44:17 INFO mapreduce.Job: Running job: job_1487780389745_0001
17/02/22 10:44:27 INFO mapreduce.Job: Job job_1487780389745_0001 running in uber mode : false
17/02/22 10:44:27 INFO mapreduce.Job:  map 0% reduce 0%
17/02/22 10:44:38 INFO mapreduce.Job: Task Id : attempt_1487780389745_0001_m_000000_0, Status : FAILED
Error: java.lang.ClassNotFoundException: org.apache.commons.lang3.StringUtils
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at org.apache.sqoop.manager.oracle.OracleUtils.escapeIdentifier(OracleUtils.java:36)
	at org.apache.sqoop.manager.oracle.OraOopOracleQueries.getTableColumns(OraOopOracleQueries.java:683)
	at org.apache.sqoop.manager.oracle.OraOopOracleQueries.getTableColumns(OraOopOracleQueries.java:767)
	at org.apache.sqoop.manager.oracle.OraOopDBRecordReader.getSelectQuery(OraOopDBRecordReader.java:195)
	at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:235)
	at org.apache.sqoop.manager.oracle.OraOopDBRecordReader.nextKeyValue(OraOopDBRecordReader.java:356)
	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
	at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
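When a job log is long, the missing class can be picked out mechanically. The sketch below is my own illustration (the function names are hypothetical): it scans log text for ClassNotFoundException lines and converts each class name into the entry path it would have inside a jar, which is handy later when searching for the jar.

```python
import re

# Matches "java.lang.ClassNotFoundException: some.package.ClassName"
CNF_PATTERN = re.compile(r"java\.lang\.ClassNotFoundException:\s*([\w.$]+)")


def missing_classes(log_text):
    """Return the distinct missing class names, in order of appearance."""
    seen = []
    for match in CNF_PATTERN.finditer(log_text):
        name = match.group(1)
        if name not in seen:
            seen.append(name)
    return seen


def class_to_jar_entry(class_name):
    """Convert a fully qualified class name to its path inside a jar."""
    return class_name.replace(".", "/") + ".class"


# Two lines copied from the error output above.
log = """\
17/02/22 10:44:38 INFO mapreduce.Job: Task Id : attempt_1487780389745_0001_m_000000_0, Status : FAILED
Error: java.lang.ClassNotFoundException: org.apache.commons.lang3.StringUtils
"""
for name in missing_classes(log):
    print(name, "->", class_to_jar_entry(name))
```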

I have used Sqoop import many times and had never run into this issue. This made me wonder what was going on.
I checked the CDH version. It is 5.8.0, whereas I used to use CDH 5.7 a lot.

[root@quickstart lib]# hadoop version
Hadoop 2.6.0-cdh5.8.0
Subversion http://github.com/cloudera/hadoop -r 57e7b8556919574d517e874abfb7ebe31a366c2b
Compiled by jenkins on 2016-06-16T19:38Z
Compiled with protoc 2.5.0
From source with checksum 9e99ecd28376acfd5f78c325dd939fed
This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.8.0.jar

Checking the Sqoop version shows that it is 1.4.6.

[root@quickstart lib]# sqoop version
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
17/02/22 13:37:28 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.8.0
Sqoop 1.4.6-cdh5.8.0
git commit id 
Compiled by jenkins on Thu Jun 16 12:25:21 PDT 2016

The Cloudera Quickstart VM uses CDH 5.8.0, and this is the first time I have touched CDH 5.8, so the issue might be related to CDH 5.8. I did some research and found that someone had exactly the same issue as I did. It is a Sqoop bug: SQOOP-2999, Sqoop ClassNotFoundException (org.apache.commons.lang3.StringUtils) is thrown when executing Oracle direct import map task.

It looks like the new version of Sqoop in CDH 5.8.0 uses a new class, OracleUtils, that depends on org.apache.commons.lang3.StringUtils. The jar that contains this class is not on the classpath that Sqoop passes to the mappers, so the exception is thrown at runtime. Unfortunately, the fix only arrives in CDH 5.8.2, as indicated in Cloudera’s release notes, where the first Sqoop bug listed is this one: SQOOP-2999.

At this moment, I don’t want to go through upgrading or patching CDH. I tried to find a workaround on the internet but could not find anything related to this issue, so I will fix it my own way.

First I need to find out which jar file contains this class. There is a post discussing this class and the related jar file: commons-lang3-3.1.jar. Luckily, I do have a commons-lang3-3.1.jar file on the system.

[root@quickstart lib]# locate commons-lang3-3.1.jar
/usr/lib/mahout/lib/commons-lang3-3.1.jar
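The locate command works when you already know the jar name. If you only know the class name, a jar is just a zip archive, so you can search for the class directly. Here is a minimal sketch using only the Python standard library; the function name `jars_containing` is hypothetical, and this is not what I ran on the Quickstart VM.

```python
import os
import zipfile


def jars_containing(class_name, search_root):
    """Return the jar files under search_root that contain class_name."""
    # A class a.b.C is stored in the jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for dirpath, _dirnames, filenames in os.walk(search_root):
        for filename in filenames:
            if not filename.endswith(".jar"):
                continue
            path = os.path.join(dirpath, filename)
            try:
                with zipfile.ZipFile(path) as jar:
                    if entry in jar.namelist():
                        hits.append(path)
            except (zipfile.BadZipFile, OSError):
                # Skip unreadable or corrupt archives.
                continue
    return sorted(hits)
```

For example, calling jars_containing("org.apache.commons.lang3.StringUtils", "/usr/lib") walks /usr/lib and returns every jar that ships the class. Walking a large directory tree can take a while.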

Next I need to figure out how to add this jar to the classpath for Sqoop. The majority of Hadoop environment variable settings can be found in the /etc/default directory, and I did find a file there called sqoop2-server.

[root@quickstart default]# cat sqoop2-server
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

CATALINA_BASE=/var/lib/sqoop2/tomcat-deployment
SQOOP_USER=sqoop2
SQOOP_CONFIG=/etc/sqoop2/conf
SQOOP_LOG=/var/log/sqoop2
SQOOP_TEMP=/var/run/sqoop2
SQOOP_PID=/var/run/sqoop2/sqoop-server-sqoop2.pid
CATALINA_BIN=/usr/lib/bigtop-tomcat/bin
CATALINA_TMPDIR=/var/tmp/sqoop2
CATALINA_OPTS=-Xmx1024m
CATALINA_OUT=/var/log/sqoop2/sqoop-tomcat.log
#AUX_CLASSPATH=

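Uncomment the AUX_CLASSPATH= line and change it to the following:
AUX_CLASSPATH=/usr/lib/mahout/lib/commons-lang3-3.1.jar
After restarting the Sqoop service, no luck. It still did not work, possibly because this file configures the Sqoop 2 server while the failing job was launched by the Sqoop 1.4.6 client.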

I could change the /etc/sqoop2/conf/setenv.sh file and add this jar file to CLASSPATH, but CLASSPATH is not used in this script. I also don’t want to change something and completely forget about it later on. So I tried adding the classpath entry in Cloudera Manager instead. The tricky question is which parameter to add it to.

As the Sqoop import runs as a MapReduce job on YARN, I tried adding the jar to the YARN Application Classpath, or yarn.application.classpath.
The result is shown below.
sqoop_classpath_missing_issue
After the change, redeploy the client configuration and bounce the CDH cluster. I reran the Sqoop job and it completed without error. Please note that this is just a temporary workaround. The right approach is still to upgrade to a newer version of CDH.
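The yarn.application.classpath property is a comma-separated list of classpath entries, so the change amounts to appending one more entry. The sketch below shows that edit as a small Python function. The function name is made up, and the existing value used in the example is purely illustrative, not the value from my cluster.

```python
def add_to_yarn_classpath(current_value, new_entry):
    """Append new_entry to a comma-separated classpath unless already present."""
    entries = [e.strip() for e in current_value.split(",") if e.strip()]
    if new_entry not in entries:
        entries.append(new_entry)
    return ",".join(entries)


# Hypothetical existing value, for illustration only.
existing = "$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*"
updated = add_to_yarn_classpath(
    existing, "/usr/lib/mahout/lib/commons-lang3-3.1.jar")
print(updated)
```

Checking for an existing entry first keeps the value clean if the change is applied more than once.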

Mysterious Time Gap in Spark Job Timeline

Some time ago, one of my clients asked me a question while reviewing a Spark job: why is there a time gap in the event timeline, sometimes as long as one minute? If the gap were a few seconds, it would make sense that it relates to Spark’s overhead between job runs. But one minute seems too long for any overhead activity when the whole job takes only 8~9 minutes. I didn’t have a good answer to the question. Recently I benchmarked a Spark job on an X3 full rack Oracle BDA in our lab and noticed the same behavior. I tracked down the issue and finally figured out the cause of the timeline gap. I am going to share my findings in this blog.

My benchmark is on an X3 full rack BDA with Spark version 1.6 and CDH 5.7. The Spark test script is a pretty simple one, and the important lines related to this timeline gap are listed as follows:

line 42: val myDF = hiveContext.sql("select * from wzdb.sales_part ")
line 44: myDF.show()
line 47: val rawDF = hiveContext.sql("select * from wzdb.sales_raw limit 100 ")
line 48: rawDF.show()

Line 42 pulls all data from the wzdb.sales_part table, which is a Hive partitioned table using Parquet and SNAPPY compression. The table has about 1.3 billion rows and 1,680 partitions. Line 44 just shows the DataFrame myDF; by default it shows 20 rows. Similarly, line 47 pulls 100 rows from the wzdb.sales_raw table and line 48 shows the first 20 rows from it. OK, the code cannot be simpler than that.

After I started the Spark job, it finished in 40 seconds. However, when I checked the Event Timeline, it showed a time gap between Job (or stage) Id 1 and Job Id 2. Job Id 1 started at 18:13:24 and completed at 18:13:26, but Job Id 2 did not start until 18:13:35. That is a 9-second gap, a little under 25% of the total execution time, which seems a lot to me. Job Id 1 executed line 42 while Job Id 2 executed line 44. There is no executable code at line 43. Things become interesting.
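The arithmetic behind the gap is simple enough to check in a few lines. The sketch below uses the timestamps and the 40-second total quoted above; the helper name `gap_seconds` is made up. The share works out to 22.5%, a little under a quarter of the run.

```python
from datetime import datetime


def gap_seconds(end_prev, start_next, fmt="%H:%M:%S"):
    """Seconds between the end of one job and the start of the next."""
    delta = datetime.strptime(start_next, fmt) - datetime.strptime(end_prev, fmt)
    return delta.total_seconds()


gap = gap_seconds("18:13:26", "18:13:35")  # Job 1 end, Job 2 start
share = gap / 40 * 100                     # the whole job took 40 seconds
print(f"gap = {gap:.0f} s, {share:.1f}% of a 40 s run")
# prints: gap = 9 s, 22.5% of a 40 s run
```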

spark_event_timeline_1

Then I checked the Executors page. It shows that there are two executors and each took about 6~7 seconds of task time. Then I clicked the link to the stdout logs for each executor, paying more attention to the timeline between 18:13:24 and 18:13:35.

spark_executors_2

Here is part of the output from Executor 1:

16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 27032@enkbda1node10.enkitec.com
16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/12/02 18:13:16 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
16/12/02 18:13:17 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
16/12/02 18:13:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
16/12/02 18:13:18 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
16/12/02 18:13:18 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
16/12/02 18:13:18 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
16/12/02 18:13:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/12/02 18:13:18 INFO Remoting: Starting remoting
16/12/02 18:13:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
16/12/02 18:13:19 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
16/12/02 18:13:19 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 38184.
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-7af60bfd-9e27-45c8-bf30-a4bf126681f0
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-1c6099f9-37f2-4b4e-8b60-5c209bffc924
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ec991001-9dc1-4017-ba55-314b54dd9109
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-c0df794d-5ee6-46fe-ad57-cf6186cd5ba7
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-673f1d0b-7e44-47e7-b36e-eed65c656c87
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ab27950b-7dfd-48eb-a33c-7fbd02d29137
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fe6697c4-d64a-47b3-9781-27d583370710
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-e0a928ab-5895-46b6-8b10-ad883e895632
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-4d39319c-b6be-4a17-8755-89477f81e899
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-cfd8fd9c-22cd-443f-8a1d-99b9867c8507
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fff46796-d06a-45af-816b-c46d356be447
16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-6cf05120-f651-4615-8abe-14631c5aadb1
16/12/02 18:13:19 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.12.105:33339
16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
16/12/02 18:13:19 INFO executor.Executor: Starting executor ID 2 on host enkbda1node10.enkitec.com
16/12/02 18:13:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45880.
16/12/02 18:13:19 INFO netty.NettyBlockTransferService: Server created on 45880
16/12/02 18:13:19 INFO storage.BlockManager: external shuffle service port = 7337
16/12/02 18:13:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/12/02 18:13:19 INFO storage.BlockManagerMaster: Registered BlockManager
16/12/02 18:13:19 INFO storage.BlockManager: Registering executor with local external shuffle service.
16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
16/12/02 18:13:19 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
16/12/02 18:13:20 INFO executor.Executor: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar with timestamp 1480723975104
16/12/02 18:13:20 INFO util.Utils: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar to /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/fetchFileTemp8581640132534419761.tmp
16/12/02 18:13:20 INFO util.Utils: Copying /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/8566654191480723975104_cache to /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar
16/12/02 18:13:20 INFO executor.Executor: Adding file:/u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar to class loader
16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.6 KB, free 25.6 KB)
16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 101 ms
16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 72.5 KB, free 98.1 KB)
16/12/02 18:13:22 INFO client.FusionCommon: Initialized FusionHdfs with URI: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1, FileSystem: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_468103070_110, ugi=oracle (auth:SIMPLE)]], instance: 426700950
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=10
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=11
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=12
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=13
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=14
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=15
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=16
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=17
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=18
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=19
16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=2
....
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8
16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9
16/12/02 18:13:24 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 727069 bytes result sent to driver
16/12/02 18:13:24 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 3
16/12/02 18:13:24 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 123.6 KB)
16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 24 ms
16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 196.0 KB)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-hadoop-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-format-2.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-pig-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-exec-1.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-jdbc-1.1.0-cdh5.7.0-standalone.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [shaded.parquet.org.slf4j.helpers.NOPLoggerFactory]
16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver
16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
16/12/02 18:13:35 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 93.1 KB)
16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3 took 9 ms
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 777.6 KB)
16/12/02 18:13:35 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0 start: 0 end: 2028037 length: 2028037 hosts: []}
16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 2
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 801.6 KB)
16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 7 ms
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 334.5 KB, free 1136.1 KB)
16/12/02 18:13:35 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 148.139655 ms
16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 21.996524 ms
16/12/02 18:13:36 INFO codegen.GenerateSafeProjection: Code generated in 15.975923 ms
16/12/02 18:13:36 WARN parquet.CorruptStatistics: Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297
16/12/02 18:13:36 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 5365 bytes result sent to driver
16/12/02 18:13:40 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
16/12/02 18:13:40 WARN executor.CoarseGrainedExecutorBackend: An unknown (enkbda1node05.enkitec.com:33339) driver disconnected.
16/12/02 18:13:40 ERROR executor.CoarseGrainedExecutorBackend: Driver 192.168.12.105:33339 disassociated! Shutting down.
16/12/02 18:13:40 INFO storage.DiskBlockManager: Shutdown hook called
16/12/02 18:13:40 INFO util.ShutdownHookManager: Shutdown hook called
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-47de642a-8665-4853-a4f7-a5ba3ece4295
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f1aaa12c-c0d5-4c75-b1e6-e222ea9112b6
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-764ee07d-b082-4bc1-8b4d-3834d6cd14cd
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f79cf620-4754-41e2-bb4c-6e242f8e16ad
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-ae63ef76-400a-4e8f-b68b-3a34ed25414e
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-37543a1b-c5ec-4f06-887d-9357bea573e4
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-9a5eb5c9-4009-49ff-aca6-64103429b245
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-6ac13e05-62ec-485f-8016-3bb4d1830492
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-15e0aa40-4a67-46c7-8eb0-fe8c8f2d0c6e
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-112c8eb2-e039-4f73-beae-e3c05a04c3c6
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-752c1bcc-ce1f-41a1-b779-347f77b04227
16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a

Executor 1 handled 840 Hive partitions. There is no additional logging information between 18:13:26 and 18:13:35. The log jumps straight from the line 16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver to the line 16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4.

Executor 2 behaved similarly and also processed exactly 840 partitions. The interesting log lines are shown below:
16/12/02 18:13:25 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 931 bytes result sent to driver
16/12/02 18:13:38 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5

The executor logs did not tell me anything useful. OK, let me check out the Stages page. I am more interested in Stage Id 2 and Stage Id 3. Here is the Stages summary screen.
spark_stage_summary_3

Let’s check out the Stage Id 1 screen, shown below.
spark_stage_stage1_4

The duration is only between 0.2 and 1 second for the two executors. Another interesting statistic is that Peak Execution Memory is 0 bytes for both. I don’t believe this stage can load 1.3 billion rows of data without any memory usage. In other words, I believe it does not do any IO-related work at this stage, although the stage is running select * from wzdb.sales_part.

OK, let me check out the next stage, Stage 2. The DAG chart is so huge that it takes 20 seconds to show up on the screen. There are literally 3,365 RDD partitions combined with union operations to provide the result for the show() function.

spark_stage_stage2_5

spark_stage_stage2_6

The metrics for this stage give another interesting result.
spark_stage_stage2_7

The total duration is unbelievably fast: 1 second, with an input size of 1980 KB and 21 records. Remember, by default the show() function prints only 20 rows, so the 1980 KB and 21 records are definitely related to this 20-row show() result. But with 3,365 RDD partitions unioned together, 1 second seems unbelievably fast. Please note that the block size is 256 MB in our environment. I just don’t believe it’s physically possible to perform the stage 1 operation (select * from wzdb.sales_part on a 1.3 billion row Hive Parquet table) in 1 second and then show 20 rows of the result in the following 1 second. Yes, Spark does in-memory processing and is super fast. But according to the DAG, it goes through all 1.3 billion rows. That can’t be finished in 2 seconds, even with a full rack BDA. There must be something else that is not shown in the picture.
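A quick back-of-the-envelope calculation shows why this looks implausible. The sketch below uses only the numbers quoted above (1.3 billion rows, roughly 2 seconds, two executors); the variable names are purely illustrative.

```python
rows = 1_300_000_000   # rows in wzdb.sales_part, as stated above
elapsed_s = 2          # about 1 s for stage 1 plus about 1 s for stage 2
executors = 2          # two executors took part in this run

# Throughput that would be required if all rows were really read in that time.
rows_per_second = rows / elapsed_s
rows_per_executor_per_second = rows_per_second / executors

print(f"{rows_per_second:,.0f} rows/s overall")
print(f"{rows_per_executor_per_second:,.0f} rows/s per executor")
```

Hundreds of millions of rows per second per executor is far beyond what reading Parquet files from HDFS would normally deliver, which is why the time must be hiding somewhere else.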

Luckily, for this test, I used client mode as the deployment mode, so all of the log output was sent to my driver, the executing session. There I found out where the missing time goes.

16/12/02 18:13:23 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4178 ms on enkbda1node09.enkitec.com (1/2)
16/12/02 18:13:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4458 ms on enkbda1node10.enkitec.com (2/2)
16/12/02 18:13:24 INFO scheduler.DAGScheduler: ResultStage 0 (sql at test2.scala:42) finished in 20.648 s
16/12/02 18:13:24 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Job 0 finished: sql at test2.scala:42, took 21.026555 s
16/12/02 18:13:24 INFO spark.SparkContext: Starting job: sql at test2.scala:42
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Got job 1 (sql at test2.scala:42) with 2 output partitions
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (sql at test2.scala:42)
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Missing parents: List()
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42), which has no missing parents
16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 170.5 KB)
16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 196.0 KB)
16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.12.105:14015 (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:24 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42)
16/12/02 18:13:24 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, enkbda1node09.enkitec.com, partition 0,PROCESS_LOCAL, 2044 bytes)
16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, enkbda1node10.enkitec.com, partition 1,PROCESS_LOCAL, 2143 bytes)
16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node09.enkitec.com:19013 (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:25 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 294 ms on enkbda1node09.enkitec.com (1/2)
16/12/02 18:13:26 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 1614 ms on enkbda1node10.enkitec.com (2/2)
16/12/02 18:13:26 INFO scheduler.DAGScheduler: ResultStage 1 (sql at test2.scala:42) finished in 1.620 s
16/12/02 18:13:26 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/02 18:13:26 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.665575 s
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:14015 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:14015 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 1
16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 2
16/12/02 18:13:26 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
16/12/02 18:13:26 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:14015 (size: 24.0 KB, free: 530.0 MB)
16/12/02 18:13:26 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
16/12/02 18:13:27 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000284_0
16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000293_0
16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000294_0
16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000295_0
16/12/02 18:13:28 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000296_0

....
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000266_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000249_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000267_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000268_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000269_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000270_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000271_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000272_0
16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000273_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000274_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000275_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000250_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000251_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000252_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000253_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000254_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000255_0
16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000256_0
16/12/02 18:13:35 INFO spark.SparkContext: Starting job: show at test2.scala:44
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Missing parents: List()
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44), which has no missing parents
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 816.9 KB)
16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 909.9 KB)
16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:14015 (size: 93.1 KB, free: 529.9 MB)
16/12/02 18:13:35 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44)
16/12/02 18:13:35 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
16/12/02 18:13:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node10.enkitec.com, partition 0,RACK_LOCAL, 2384 bytes)
16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 93.1 KB, free: 529.9 MB)
16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 24.0 KB, free: 529.9 MB)
16/12/02 18:13:36 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1336 ms on enkbda1node10.enkitec.com (1/1)
16/12/02 18:13:36 INFO scheduler.DAGScheduler: ResultStage 2 (show at test2.scala:44) finished in 1.336 s
16/12/02 18:13:36 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 
16/12/02 18:13:36 INFO scheduler.DAGScheduler: Job 2 finished: show at test2.scala:44, took 1.604959 s
16/12/02 18:13:36 INFO spark.ContextCleaner: Cleaned accumulator 3
16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.12.105:14015 in memory (size: 93.1 KB, free: 530.0 MB)
16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 93.1 KB, free: 530.0 MB)
+-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
|      tran_id|  tran_timestamp|store_id|           item_name|item_category|dept_id|       dept_name|tran_price|cashier_name|year|month|day|
+-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
|1479564513475|2012-01-01 15:47|       4|rxkexrwwrnohuenpm...|          891|     26|             Toy|    185.17|       Maria|2012|    1|  1|
|1479564513608|2012-01-01 10:11|      27|                  zz|          790|     26|   Auto and Tire|     68.55|         Von|2012|    1|  1|
|1479564513748|2012-01-01 16:53|      26|fqzlqxvmpktwjwwgg...|          279|     10|Home Improvement|    233.47|         Von|2012|    1|  1|
|1479564513750|2012-01-01 21:10|      22|               ndmeu|          487|     35|           Photo|     92.42|       Ileen|2012|    1|  1|
|1479564526973|2012-01-01 07:52|       6|sbzmvrnxrvbohorbp...|          632|     18|         Jewelry|    164.34|        Keri|2012|    1|  1|
|1479564469852|2012-01-01 18:54|      27|ptcilplqfvednxmmh...|          416|      3|    Baby Toddler|    144.86|      Michel|2012|    1|  1|
|1479564523772|2012-01-01 11:07|       2|gvjrsdgidzunbbmfi...|          269|     17|          Sports|    231.67|       Penny|2012|    1|  1|
|1479564524666|2012-01-01 08:51|       6|rfskpcezchhbhzsbd...|          595|     19|            Food|    175.85|         Rus|2012|    1|  1|
|1479564470133|2012-01-01 18:36|      17|wzswebdjowfjjbslh...|          679|     10|   Health Beauty|    350.13|        Keri|2012|    1|  1|
|1479564537634|2012-01-01 07:52|      12|             bhxoevw|          281|     34|    Baby Toddler|    352.02|        Keri|2012|    1|  1|
|1479564470197|2012-01-01 06:04|       5|plqxmnrcuqisfygkl...|          152|     19|             Toy|     53.67|     Dorothy|2012|    1|  1|
|1479564470201|2012-01-01 08:23|      13|frcatrjwwrbomxmnj...|           74|     20|   Auto and Tire|    359.81|       Ileen|2012|    1|  1|
|1479564470386|2012-01-01 10:16|      15|cevezkxpsrzszshen...|          814|     13|   Auto and Tire|     27.92|     Sherril|2012|    1|  1|
|1479564470724|2012-01-01 01:44|      26|jjiqfklffyzxzkyiz...|          248|      5|   Auto and Tire|    219.66|       Lidia|2012|    1|  1|
|1479564470799|2012-01-01 23:26|      18|     voakgmajahxfgbq|          769|     17|          Sports|    251.07|       Susan|2012|    1|  1|
|1479564470941|2012-01-01 13:28|      14|axkytaxghyujudtaw...|          207|      5|   Auto and Tire|    168.34|  Christoper|2012|    1|  1|
|1479564471016|2012-01-01 15:37|       3|sdcnxhosatucnwwqk...|          192|     23|         Jewelry|       2.5|      Michel|2012|    1|  1|
|1479564471049|2012-01-01 23:27|       9|zoppybkpqpgitrwlo...|          120|     32|          Sports|    147.28|     Dorothy|2012|    1|  1|
|1479564471063|2012-01-01 23:51|      24|zknmvfsrsdxdysmdw...|          169|      6|         Jewelry|    292.59|   Broderick|2012|    1|  1|
|1479564471113|2012-01-01 19:42|      20|uaqmjikgtisidskzm...|          388|     36|            Food|      3.55|       Maria|2012|    1|  1|
+-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
only showing top 20 rows

16/12/02 18:13:36 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
16/12/02 18:13:36 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 282.1 KB, free 414.5 KB)

The above log shows that IO operations like parquet.ParquetRelation: Reading Parquet file(s) fall completely outside the timeline for Job (stage) 1 and Job 2. This is where the missing time goes. It is actually pretty good to need only 9 to 10 seconds to go through all 1.3 billion rows. Mystery solved.
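One way to spot such a gap without reading the whole driver log by eye is to collapse consecutive lines from the same logger into runs and report how long each run lasts. The following is only a sketch and was not part of the original test: the helper names parse_line and logger_runs are made up for illustration, and the sample lines are abbreviated copies of the driver log above.

```python
import re
from datetime import datetime
from itertools import groupby

# Matches lines such as
# "16/12/02 18:13:26 INFO scheduler.DAGScheduler: Job 1 finished: ..."
LOG_RE = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) ([\w.]+): (.*)$")

def parse_line(line):
    """Return (timestamp, logger, message), or None for non-log lines."""
    m = LOG_RE.match(line)
    if m is None:
        return None
    ts = datetime.strptime(m.group(1), "%y/%m/%d %H:%M:%S")
    return ts, m.group(3), m.group(4)

def logger_runs(lines):
    """Collapse consecutive lines from one logger into (logger, first_ts, last_ts, count)."""
    parsed = [p for p in map(parse_line, lines) if p is not None]
    runs = []
    for logger, group in groupby(parsed, key=lambda p: p[1]):
        items = list(group)
        runs.append((logger, items[0][0], items[-1][0], len(items)))
    return runs

# Abbreviated sample; the HDFS paths are cut short on purpose.
sample = [
    "16/12/02 18:13:26 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.665575 s",
    "16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/...",
    "16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/...",
    "16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/...",
    "16/12/02 18:13:35 INFO spark.SparkContext: Starting job: show at test2.scala:44",
]

for logger, first, last, count in logger_runs(sample):
    seconds = (last - first).total_seconds()
    print(f"{logger}: {count} line(s), {seconds:.0f} s")
```

Run against the full driver log, a long run of parquet.ParquetRelation lines sitting between "Job 1 finished" and "Starting job: show" would stand out immediately.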

With the above findings in mind, I expected that if I added partition pruning to limit the number of rows scanned by the query on line 42, the gap in the timeline should shrink, because less IO is needed to read the data. So I added partition pruning to the query on line 42, changing it to select * from wzdb.sales_part where year=2013 and month=11 and day=13, and reran the test. The result was exactly what I expected.
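To make the effect of the predicate concrete, here is a small sketch of partition pruning over Hive-style directory names. It is not Spark's implementation: the prune and parse_partition helpers are hypothetical, and the generated directory list is an assumption that only mimics the year=/month=/day= layout seen in the log.

```python
def parse_partition(path):
    """Turn 'year=2013/month=11/day=13' into {'year': 2013, 'month': 11, 'day': 13}."""
    return {k: int(v) for k, v in (part.split("=") for part in path.split("/"))}

def prune(partition_dirs, **predicate):
    """Keep only the directories whose partition values match every predicate column."""
    kept = []
    for path in partition_dirs:
        values = parse_partition(path)
        if all(values.get(col) == want for col, want in predicate.items()):
            kept.append(path)
    return kept

# Assumed layout: 28 days for each month of 2012-2016, which gives 1680 directories.
# The count matches the log, but the real table's exact day ranges are not confirmed here.
all_dirs = [
    f"year={y}/month={m}/day={d}"
    for y in range(2012, 2017)
    for m in range(1, 13)
    for d in range(1, 29)
]

selected = prune(all_dirs, year=2013, month=11, day=13)
pruned_pct = 100.0 * (1 - len(selected) / len(all_dirs))
print(f"Selected {len(selected)} partitions out of {len(all_dirs)}, pruned {pruned_pct:.2f}% partitions.")
```

Only the directories that survive pruning have to be listed and opened, so the time spent on Reading Parquet file(s) shrinks with the number of selected partitions.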

Here is the new timeline:
spark_event_timeline_8

As you can see, there is only a 1 second gap between Job Id 1 and Job Id 2. Here is the execution log. Only one partition of data was read.

16/12/03 11:11:10 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.322394 s
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:46755 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.5 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 2
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:46755 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.6 KB, free: 530.0 MB)
16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 1
16/12/03 11:11:11 INFO datasources.DataSourceStrategy: Selected 1 partitions out of 1680, pruned 99.94047619047619% partitions.
16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:46755 (size: 24.0 KB, free: 530.0 MB)
16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
16/12/03 11:11:11 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
16/12/03 11:11:11 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000191_0
16/12/03 11:11:11 INFO spark.SparkContext: Starting job: show at test2.scala:44
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Missing parents: List()
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44), which has no missing parents
16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 143.0 KB, free 275.3 KB)
16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 16.5 KB, free 291.8 KB)
16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:46755 (size: 16.5 KB, free: 530.0 MB)
16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44)
16/12/03 11:11:11 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
16/12/03 11:11:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node09.enkitec.com, partition 0,NODE_LOCAL, 2386 bytes)
16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 16.5 KB, free: 530.0 MB)
16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 24.0 KB, free: 530.0 MB)
16/12/03 11:11:12 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1523 ms on enkbda1node09.enkitec.com (1/1)

The lesson learned from this test is that Spark metrics are helpful for identifying the bottleneck of a Spark application, but may not tell you the complete story. In this case, if we focus only on the operations that take 1 or 2 seconds, it seems nothing needs to be tuned. On the contrary, we should focus on reducing the IO needed to access the 1+ billion row table by adding filters on the partition keys and limiting the total number of rows scanned.
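One way to spot this kind of hidden time without the event timeline is to measure the gap between a "Job N finished" line and the next "Starting job" line in the driver log. The following Python sketch does that for two lines copied from the log above. The function names are my own, and since the log timestamps have one second resolution, the result is only approximate.

```python
from datetime import datetime

# Two lines copied from the execution log above.
log_lines = [
    "16/12/03 11:11:10 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.322394 s",
    "16/12/03 11:11:11 INFO spark.SparkContext: Starting job: show at test2.scala:44",
]


def log_time(line):
    """Parse the leading 'yy/MM/dd HH:mm:ss' timestamp (17 characters)."""
    return datetime.strptime(line[:17], "%y/%m/%d %H:%M:%S")


def gaps_between_jobs(lines):
    """Seconds between each 'Job N finished' line and the next 'Starting job' line."""
    gaps = []
    finished_at = None
    for line in lines:
        if "DAGScheduler: Job " in line and " finished" in line:
            finished_at = log_time(line)
        elif "Starting job" in line and finished_at is not None:
            gaps.append((log_time(line) - finished_at).total_seconds())
            finished_at = None
    return gaps


print(gaps_between_jobs(log_lines))
```

A large value here points to work done between jobs, such as listing and reading Parquet files, that the per-job metrics do not account for.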

Use Toad to Access Hadoop Ecosystem

Some time ago I wrote a blog about Use SQL Developer to Access Hive Table on Hadoop. Recently I noticed another similar product: Toad for Hadoop. So I decided to give it a try.
Like many people, I like Toad products in general and use Toad in many of my projects. Toad for Hadoop is a new product in the Toad family. The current version is Toad for Hadoop 1.3.1 Beta, on the Windows platform only. The software supports Cloudera CDH 5.x and Hortonworks Data Platform 2.3. The software is free for now, but you need to create an account with Dell before you can download the zip file. The entire installation and configuration process is pretty simple and straightforward. Here are the steps:

Download the zip files
Go to Toad for Hadoop. Click the Download button. The zip file is 555 MB in size.

Installation
I installed the software in my Windows VM. Just double-click the ToadHadoop_1.3.1_Beta_x64.exe file and accept the default values on all of the installation screens. At the end of the installation, it opens the software automatically.

Configuration
Unlike the regular Toad software with its many buttons, this one looks quite simple.
Toad_config_1
Click the dropdown box on the right of Ecosystem box, then click Add New Ecosystem. The Select your Hadoop Cluster setup screen shows up as follows.
Toad_config_2
Input the name you want for this connection. For this one, I configured the connection for our X3 Big Data Appliance (BDA) full rack cluster with 18 nodes, so I input the Name as Enk-X3-DBA. For Detection Method, you can see it supports Cloudera CDH via Cloudera Manager or Hortonworks HDP via Ambari. For this one, I chose CDH managed by Cloudera Manager.

The next screen is Enter your Cloudera Manager credentials. For Server Address, use the same URL and port number that you use to access Cloudera Manager. The user name is the one you use to log in to Cloudera Manager. Make sure you create your user directory on HDFS before you run the installation of the software. For example, create a folder /user/zhouw and give the zhouw user read/write access to it. Otherwise you will see a permission exception later on.
Toad_config_3
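The HDFS home directory mentioned above can be prepared with the standard hdfs dfs -mkdir and hdfs dfs -chown commands. The Python sketch below only builds and prints these commands by default. The helper names are my own, and I am assuming the commands are run by a user with HDFS superuser rights (for example the hdfs user), which may differ on your cluster.

```python
import subprocess


def home_dir_commands(user, group=None):
    """Build the hdfs CLI calls that create /user/<user> and give it to that user."""
    home = f"/user/{user}"
    owner = f"{user}:{group}" if group else user
    return [
        ["hdfs", "dfs", "-mkdir", "-p", home],
        ["hdfs", "dfs", "-chown", owner, home],
    ]


def run(commands, dry_run=True):
    """Print the commands, or execute them when dry_run is False."""
    for cmd in commands:
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.run(cmd, check=True)


# Dry run for the zhouw user from this post; nothing is executed.
run(home_dir_commands("zhouw"))
```

Once the printed commands look right, calling run(..., dry_run=False) on a node with the hdfs client executes them.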
The next screen shows Autodetection. It runs many checks and validations, and you should see a successful status for all of them.
Toad_config_4
The next one shows Ecosystem Configuration. In this screen, I just input zhouw for User Name, then clicked the Activate button. There is a bug in this version: sometimes both the Activate and Cancel buttons disappear. The workaround is to close and restart the software.
Toad_config_5

SQL Screen
The most frequently used screen is the SQL Screen. You can run SQL statements against either the Hive or the Impala engine.
Toad_SQL_1
The screen is very similar to the traditional Toad screen I am used to. The left panel shows the schemas and table names. The bottom panel shows the result. Although there is an Explain Plan tab in the result panel, I consider Explain Plan on Hadoop a joke at the time of writing. You can take a look, but I would not waste time checking the plan. You will see more issues from other parts of the Hadoop world than from suboptimal query plans. The History panel on the right is an interesting one, and I found it very useful later on. It not only shows the timing of my queries (or jobs), but also caches the results of previous runs. It proved to be a smart feature, as I don’t have to rerun my queries to get the results back.
Toad_SQL_2
Sometimes you might want to check out DDLs for certain tables. You can just right click the table and select Generate Create Table statement as follows:
Toad_Generate_table_1
Here is an example of generated DDL.
Toad_Generate_table_2

HDFS Screen
The HDFS Screen is another feature I really like. It works just like Windows Explorer and shows HDFS directories and the files under them in a tree structure. It also shows size information for directories and files. With just a few clicks, you can quickly find out which directories and files are taking a lot of space. The right panel can show you some of the content of a file. By default, it shows the first 4K of data. This is very convenient and saves me the time of typing multiple commands to find the same kind of information. If you want to download or upload files from/to HDFS, just click the Download and Upload buttons at the top.
Toad_HDFS_1
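For comparison, the command-line way to find the large directories is to run hdfs dfs -du on a path and sort the result by size. The Python sketch below parses such output. The sample text and its paths are made up for illustration, and I am assuming each line starts with the size in bytes and ends with the path, which holds for both the two-column and three-column output formats but breaks on paths containing spaces.

```python
def parse_du(output):
    """Return (size_in_bytes, path) pairs from 'hdfs dfs -du' style output, largest first."""
    entries = []
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue  # skip blank or malformed lines
        entries.append((int(fields[0]), fields[-1]))
    return sorted(entries, reverse=True)


# Hypothetical sample output; sizes and paths are illustrative only.
sample = """\
1024  3072  /user/zhouw/small
52428800  157286400  /user/zhouw/big
2048  6144  /user/zhouw/medium
"""

for size, path in parse_du(sample):
    print(f"{size:>12}  {path}")
```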
Sometimes I am interested in the replication factor and physical locations of certain files on HDFS. Just right click the file on HDFS, then select Properties.
Toad_File_Properties
It shows everything about this file. For sizing information, it shows both Summary and individual block information.
Toad_File_Property_1

Chart Screen
The Chart Screen also looks nice. It does not have as many charts as Cloudera Manager, but it does have the key information I usually want to know. Here are a few of them:
Toad_Chart_1

Toad_Chart_3

Log Screen
The Log Screen is adequate. Here are some examples:
Toad_Log_1

Toad_Log_2

Transfer Screen
The Transfer Screen is supposed to support transferring data to/from an RDBMS, like Oracle, MySQL and SQL Server. I haven’t really tried this one out.
Toad_Transfer_1

Service Screen
The Service Screen is useful when you want to know where your services are deployed on Hadoop, such as the hostname and port number for certain services. It does not have everything, but it is good enough.
Toad_Service_1

Toad_Service_2

In general, Toad for Hadoop is a nice tool that can help you quickly find certain information on Hadoop without going through many screens and commands. I would say this tool is for Hadoop administrators rather than regular Hadoop users, because you probably do not want to give every user access to Cloudera Manager.

Use incremental import in sqoop to load data from Oracle (Part II)

The last post discussed the first part of creating an incremental import job in sqoop. This post continues the discussion with the second part of the series.

In the last few posts, I discussed the following:
1. Install Cloudera Hadoop Cluster using Cloudera Manager
2. Configurations after CDH Installation
3. Load Data to Hive Table.
4. Import Data to Hive from Oracle Database
5. Export data from Hive table to Oracle Database.
6. Use Impala to query a Hive table
7. Use incremental import in sqoop to load data from Oracle (Part I)

Although we can pass many arguments on the command line to run the import, just as I did in the last post, there is a better way to manage this kind of work: a saved sqoop job, which can do similar work, as shown in the picture below.
sqoop_incremental_2

First, let me create a sqoop job using the following command:
sqoop job \
--create student_job \
-- import \
--connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
--username wzhou \
--password wzhou \
--table STUDENT \
--incremental append \
--check-column student_id \
--last-value 7 \
-m 4 \
--split-by major

Once the sqoop job is created, I can use the following commands to show, execute or delete a sqoop job.
sqoop job --list
sqoop job --show student_job
sqoop job --exec student_job
sqoop job --delete student_job

Here are the execution results:
[wzhou@vmhost1 ~]$ sqoop job \
> --create student_job \
> -- import \
> --connect jdbc:oracle:thin:@enkx3-scan:1521:dbm2 \
> --username wzhou \
> --password wzhou \
> --table STUDENT \
> --incremental append \
> --check-column student_id \
> --last-value 7 \
> -m 4 \
> --split-by major

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 13:57:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
15/09/25 13:57:35 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.

[wzhou@vmhost1 ~]$ sqoop job --list
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 13:57:47 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
Available jobs:
student_job

[wzhou@vmhost1 ~]$ sqoop job --show student_job
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 13:58:07 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
Enter password:
Job: student_job
Tool: import
Options:
----------------------------
verbose = false
incremental.last.value = 7
db.connect.string = jdbc:oracle:thin:@enkx3-scan:1521:dbm2
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
hbase.create.table = false
db.require.password = true
hdfs.append.dir = true
db.table = STUDENT
codegen.input.delimiters.escape = 0
import.fetch.size = null
accumulo.create.table = false
codegen.input.delimiters.enclose.required = false
db.username = wzhou
codegen.output.delimiters.record = 10
import.max.inline.lob.size = 16777216
hbase.bulk.load.enabled = false
hcatalog.create.table = false
db.clear.staging.table = false
incremental.col = student_id
codegen.input.delimiters.record = 0
enable.compression = false
hive.overwrite.table = false
hive.import = false
codegen.input.delimiters.enclose = 0
accumulo.batch.size = 10240000
hive.drop.delims = false
codegen.output.delimiters.enclose = 0
hdfs.delete-target.dir = false
codegen.output.dir = .
codegen.auto.compile.dir = true
relaxed.isolation = false
mapreduce.num.mappers = 4
accumulo.max.latency = 5000
import.direct.split.size = 0
codegen.output.delimiters.field = 44
export.new.update = UpdateOnly
incremental.mode = AppendRows
hdfs.file.format = TextFile
codegen.compile.dir = /tmp/sqoop-wzhou/compile/5c52e95144738b92047bd2e1e37d1b1f
direct.import = false
db.split.column = major
hive.fail.table.exists = false
db.batch = false

[wzhou@vmhost1 ~]$ sqoop job --exec student_job
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 14:05:19 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
Enter password:
15/09/25 14:05:22 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
15/09/25 14:05:22 INFO manager.SqlManager: Using default fetchSize of 1000
15/09/25 14:05:22 INFO tool.CodeGenTool: Beginning code generation
15/09/25 14:05:23 INFO manager.OracleManager: Time zone has been set to GMT
15/09/25 14:05:23 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM STUDENT t WHERE 1=0
15/09/25 14:05:23 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-wzhou/compile/67a12da72d209152325de07df2841b68/STUDENT.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/09/25 14:05:25 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-wzhou/compile/67a12da72d209152325de07df2841b68/STUDENT.jar
15/09/25 14:05:26 INFO manager.OracleManager: Time zone has been set to GMT
15/09/25 14:05:26 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(student_id) FROM STUDENT
15/09/25 14:05:26 INFO tool.ImportTool: Incremental import based on column student_id
15/09/25 14:05:26 INFO tool.ImportTool: No new rows detected since last import.

As expected, no rows were imported. So let me insert a few more rows and rerun the job.
Run the following SQL commands to insert the rows in the Oracle database.
insert into wzhou.student values ( 8, 'student8', 'history');
insert into wzhou.student values ( 9, 'student9', 'math');
insert into wzhou.student values ( 10, 'student10', 'computer' );
commit;

Rerun the job and it shows 3 rows inserted.
[wzhou@vmhost1 ~]$ sqoop job --exec student_job
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
15/09/25 14:08:49 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.3
Enter password:
15/09/25 14:08:53 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
15/09/25 14:08:53 INFO manager.SqlManager: Using default fetchSize of 1000
15/09/25 14:08:53 INFO tool.CodeGenTool: Beginning code generation
15/09/25 14:08:53 INFO manager.OracleManager: Time zone has been set to GMT
15/09/25 14:08:53 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM STUDENT t WHERE 1=0
15/09/25 14:08:53 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-wzhou/compile/b4ec871b912ecfebe6b03a510f000581/STUDENT.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/09/25 14:08:55 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-wzhou/compile/b4ec871b912ecfebe6b03a510f000581/STUDENT.jar
15/09/25 14:08:56 INFO manager.OracleManager: Time zone has been set to GMT
15/09/25 14:08:56 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(student_id) FROM STUDENT
15/09/25 14:08:56 INFO tool.ImportTool: Incremental import based on column student_id
15/09/25 14:08:56 INFO tool.ImportTool: Lower bound value: 7
15/09/25 14:08:56 INFO tool.ImportTool: Upper bound value: 10
15/09/25 14:08:56 INFO mapreduce.ImportJobBase: Beginning import of STUDENT
15/09/25 14:08:56 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
15/09/25 14:08:56 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/09/25 14:08:56 INFO client.RMProxy: Connecting to ResourceManager at vmhost1.local/192.168.56.71:8032
15/09/25 14:08:59 INFO db.DBInputFormat: Using read commited transaction isolation
15/09/25 14:08:59 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(major), MAX(major) FROM STUDENT WHERE ( student_id > 7 AND student_id ...
(remaining job output truncated)

hive> use test_oracle;
OK
Time taken: 1.548 seconds

hive> select * from student_ext;
OK
2 student2 computer
4 student4 accounting
1 student1 math
3 student3 math
5 student3 computer
7 student5 computer
6 student4 math
10 student10 computer
8 student8 history
9 student9 math

Time taken: 0.694 seconds, Fetched: 10 row(s)

We can see the new rows for student8, student9 and student10.
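The two runs above can be summarized with a small Python simulation of append-mode incremental import. This is only a sketch of the idea, not sqoop's implementation: the function name is mine, the rows are taken from the Hive output above, and I am assuming the saved job stores the new last value after each successful run, which is what lets the next run pick up where the previous one stopped.

```python
# Source rows as (student_id, name, major); ids 1-7 match the Hive output above.
source = [
    (1, "student1", "math"), (2, "student2", "computer"), (3, "student3", "math"),
    (4, "student4", "accounting"), (5, "student3", "computer"),
    (6, "student4", "math"), (7, "student5", "computer"),
]


def incremental_append(source, target, last_value):
    """Append rows with last_value < id <= max(id) to target; return the new last value."""
    upper = max(row[0] for row in source)
    if upper <= last_value:
        print("No new rows detected since last import.")
        return last_value
    new_rows = [row for row in source if last_value < row[0] <= upper]
    target.extend(new_rows)
    print(f"Lower bound value: {last_value}, upper bound value: {upper}, "
          f"imported {len(new_rows)} rows")
    return upper


target = list(source)   # rows already imported in Part I
last_value = 7          # corresponds to --last-value 7

# First run: nothing new in the source, so nothing is imported.
last_value = incremental_append(source, target, last_value)

# Insert three rows in the source, then rerun.
source += [(8, "student8", "history"), (9, "student9", "math"),
           (10, "student10", "computer")]
last_value = incremental_append(source, target, last_value)
```

After the second run the target holds 10 rows and the stored last value is 10, so a third run without new inserts would again report no new rows.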