Resolve the sparklyr Not Responding Issue on Port 8880

Recently I was approached by one of my clients to help investigate a weird sparklyr issue. sparklyr is an interface between R and Spark, introduced by RStudio about a year ago. The diagram below shows the sparklyr architecture.

When trying to run sc <- spark_connect in RStudio, we got the following two errors:

  • Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
  • Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream

    Here is the detailed message:

    > library(sparklyr)
    > library(dplyr) 
    > sc <- spark_connect(master = "yarn-client", config=spark_config(), version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
     
    Error in force(code) :
    Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
    Path: /opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/lib/spark/bin/spark-submit
    Parameters: --class, sparklyr.Shell, --jars, '/usr/lib64/R/library/sparklyr/java/spark-csv_2.11-1.3.0.jar','/usr/lib64/R/library/sparklyr/java/commons-csv-1.1.jar','/usr/lib64/R/library/sparklyr/java/univocity-parsers-1.5.1.jar', '/usr/lib64/R/library/sparklyr/java/sparklyr-1.6-2.10.jar', 8880, 3859
    Log: /tmp/RtmzpSIMln/file9e23246605df7_spark.log
     
    ....
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
                   at org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:394)
                   at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
                   at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:97)
                   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:114)
                   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
                   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                   ... 5 more
    

    I did some research and found many people reporting similar issues. Ok, let's try their recommendations one by one.

  • Set the SPARK_HOME environment variable
  • Tried running Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/") before connecting, as sketched below. No, not working.
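
    For reference, the attempt looked roughly like this (a sketch; it assumes spark_connect falls back to the SPARK_HOME environment variable when the spark_home argument is omitted):

    Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    sc <- spark_connect(master = "yarn-client", config = spark_config(),
                        version = "1.6.0")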

  • Install the latest version of sparklyr
  • My client installed sparklyr less than a month ago, so I didn't see why this option made sense. I didn't pursue this path.

  • Check the Java installation
  • R on the same server uses the same version of Java without any issue, so I didn't see why the Java installation would be a major concern here. I ignored this one; a quick check is sketched below anyway.
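
    For completeness, a quick way to confirm which Java the R session actually sees (just a sketch):

    Sys.getenv("JAVA_HOME")        # the JDK path the session is configured with
    system2("java", "-version")    # the java binary found on the PATH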

  • No Hadoop installation
  • Someone said a Spark-only installation is not enough and a Hadoop installation is needed as well. Clearly this does not fit our situation: the server is an edge node and does have a Hadoop installation (a quick check is sketched below).
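
    A quick way to confirm the Hadoop client is present on the node (a sketch, assuming hadoop is on the PATH of the R session):

    system2("hadoop", "version")   # prints the Hadoop version if the client is installed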

  • No valid Kerberos ticket
  • Running system2('klist') did show there was no Kerberos ticket. Ok, I then opened a shell within RStudio Server (Tools -> Shell) and issued the kinit command.
    Rerunning system2('klist') showed a valid Kerberos ticket. Tried again: still not working.
    Note: even though it did not fix the problem, this step is necessary for further work once the issue is fixed, so run it regardless of the result. The check-and-renew step is sketched below.
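
    In code, the check-and-renew step looks roughly like this (a sketch: the keytab path and principal below are placeholders, and running kinit interactively from Tools -> Shell works just as well):

    # Check whether the R session currently holds a Kerberos ticket
    system2("klist")

    # Obtain a ticket non-interactively from a keytab
    # (hypothetical keytab and principal; replace with your own)
    system2("kinit", c("-kt", "/home/wzhou/wzhou.keytab", "wzhou@MYCOMPANY.COM"))

    # Confirm the ticket is now there
    system2("klist")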

  • Create a different config and pass it to spark_connect
  • Someone recommended creating a new config and passing it in. It looked like a good idea. Unfortunately, it just didn't work.

    wzconfig <- spark_config()
    wzconfig$`sparklyr.shell.deploy-mode` <- "client"
    wzconfig$spark.driver.cores <- 1
    wzconfig$spark.executor.cores <- 2
    wzconfig$spark.executor.memory <- "4G"
    sc <- spark_connect(master = "yarn-client", config=wzconfig, version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
    

    Actually, this recommendation is missing another key parameter. By default the total number of executors launched is 2, and I would usually bump this number up a little to get better performance. You can set the total number of executors as follows:

    wzconfig$spark.executor.instances <- 3
    

    Although this approach looks promising, it still did not work. It is, however, definitely a useful way to better control Spark resource usage for other purposes; the combined configuration is shown below for reference.
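
    For reference, the combined configuration from this recommendation, including the executor count, looks like this when passed to spark_connect:

    wzconfig <- spark_config()
    wzconfig$`sparklyr.shell.deploy-mode` <- "client"
    wzconfig$spark.driver.cores <- 1
    wzconfig$spark.executor.cores <- 2
    wzconfig$spark.executor.memory <- "4G"
    wzconfig$spark.executor.instances <- 3
    sc <- spark_connect(master = "yarn-client", config = wzconfig,
                        version = "1.6.0",
                        spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')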

  • Add remote address
  • Someone mentioned setting the remote address. I thought this could be another potential option, as I had resolved Spark issues related to the local IP in the past. So I added the following settings to the configuration from the previous example; note that the parameter sparklyr.gateway.address is the hostname of the active Resource Manager.

    wzconfig$sparklyr.gateway.remote <- TRUE
    wzconfig$sparklyr.gateway.address <- "cdhcluster01n03.mycompany.com" 
    

    Not working for this case.

  • Change deployment mode to yarn-cluster
  • This is probably the most unrealistic one. If connecting with master = "yarn-cluster", the Spark driver will run somewhere inside the Spark cluster. For our case, I don't believe this is the right solution, so I didn't even try it.

  • Run a Spark example
  • Someone recommended running spark-submit to verify that SparkPi can be run from the environment. This looks reasonable. The good thing is that I figured out the issue before executing this one, but it is definitely a valid and useful test of spark-submit.

    /opt/cloudera/parcels/SPARK2/lib/spark2/bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_2.11-2.1.0.jar 10
    
  • HA for yarn-cluster
  • There is an interesting post, Add support for `yarn-cluster` with high availability #905, discussing whether the issue might be related to multiple Resource Managers. We do use HA, so the post is interesting, but it probably does not fit our case: with the ClassNotFound message, I don't think we have even reached the HA part yet.

  • Need to set JAVA_HOME
  • Verified it; we have it set. So this is not the issue.

  • My solution
  • After reviewing or trying out some of the above suggestions, I went back to my own way of thinking. I must say I am not an expert in R or RStudio, with very limited knowledge of how they work, but I do have an extensive background in Spark tuning and troubleshooting.

    I know the error message Gateway in port (8880) did not respond is always the first message to show up and looks like the cause of the issue, but I thought differently. The second error, NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream, looked more suspicious to me. Earlier this year I helped another client with a weird Spark job issue that, in the end, was caused by an incorrect path. It seemed to me that the path might not be right here either, breaking Spark and in turn causing the first error about the port not responding.

    With this idea in mind, I focused on verifying the paths. Run Sys.getenv() to dump the environment:

    > Sys.getenv()
    DISPLAY                 :0
    EDITOR                  vi
    GIT_ASKPASS             rpostback-askpass
    HADOOP_CONF_DIR         /etc/hadoop/conf.cloudera.hdfs
    HADOOP_HOME             /opt/cloudera/parcels/CDH
    HOME                    /home/wzhou
    JAVA_HOME               /usr/java/jdk1.8.0_144/jre
    LANG                    en_US.UTF-8
    LD_LIBRARY_PATH         /usr/lib64/R/lib::/lib:/usr/java/jdk1.8.0_92/jre/lib/amd64/server
    LN_S                    ln -s
    LOGNAME                 wzhou
    MAKE                    make
    PAGER                   /usr/bin/less
    PATH                    /usr/local/sbin:/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin
    R_BROWSER               /usr/bin/xdg-open
    R_BZIPCMD               /usr/bin/bzip2
    R_DOC_DIR               /usr/share/doc/R-3.3.2
    R_GZIPCMD               /usr/bin/gzip
    R_HOME                  /usr/lib64/R
    R_INCLUDE_DIR           /usr/include/R
    R_LIBS_SITE             /usr/local/lib/R/site-library:/usr/local/lib/R/library:/usr/lib64/R/library:/usr/share/R/library
    R_LIBS_USER             ~/R/x86_64-redhat-linux-gnu-library/3.3
    R_PAPERSIZE             a4
    R_PDFVIEWER             /usr/bin/xdg-open
    R_PLATFORM              x86_64-redhat-linux-gnu
    R_PRINTCMD              lpr
    R_RD4PDF                times,hyper
    R_SESSION_TMPDIR        /tmp/RtmpZf9YMN
    R_SHARE_DIR             /usr/share/R
    R_SYSTEM_ABI            linux,gcc,gxx,gfortran,?
    R_TEXI2DVICMD           /usr/bin/texi2dvi
    R_UNZIPCMD              /usr/bin/unzip
    R_ZIPCMD                
    RMARKDOWN_MATHJAX_PATH
    						/usr/lib/rstudio-server/resources/mathjax-26
    RS_RPOSTBACK_PATH       /usr/lib/rstudio-server/bin/rpostback
    RSTUDIO                 1
    RSTUDIO_HTTP_REFERER    http://hadoop-edge06.mycompany.com:8787/
    RSTUDIO_PANDOC          /usr/lib/rstudio-server/bin/pandoc
    RSTUDIO_SESSION_STREAM
    						wzhou-d
    RSTUDIO_USER_IDENTITY   wzhou
    RSTUDIO_WINUTILS        bin/winutils
    SED                     /bin/sed
    SPARK_HOME              /opt/cloudera/parcels/SPARK2/lib/spark2
    SSH_ASKPASS             rpostback-askpass
    TAR                     /bin/gtar
    USER                    wzhou
    YARN_CONF_DIR           /etc/hadoop/conf.cloudera.yarn
    

    Ahhh, I noticed the environment was missing the SPARK_DIST_CLASSPATH variable. So I set it using the command below, just before sc <- spark_connect.

    Sys.setenv(SPARK_DIST_CLASSPATH = '/etc/hadoop/con:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*')
    

    Ok, try it again. Fantastic, it works!
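
    By the way, rather than hard-coding that long string, a less brittle alternative (just a sketch, assuming the hadoop client command is on the PATH visible to the R session, which is not guaranteed under RStudio Server) is to let Hadoop build the classpath for you:

    # Ask the local Hadoop client for its classpath and hand it to Spark
    hadoop_cp <- system2("hadoop", "classpath", stdout = TRUE)
    Sys.setenv(SPARK_DIST_CLASSPATH = paste(hadoop_cp, collapse = ":"))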

    Cause
    Ok, here is the real cause of the issue. sparklyr itself does not need this Hadoop classpath directly; it relies on spark-submit. When spark-submit is executed, it picks up the classpath and submits the jar files to Spark accordingly. If SPARK_DIST_CLASSPATH is not set, spark-submit cannot find the Hadoop classes (hence the NoClassDefFoundError), it fails, and the Spark executors can never be launched, which is why the gateway on port 8880 never responds.
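
    Putting it all together, the working sequence in my case was roughly the following (a sketch using the same values as earlier in this post; the classpath value is a placeholder for the full SPARK_DIST_CLASSPATH string shown above):

    library(sparklyr)
    library(dplyr)

    # 1. Make sure the session has a valid Kerberos ticket
    #    (kinit was run from Tools -> Shell as described earlier)
    system2("klist")

    # 2. Give spark-submit the Hadoop classpath it needs
    #    (placeholder; use the full SPARK_DIST_CLASSPATH value shown above)
    Sys.setenv(SPARK_DIST_CLASSPATH = "<full Hadoop classpath as above>")

    # 3. Connect
    sc <- spark_connect(master = "yarn-client", config = spark_config(),
                        version = "1.6.0",
                        spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')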

    Other Notes
    The following are some useful commands:

    ls()
    spark_installed_versions()
    sessionInfo()
    spark_home_dir() or spark_home
    path.expand("~")
    Sys.getenv("SPARK_HOME")
    spark_home_dir()
    character(0)
    config <- spark_config()
    spark_install_dir()
    sc
    backend
    monitor
    output_file
    spark_context
    java_context
    hive_context

    master
    method
    app_name
    config
    config$sparklyr.cores.local
    config$spark.sql.shuffle.partitions.local
    config$spark.env.SPARK_LOCAL_IP.local
    config$sparklyr.csv.embedded
    config$`sparklyr.shell.driver-class-path`
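
    For example, a quick sanity check of the session using a few of these might look like this (just a sketch):

    # Where does the session think Spark lives, and what has sparklyr installed locally?
    Sys.getenv("SPARK_HOME")
    spark_installed_versions()

    # Inspect or override an individual setting on a config object
    config <- spark_config()
    config$`sparklyr.shell.driver-class-path`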

    Also, there are a few useful articles about sparklyr and RStudio:
    RStudio’s R Interface to Spark on Amazon EMR
    How to Install RStudio Server on CentOS 7
    Using R with Apache Spark
    sparklyr: a test drive on YARN
    Analyzing a billion NYC taxi trips in Spark


    Create a Hadoop Cluster on Google Cloud Platform

    There are many ways to create Hadoop clusters, and I am going to show a few of them on Google Cloud Platform (GCP). The first approach is the standard way to build a Hadoop cluster, whether on the cloud or on-premises: create a group of VM instances and manually install the Hadoop cluster on them. Many people have written blogs or articles about this approach, so I am not going to repeat the steps here.

    In this blog, I am going to discuss the approach using Google Cloud Dataproc, with which you can have a Hadoop cluster up and running within 2 minutes. Google Cloud Dataproc is a fully managed cloud service for running Apache Hadoop clusters in a simple and fast way. The following shows the steps to create a Hadoop cluster and submit a Spark job to it.

    Create a Hadoop Cluster
    Click Dataproc -> Clusters

    Then click Enable API

    Cloud Dataproc screen shows up. Click Create cluster

    Input the following parameters:
    Name : cluster-test1
    Region : Choose us-central1
    Zone : Choose us-central1-c

    1. Master Node
    Machine Type : The default is n1-standard-4, but I chose n1-standard-1 just for simple testing purposes.
    Cluster Mode : There are 3 modes here: Single Node (1 master, 0 workers), Standard (1 master, N workers), and High Availability (3 masters, N workers). Choose Standard mode.
    Primary disk size : For my testing, 10 GB is enough.

    2. Worker Nodes
    Similar configuration to the master node. I use 3 worker nodes and a disk size of 15 GB. You might notice there is an option to use local SSD storage: you can attach up to 8 local SSD devices to a VM instance. Each disk is 375 GB in size, and you cannot specify a 10 GB disk size here. The local SSDs are physically attached to the host server and offer higher performance and lower latency than Google's persistent disk storage. Local SSDs are used for temporary data such as shuffle data in MapReduce, and the data on local SSD storage is not persistent. For more information, please visit https://cloud.google.com/compute/docs/disks/local-ssd.

    Another thing to mention is that Dataproc uses a Cloud Storage bucket instead of HDFS for the Hadoop cluster. Although the hadoop command still works and you won't feel anything different, the underlying storage is different. In my opinion, it is actually better, because a Google Cloud Storage bucket definitely has much better reliability and scalability than HDFS.

    Click Create when everything is done. After a few minutes, the cluster is created.

    Click cluster-test1 and it should show the cluster information.

    If we click the VM Instances tab, we can see there are one master and 3 worker instances.

    Click Configuration tab. It shows all configuration information.

    Submit a Spark Job
    Click Cloud Dataproc -> Jobs.

    Once Submit Job screen shows up, input the following information, then click Submit.

    After the job completes, you should see the following:

    To verify the result, I need to SSH to the master node and find out which ports are listening for connections. Click the dropdown to the right of SSH for the master node, then click Open in browser window.

    Then run the netstat command.

    cluster-test1-m:~$ netstat -a |grep LISTEN |grep tcp
    tcp        0      0 *:10033                 *:*                     LISTEN     
    tcp        0      0 *:10002                 *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c.:8020 *:*                     LISTEN     
    tcp        0      0 *:33044                 *:*                     LISTEN     
    tcp        0      0 *:ssh                   *:*                     LISTEN     
    tcp        0      0 *:52888                 *:*                     LISTEN     
    tcp        0      0 *:58266                 *:*                     LISTEN     
    tcp        0      0 *:35738                 *:*                     LISTEN     
    tcp        0      0 *:9083                  *:*                     LISTEN     
    tcp        0      0 *:34238                 *:*                     LISTEN     
    tcp        0      0 *:nfs                   *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c:10020 *:*                     LISTEN     
    tcp        0      0 localhost:mysql         *:*                     LISTEN     
    tcp        0      0 *:9868                  *:*                     LISTEN     
    tcp        0      0 *:9870                  *:*                     LISTEN     
    tcp        0      0 *:sunrpc                *:*                     LISTEN     
    tcp        0      0 *:webmin                *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c:19888 *:*                     LISTEN     
    tcp6       0      0 [::]:10001              [::]:*                  LISTEN     
    tcp6       0      0 [::]:44884              [::]:*                  LISTEN     
    tcp6       0      0 [::]:50965              [::]:*                  LISTEN     
    tcp6       0      0 [::]:ssh                [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m:omniorb [::]:*                  LISTEN     
    tcp6       0      0 [::]:46745              [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8030 [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8031 [::]:*                  LISTEN     
    tcp6       0      0 [::]:18080              [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8032 [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8033 [::]:*                  LISTEN     
    tcp6       0      0 [::]:nfs                [::]:*                  LISTEN     
    tcp6       0      0 [::]:33615              [::]:*                  LISTEN     
    tcp6       0      0 [::]:56911              [::]:*                  LISTEN     
    tcp6       0      0 [::]:sunrpc             [::]:*                  LISTEN  
    

    Check out directories.

    cluster-test1-m:~$ hdfs dfs -ls /
    17/09/12 12:12:24 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.1-hadoop2
    Found 2 items
    drwxrwxrwt   - mapred hadoop          0 2017-09-12 11:56 /tmp
    drwxrwxrwt   - hdfs   hadoop          0 2017-09-12 11:55 /user
    

    There are a few UI screens available to check out the Hadoop cluster and job status.
    HDFS NameNode (port 9870)

    YARN Resource Manager (port 8088)

    Spark Job History (port 18080)

    The Dataproc approach is an easy way to deploy a Hadoop cluster. Although it is powerful, I miss a nice UI like Cloudera Manager. To install a Cloudera CDH cluster, I need to use a different approach, which I am going to discuss in a future blog.

    Mysterious Time Gap in Spark Job Timeline

    Some time ago one of my clients asked me a question while reviewing a Spark job: why is there a time gap in the event timeline, sometimes as long as one minute? A few seconds would make sense as Spark's overhead between job runs, but one minute seems too long for any overhead activity when the whole job takes only 8~9 minutes. I didn't have a good answer for the question. Recently I ran a benchmark for a Spark job on an X3 full rack Oracle BDA in our lab and noticed the same behavior. I tracked down the issue and finally figured out the cause of the timeline gap. I am going to share my findings in this blog.

    My benchmark is on an X3 full rack BDA with Spark 1.6 and CDH 5.7. The Spark test script is a pretty simple one, and the important lines related to this timeline gap are listed as follows:

    line 42: val myDF = hiveContext.sql("select * from wzdb.sales_part ")
    line 44: myDF.show()
    line 47: val rawDF = hiveContext.sql("select * from wzdb.sales_raw limit 100 ")
    line 48: rawDF.show()

    Line 42 pulls all data from the wzdb.sales_part table, a Hive partitioned table using Parquet with SNAPPY compression. The table has about 1.3 billion rows and 1,680 partitions. Line 44 just shows the DataFrame myDF; by default show() prints 20 rows. Similarly, line 47 pulls 100 rows from the wzdb.sales_raw table and line 48 shows the first 20 rows from that table. Ok, the code could not be simpler than that.

    After starting the Spark job, it finished in 40 seconds. However, when I checked the Event Timeline, it showed a time gap between Job (or Stage) Id 1 and Job Id 2. Job Id 1 started at 18:13:24 and completed at 18:13:26, but Job Id 2 did not start until 18:13:35. That is a 9-second gap, about 25% of the total execution time, which seems like a lot to me. Job Id 1 executed line 42 while Job Id 2 executed line 44, and there is no executable code at line 43. Things became interesting.

    [Image: spark_event_timeline_1]

    Then I checked out the Executors page. It shows there are two executors, each with about 6~7 seconds of task time. I then clicked the stdout log link for each executor, paying more attention to the window between 18:13:24 and 18:13:35.

    [Image: spark_executors_2]

    Here is part of the output from Executor 1:

    16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 27032@enkbda1node10.enkitec.com
    16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
    16/12/02 18:13:16 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
    16/12/02 18:13:17 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
    16/12/02 18:13:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
    16/12/02 18:13:18 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
    16/12/02 18:13:18 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
    16/12/02 18:13:18 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
    16/12/02 18:13:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
    16/12/02 18:13:18 INFO Remoting: Starting remoting
    16/12/02 18:13:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
    16/12/02 18:13:19 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
    16/12/02 18:13:19 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 38184.
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-7af60bfd-9e27-45c8-bf30-a4bf126681f0
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-1c6099f9-37f2-4b4e-8b60-5c209bffc924
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ec991001-9dc1-4017-ba55-314b54dd9109
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-c0df794d-5ee6-46fe-ad57-cf6186cd5ba7
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-673f1d0b-7e44-47e7-b36e-eed65c656c87
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ab27950b-7dfd-48eb-a33c-7fbd02d29137
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fe6697c4-d64a-47b3-9781-27d583370710
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-e0a928ab-5895-46b6-8b10-ad883e895632
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-4d39319c-b6be-4a17-8755-89477f81e899
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-cfd8fd9c-22cd-443f-8a1d-99b9867c8507
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fff46796-d06a-45af-816b-c46d356be447
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-6cf05120-f651-4615-8abe-14631c5aadb1
    16/12/02 18:13:19 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.12.105:33339
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
    16/12/02 18:13:19 INFO executor.Executor: Starting executor ID 2 on host enkbda1node10.enkitec.com
    16/12/02 18:13:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45880.
    16/12/02 18:13:19 INFO netty.NettyBlockTransferService: Server created on 45880
    16/12/02 18:13:19 INFO storage.BlockManager: external shuffle service port = 7337
    16/12/02 18:13:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    16/12/02 18:13:19 INFO storage.BlockManagerMaster: Registered BlockManager
    16/12/02 18:13:19 INFO storage.BlockManager: Registering executor with local external shuffle service.
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
    16/12/02 18:13:19 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
    16/12/02 18:13:20 INFO executor.Executor: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar with timestamp 1480723975104
    16/12/02 18:13:20 INFO util.Utils: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar to /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/fetchFileTemp8581640132534419761.tmp
    16/12/02 18:13:20 INFO util.Utils: Copying /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/8566654191480723975104_cache to /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar
    16/12/02 18:13:20 INFO executor.Executor: Adding file:/u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar to class loader
    16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
    16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.6 KB, free 25.6 KB)
    16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 101 ms
    16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 72.5 KB, free 98.1 KB)
    16/12/02 18:13:22 INFO client.FusionCommon: Initialized FusionHdfs with URI: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1, FileSystem: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_468103070_110, ugi=oracle (auth:SIMPLE)]], instance: 426700950
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=10
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=11
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=12
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=13
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=14
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=15
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=16
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=17
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=18
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=19
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=2
    ....
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9
    16/12/02 18:13:24 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 727069 bytes result sent to driver
    16/12/02 18:13:24 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 3
    16/12/02 18:13:24 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
    16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 123.6 KB)
    16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 24 ms
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 196.0 KB)
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-hadoop-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-format-2.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-pig-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-exec-1.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-jdbc-1.1.0-cdh5.7.0-standalone.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [shaded.parquet.org.slf4j.helpers.NOPLoggerFactory]
    16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver
    16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
    16/12/02 18:13:35 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 93.1 KB)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3 took 9 ms
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 777.6 KB)
    16/12/02 18:13:35 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0 start: 0 end: 2028037 length: 2028037 hosts: []}
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 2
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 801.6 KB)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 7 ms
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 334.5 KB, free 1136.1 KB)
    16/12/02 18:13:35 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 148.139655 ms
    16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 21.996524 ms
    16/12/02 18:13:36 INFO codegen.GenerateSafeProjection: Code generated in 15.975923 ms
    16/12/02 18:13:36 WARN parquet.CorruptStatistics: Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297
    16/12/02 18:13:36 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 5365 bytes result sent to driver
    16/12/02 18:13:40 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
    16/12/02 18:13:40 WARN executor.CoarseGrainedExecutorBackend: An unknown (enkbda1node05.enkitec.com:33339) driver disconnected.
    16/12/02 18:13:40 ERROR executor.CoarseGrainedExecutorBackend: Driver 192.168.12.105:33339 disassociated! Shutting down.
    16/12/02 18:13:40 INFO storage.DiskBlockManager: Shutdown hook called
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Shutdown hook called
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-47de642a-8665-4853-a4f7-a5ba3ece4295
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f1aaa12c-c0d5-4c75-b1e6-e222ea9112b6
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-764ee07d-b082-4bc1-8b4d-3834d6cd14cd
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f79cf620-4754-41e2-bb4c-6e242f8e16ad
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-ae63ef76-400a-4e8f-b68b-3a34ed25414e
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-37543a1b-c5ec-4f06-887d-9357bea573e4
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-9a5eb5c9-4009-49ff-aca6-64103429b245
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-6ac13e05-62ec-485f-8016-3bb4d1830492
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-15e0aa40-4a67-46c7-8eb0-fe8c8f2d0c6e
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-112c8eb2-e039-4f73-beae-e3c05a04c3c6
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-752c1bcc-ce1f-41a1-b779-347f77b04227
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a
    

    Executor 1 handled 840 Hive partitions. There is no additional logging between 18:13:26 and 18:13:35: the log jumps straight from 16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver to the line 16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4.

    Executor 2 behaved similarly and also processed exactly 840 partitions. The relevant log lines are shown below:
    16/12/02 18:13:25 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 931 bytes result sent to driver
    16/12/02 18:13:38 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5

    That did not tell me anything useful. Ok, let me check out the Stages page; I am more interested in Stage Id 1 and Stage Id 2. Here is the Stages summary screen.
    [Image: spark_stage_summary_3]

    Let's check out the Stage Id 1 screen, shown below.
    [Image: spark_stage_stage1_4]

    The duration is only between 0.2 and 1 second for the two executors. Another interesting statistic is that the Peak Execution Memory is 0 bytes for both. I don't believe this stage could load 1.3 billion rows of data without any memory usage. In other words, I believe it does no IO-related work at this stage, even though the stage is nominally doing select * from wzdb.sales_part.

    Ok, let me check out the next stage, Stage 2. The DAG chart is so huge that it takes 20 seconds to show up on the screen. There are literally 3,365 RDD partitions unioned together to provide the result for the show() function.

    [Image: spark_stage_stage2_5]

    [Image: spark_stage_stage2_6]

    The metrics for this stage give another interesting result.
    [Image: spark_stage_stage2_7]

    The total duration is unbelievably fast: 1 second, with an input size of 1980 KB and 21 records. Remember, by default the show() function prints just 20 rows, so the 1980 KB and 21 records are clearly related to that 20-row show() result. But with 3,365 RDD partitions unioned together, 1 second seems unbelievably fast. Please note the block size is 256 MB in our environment. I just don't believe it is physically possible to perform the stage 1 operation (select * from wzdb.sales_part against a 1.3-billion-row Hive Parquet table) in 1 second and then immediately show 20 rows of results in the following second. Yes, Spark is in-memory processing and super fast, but according to the DAG it goes through all 1.3 billion rows; that can't finish in 2 seconds, even with a full rack BDA. There must be something else not shown in the picture.

    Luckily, for this test I used client mode as the deployment mode, so all of the driver log output was sent to my executing session. That is where I found out where the missing time went.

    16/12/02 18:13:23 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4178 ms on enkbda1node09.enkitec.com (1/2)
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4458 ms on enkbda1node10.enkitec.com (2/2)
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: ResultStage 0 (sql at test2.scala:42) finished in 20.648 s
    16/12/02 18:13:24 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Job 0 finished: sql at test2.scala:42, took 21.026555 s
    16/12/02 18:13:24 INFO spark.SparkContext: Starting job: sql at test2.scala:42
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Got job 1 (sql at test2.scala:42) with 2 output partitions
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (sql at test2.scala:42)
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42), which has no missing parents
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 170.5 KB)
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 196.0 KB)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.12.105:14015 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:24 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42)
    16/12/02 18:13:24 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, enkbda1node09.enkitec.com, partition 0,PROCESS_LOCAL, 2044 bytes)
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, enkbda1node10.enkitec.com, partition 1,PROCESS_LOCAL, 2143 bytes)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node09.enkitec.com:19013 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:25 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 294 ms on enkbda1node09.enkitec.com (1/2)
    16/12/02 18:13:26 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 1614 ms on enkbda1node10.enkitec.com (2/2)
    16/12/02 18:13:26 INFO scheduler.DAGScheduler: ResultStage 1 (sql at test2.scala:42) finished in 1.620 s
    16/12/02 18:13:26 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
    16/12/02 18:13:26 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.665575 s
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:14015 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:14015 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 1
    16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 2
    16/12/02 18:13:26 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
    16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
    16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:14015 (size: 24.0 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
    16/12/02 18:13:27 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000284_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000293_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000294_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000295_0
    16/12/02 18:13:28 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000296_0
    
    ....
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000266_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000249_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000267_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000268_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000269_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000270_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000271_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000272_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000273_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000274_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000275_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000250_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000251_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000252_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000253_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000254_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000255_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000256_0
    16/12/02 18:13:35 INFO spark.SparkContext: Starting job: show at test2.scala:44
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44), which has no missing parents
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 816.9 KB)
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 909.9 KB)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:14015 (size: 93.1 KB, free: 529.9 MB)
    16/12/02 18:13:35 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44)
    16/12/02 18:13:35 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
    16/12/02 18:13:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node10.enkitec.com, partition 0,RACK_LOCAL, 2384 bytes)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 93.1 KB, free: 529.9 MB)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 24.0 KB, free: 529.9 MB)
    16/12/02 18:13:36 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1336 ms on enkbda1node10.enkitec.com (1/1)
    16/12/02 18:13:36 INFO scheduler.DAGScheduler: ResultStage 2 (show at test2.scala:44) finished in 1.336 s
    16/12/02 18:13:36 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 
    16/12/02 18:13:36 INFO scheduler.DAGScheduler: Job 2 finished: show at test2.scala:44, took 1.604959 s
    16/12/02 18:13:36 INFO spark.ContextCleaner: Cleaned accumulator 3
    16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.12.105:14015 in memory (size: 93.1 KB, free: 530.0 MB)
    16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 93.1 KB, free: 530.0 MB)
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    |      tran_id|  tran_timestamp|store_id|           item_name|item_category|dept_id|       dept_name|tran_price|cashier_name|year|month|day|
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    |1479564513475|2012-01-01 15:47|       4|rxkexrwwrnohuenpm...|          891|     26|             Toy|    185.17|       Maria|2012|    1|  1|
    |1479564513608|2012-01-01 10:11|      27|                  zz|          790|     26|   Auto and Tire|     68.55|         Von|2012|    1|  1|
    |1479564513748|2012-01-01 16:53|      26|fqzlqxvmpktwjwwgg...|          279|     10|Home Improvement|    233.47|         Von|2012|    1|  1|
    |1479564513750|2012-01-01 21:10|      22|               ndmeu|          487|     35|           Photo|     92.42|       Ileen|2012|    1|  1|
    |1479564526973|2012-01-01 07:52|       6|sbzmvrnxrvbohorbp...|          632|     18|         Jewelry|    164.34|        Keri|2012|    1|  1|
    |1479564469852|2012-01-01 18:54|      27|ptcilplqfvednxmmh...|          416|      3|    Baby Toddler|    144.86|      Michel|2012|    1|  1|
    |1479564523772|2012-01-01 11:07|       2|gvjrsdgidzunbbmfi...|          269|     17|          Sports|    231.67|       Penny|2012|    1|  1|
    |1479564524666|2012-01-01 08:51|       6|rfskpcezchhbhzsbd...|          595|     19|            Food|    175.85|         Rus|2012|    1|  1|
    |1479564470133|2012-01-01 18:36|      17|wzswebdjowfjjbslh...|          679|     10|   Health Beauty|    350.13|        Keri|2012|    1|  1|
    |1479564537634|2012-01-01 07:52|      12|             bhxoevw|          281|     34|    Baby Toddler|    352.02|        Keri|2012|    1|  1|
    |1479564470197|2012-01-01 06:04|       5|plqxmnrcuqisfygkl...|          152|     19|             Toy|     53.67|     Dorothy|2012|    1|  1|
    |1479564470201|2012-01-01 08:23|      13|frcatrjwwrbomxmnj...|           74|     20|   Auto and Tire|    359.81|       Ileen|2012|    1|  1|
    |1479564470386|2012-01-01 10:16|      15|cevezkxpsrzszshen...|          814|     13|   Auto and Tire|     27.92|     Sherril|2012|    1|  1|
    |1479564470724|2012-01-01 01:44|      26|jjiqfklffyzxzkyiz...|          248|      5|   Auto and Tire|    219.66|       Lidia|2012|    1|  1|
    |1479564470799|2012-01-01 23:26|      18|     voakgmajahxfgbq|          769|     17|          Sports|    251.07|       Susan|2012|    1|  1|
    |1479564470941|2012-01-01 13:28|      14|axkytaxghyujudtaw...|          207|      5|   Auto and Tire|    168.34|  Christoper|2012|    1|  1|
    |1479564471016|2012-01-01 15:37|       3|sdcnxhosatucnwwqk...|          192|     23|         Jewelry|       2.5|      Michel|2012|    1|  1|
    |1479564471049|2012-01-01 23:27|       9|zoppybkpqpgitrwlo...|          120|     32|          Sports|    147.28|     Dorothy|2012|    1|  1|
    |1479564471063|2012-01-01 23:51|      24|zknmvfsrsdxdysmdw...|          169|      6|         Jewelry|    292.59|   Broderick|2012|    1|  1|
    |1479564471113|2012-01-01 19:42|      20|uaqmjikgtisidskzm...|          388|     36|            Food|      3.55|       Maria|2012|    1|  1|
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    only showing top 20 rows
    
    16/12/02 18:13:36 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
    16/12/02 18:13:36 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 282.1 KB, free 414.5 KB)
    

    The above log shows that the IO operations, such as parquet.ParquetRelation: Reading Parquet file(s), fall completely outside the timelines of Job 1 and Job 2. This is where the missing time goes. Actually, taking only 9~10 seconds to go through all 1.3 billion rows is pretty good. Mystery solved.

    With the above findings in mind, I expected that if I did partition pruning and limited the number of rows scanned by the query on line 42, the gap in the timeline should shrink, since less IO is needed to read the data. So I added a partition-pruning predicate to the line 42 query: select * from wzdb.sales_part where year=2013 and month=11 and day=13. Then I reran the test. The result was exactly what I expected; a minimal sketch of the change is shown below.
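    For illustration, here is a minimal sketch of what the line 42 change might look like in test2.scala, assuming (as the log output suggests) a Spark 1.6 SQL context named sqlContext; the exact surrounding code is not from the original script.

    // Sketch only: the real test2.scala code is not shown in this post.
    // Before: line 42 scanned all 1680 partitions of wzdb.sales_part.
    // val sales = sqlContext.sql("select * from wzdb.sales_part")

    // After: filter on the partition keys so only one partition is read.
    val sales = sqlContext.sql(
      "select * from wzdb.sales_part where year=2013 and month=11 and day=13")
    sales.show()   // line 44: the show() that appears in the job descriptions above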

    Here is the new timeline:
    [Image: spark_event_timeline_8, the Spark event timeline after adding partition pruning]

    As you can see, there is only a 1 second gap between Job Id 1 and Job Id 2. Here is the execution log; only one partition of data was read.

    16/12/03 11:11:10 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.322394 s
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:46755 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 2
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:46755 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 1
    16/12/03 11:11:11 INFO datasources.DataSourceStrategy: Selected 1 partitions out of 1680, pruned 99.94047619047619% partitions.
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:46755 (size: 24.0 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
    16/12/03 11:11:11 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/03 11:11:11 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000191_0
    16/12/03 11:11:11 INFO spark.SparkContext: Starting job: show at test2.scala:44
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44), which has no missing parents
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 143.0 KB, free 275.3 KB)
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 16.5 KB, free 291.8 KB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:46755 (size: 16.5 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44)
    16/12/03 11:11:11 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
    16/12/03 11:11:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node09.enkitec.com, partition 0,NODE_LOCAL, 2386 bytes)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 16.5 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 24.0 KB, free: 530.0 MB)
    16/12/03 11:11:12 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1523 ms on enkbda1node09.enkitec.com (1/1)
    

    The lesson learned from this test is that Spark metrics are helpful for identifying the bottleneck of a Spark application, but they may not tell you the complete story. In this case, if we focus only on the 1 or 2 second operations, it seems nothing needs to be tuned. On the contrary, we should focus on reducing the IO needed to access the 1+ billion row table, by adding filters on the partition keys and limiting the total number of rows scanned.

    Hadoop, HDFS, MapReduce and Spark on Big Data

    For the past few years, more and more companies have been interested in starting big data projects. There are many technologies related to big data in the market right now, like Hadoop, Hadoop Distributed File System (HDFS), MapReduce, Spark, Hive, Pig and many more. Although Hadoop is the most popular in this space, Spark has gained a lot of attention since 2014. For people who are interested in implementing big data in the near future, one of the most popular questions is: which one should we use, Hadoop or Spark? It is a great question, but it may not be an apples-to-apples comparison.
    Hadoop refers to a full ecosystem that includes core projects like HDFS, MapReduce, Hive, Flume, and HBase. It has been around for about 10 years and has proven to be the solution of choice for processing large datasets. Hadoop has three major layers:

    • HDFS for storage
    • YARN for resource management
    • MapReduce as the computation engine

    In other words, HDFS is Hadoop’s storage layer, while MapReduce is Hadoop’s processing framework, providing automatic task parallelization and distribution. The following diagram shows the components of Hadoop.
    [Diagram: hadoop, the Hadoop components]
    Spark targets the computation engine layer and can solve similar problems to MapReduce, but with an in-memory approach. So MapReduce and Spark are the apples-to-apples comparison. The following are the major differences between the two:

    Response Speed
    One major issue with MapReduce is that it needs to persist intermediate data back to disk after each map or reduce action. A MapReduce job can write and read intermediate data on disk many times during execution, depending on the total number of Mappers, Combiners, and Reducers in the job configuration. Spark improves this IO read/write performance issue by keeping intermediate data in memory for fast computation. But Spark does need significantly more memory than MapReduce for caching.
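    To make the difference concrete, here is a minimal sketch (not from the original test) showing how Spark can keep an intermediate RDD in memory across several actions, where MapReduce would have written it to disk between jobs; the path and the column position are hypothetical.

    // Sketch only: assumes an existing SparkContext named sc and a hypothetical CSV file of sales records.
    val sales = sc.textFile("hdfs:///tmp/sales.csv")
      .map(_.split(","))
      .filter(fields => fields(3).toDouble > 100.0)   // hypothetical price column
      .cache()                                        // keep the intermediate result in memory

    // Both actions below reuse the cached RDD instead of re-reading and re-filtering from disk.
    val highValueCount = sales.count()
    val sample = sales.take(5)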

    Ease of Use
    Another significant improvement of Spark over MapReduce is that Spark not only brings data into memory for computation, but is also much easier to use for development. Spark supports Python and Scala in addition to Java, and offers far more APIs than MapReduce.
    Here is the sample WordCount code in Hadoop MapReduce.

    // The classic WordCount mapper and reducer, using the old org.apache.hadoop.mapred API.
    // These imports belong at the top of the driver source file that contains the two classes.
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public static class WordCountMapClass extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value,
           OutputCollector<Text, IntWritable> output,
           Reporter reporter) throws IOException {
        // Tokenize each input line and emit (word, 1) for every token.
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
           word.set(itr.nextToken());
           output.collect(word, one);
        }
      }
    }

    public static class WordCountReduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

      public void reduce(Text key, Iterator<IntWritable> values,
           OutputCollector<Text, IntWritable> output,
           Reporter reporter) throws IOException {
        // Sum the counts for each word and emit (word, total).
        int sum = 0;
        while (values.hasNext()) {
           sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }
    

    The following is the sample code that performs the same work in Spark. As I like writing code in Python, here is the Python implementation.

    # WordCount in PySpark; sc is the SparkContext, the entry point in Spark 1.x.
    text_file = sc.textFile("hdfs://...")
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile("hdfs://...")
    

    Hardware Cost
    Spark does require more investment in memory, which has an impact on your hardware cost, while MapReduce can run on regular commodity hardware.

    Storage Model
    MapReduce requires HDFS storage for job execution. Spark doesn’t provide a storage system; it executes on top of one. Spark decouples the computation model from the storage model: it abstracts data files as RDDs while using different storage systems underneath, like Hadoop HDFS, Amazon S3, Cassandra, or flat files. By the way, S3 is Amazon Web Services’ solution for storing large files in the cloud. If running Spark on Amazon, you want your Amazon EC2 compute nodes in the same region as your S3 files, to improve speed and save cost.
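    The decoupling shows up directly in the code: the same Spark program can point at different storage systems just by changing the URI scheme. A minimal sketch with hypothetical paths and bucket names; the s3a scheme assumes the Hadoop S3A connector is available on the classpath.

    // Sketch only: sc is an existing SparkContext; all paths below are placeholders.
    val fromHdfs  = sc.textFile("hdfs://namenode:8020/data/sales.csv")   // data on HDFS
    val fromS3    = sc.textFile("s3a://my-bucket/data/sales.csv")        // same code, data on Amazon S3
    val fromLocal = sc.textFile("file:///tmp/sales.csv")                 // or a local flat file

    println(fromHdfs.count() + fromS3.count() + fromLocal.count())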

    Related Technologies
    MapReduce is suitable for batch-oriented applications. If you want a real-time option, you need to use another platform, like Impala or Storm. For graph processing, you need to use Giraph. For machine learning, people used to use Apache Mahout, but nowadays they tend to favor Spark.

    Spark has a single core engine whose libraries support not only batch computation, but also data streaming, interactive queries, graph processing, and machine learning. The following is the diagram of Spark components:
    [Diagram: spark_engine, the Spark components]
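    As an illustration of this single-engine design, here is a small sketch (not from the original post) that uses the Spark SQL and MLlib libraries from the same SparkContext; it assumes Spark 1.6, and the paths, column name, and data layout are hypothetical.

    // Sketch only: assumes an existing SparkContext named sc and placeholder HDFS paths.
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Interactive queries via Spark SQL, on the same engine...
    val sqlContext = new SQLContext(sc)
    val events = sqlContext.read.json("hdfs:///tmp/events.json")
    events.groupBy("store_id").count().show()

    // ...and machine learning via MLlib, still on the same SparkContext.
    val points = sc.textFile("hdfs:///tmp/points.csv")
      .map(line => Vectors.dense(line.split(",").map(_.toDouble)))
    val model = KMeans.train(points, 3, 10)   // k = 3 clusters, 10 iterations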
    Maturity
    Hadoop MapReduce is a mature platform, is the primary framework on Hadoop for distributed processing, and has a large number of production deployments in many enterprises. It does have the limitations of high latency and being batch oriented.
    With a rich set of APIs and in-memory processing, Spark can support both batch and real-time workloads, and it is gradually gaining more ground in distributed processing.

    Distributors
    There are several Apache Hadoop distributors in the big data space, like Cloudera, Hortonworks, and MapR. The Cloudera CDH distribution is one of the commonly used ones. It not only includes Hadoop, Hive, Spark, and YARN, but also has its own Impala for ad-hoc queries and Cloudera Manager for Hadoop configuration and management.

    So what is the usual starting point for a company just getting into the big data space? There are two major types of big data projects:
    1) Pure Big Data Project. The data comes from various sources: RDBMS, logging data, streaming data, and many kinds of event-triggered data. The data can be structured or unstructured. After the data is populated in Hadoop HDFS storage, all further processing happens on the Hadoop ecosystem, with no interaction between Hadoop and the RDBMS during the processing stage. This type of project usually requires changes in workflow and significant new code. It is typically used when the project is working on something brand new and no work has been done in the space; in other words, writing a lot of new code is not an issue because there is no existing code anyway.

    2) Hybrid Big Data Project. Some data, like historical data or certain large tables or table partitions, is offloaded from the RDBMS to HDFS. Further processing involves the RDBMS and the Hadoop ecosystem working together. This type of project usually requires keeping the majority of the current workflow and modifying certain existing code, especially keeping changes in SQL to a minimal level. A Hybrid Big Data project requires significantly more knowledge of both Big Data and the RDBMS, as tuning happens not only on the Hadoop ecosystem but also on the RDBMS side.

    In this blog, I will focus on the first type of project, the Pure Big Data project, and will discuss the Hybrid Big Data project in a separate post in the future.

    Of course, there are many approaches to working on Pure Big Data projects. I will just list the one approach I usually recommend.
    1) Build the Hadoop cluster using one of the major distributions: Cloudera CDH, Hortonworks, or MapR.
    2) Define the process or workflow that regularly puts files on HDFS. It doesn’t matter where the data comes from. If the data comes from an RDBMS, you can use sqoop, or just flat CSV files exported from the RDBMS. If the data is streaming data, you can use flume or other means to ingest it into HDFS.
    3) Understand the data structure on HDFS, define your Hive table DDLs, and then point the locations of the Hive tables to the file locations on HDFS (see the sketch after this list). Yes, Hive might be slow for certain processing, but it is usually a good way to verify that your big data ecosystem is working normally. If you’re using Cloudera CDH, you can use Impala for ad-hoc queries; it has much better performance than Hive. For resource allocation and scheduling, you can use YARN.
    4) If the business requirements for the big data project are machine learning, iterative data mining, or real-time stream processing, then Spark is usually the choice. If the dataset during the processing stage of a batch job is very large and cannot fit into memory, use traditional Hadoop MapReduce to process the data.
    5) Result analysis.
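    For step 3, here is a minimal sketch of what pointing a Hive table at existing HDFS files might look like. The database name, table name, columns, and HDFS location are all hypothetical, and the DDL is issued through a Spark 1.6 HiveContext (assumed to be named sqlContext); the same statement could also be run from a Hive or Impala shell.

    // Sketch only: sqlContext is assumed to be a HiveContext; names and paths are placeholders.
    sqlContext.sql("""
      CREATE EXTERNAL TABLE IF NOT EXISTS demo_db.sales_raw (
        tran_id     BIGINT,
        item_name   STRING,
        tran_price  DOUBLE
      )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
      LOCATION 'hdfs:///data/landing/sales'
    """)

    // Verify that the table sees the files dropped on HDFS by the ingestion workflow.
    sqlContext.sql("SELECT COUNT(*) FROM demo_db.sales_raw").show()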

    Conclusion
    Finally, as a general rule, I would recommend using Hadoop HDFS as the data storage framework; HDFS is a great solution for permanent storage. Use Spark for fast in-memory processing and its built-in machine learning capability. Use MapReduce if the data cannot fit into memory during batch processing. Use Hive and Impala for quick SQL access, and use Pig to save time writing MapReduce jobs. From a learning perspective, for beginners it seems to make more sense to me to start with Hadoop HDFS, MapReduce, Impala, and Hive, and then Spark.

    Of course, it also depends on what kind of problems you need to solve and on the available budget and completion timeline. If the problem can be solved offline, response time is not critical, and the budget is very limited, Hive and Impala can do the work. If you want fast response times, you need to consider Spark, but the hardware could be expensive because of Spark’s memory requirements. In the end, you want to identify what problem your big data project is trying to solve and stay flexible enough to use the right tool for the solution.