Running Spark on Kubernetes

If 2017 was the year of Docker, 2018 is the year of Kubernetes. Kubernetes makes container management easy. It does not manage containers directly, but pods; a pod is the unit of deployment and holds one or more tightly coupled containers. Kubernetes also supports horizontal autoscaling for pods: when the application is accessed by a large number of users, you can instruct Kubernetes to replicate your pods to balance the load. As expected, Spark can be deployed on Kubernetes. Currently there are a few ways to run Spark on Kubernetes.
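
For example, scaling rules can be attached to a deployment with a single command. The deployment name my-app below is just a placeholder for illustration, not anything used later in this post:

kubectl autoscale deployment my-app --min=2 --max=10 --cpu-percent=80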

1. Standalone Spark Cluster
Spark standalone mode is a nice way to quickly start a Spark cluster without using YARN or Mesos. In this mode, you don't have to use HDFS to store huge datasets. Instead you can use cloud storage to store whatever you like and decouple the Spark cluster from its storage. For a Spark cluster, you will have one pod for the Spark master and multiple pods for the Spark workers. When you want to run a job, just deploy the Spark master and create a master service, then deploy multiple Spark workers. Once the job completes, delete all the pods from the Kubernetes workloads, as sketched below.
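
A minimal sketch of that workflow with plain kubectl commands might look like the following. The image name and replica count are placeholders, and it assumes the image's entrypoint starts the Spark master or worker process:

# deploy the master and expose it as a service
kubectl create deployment spark-master --image=<your-spark-standalone-image>
kubectl expose deployment spark-master --port=7077 --name=spark-master

# deploy the workers and scale them out
kubectl create deployment spark-worker --image=<your-spark-standalone-image>
kubectl scale deployment spark-worker --replicas=5

# ... submit the job against spark://spark-master:7077 ...

# tear everything down once the job completes
kubectl delete deployment spark-master spark-worker
kubectl delete service spark-master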

Actually this is the recommended way to run jobs against big datasets in the cloud. You don't need a 200-node Spark cluster running all the time; just spin one up whenever you need to run a job. This can save significantly on cloud cost. The standalone Spark cluster is not my topic in this blog and I may cover it in a different post.

2. Spark on Kubernetes
Spark on Kubernetes is another interesting mode to run a Spark cluster. It uses the native Kubernetes scheduler for the resource management of the Spark cluster. Here is the architecture of Spark on Kubernetes.

There is a blog, Apache Spark 2.3 with Native Kubernetes Support, which goes through the steps to start a basic Pi example. However, I followed the steps and it did not work; several steps and details are missing. After some research, I figured out the correct steps to run it on Google Cloud Platform (GCP). This blog shows the steps to run the Pi example on Kubernetes.

Download Apache Spark 2.3
One of the major changes in this release is the inclusion of the new Kubernetes scheduler backend. The software can be downloaded at http://spark.apache.org/releases/spark-release-2-3-0.html or http://spark.apache.org/downloads.html. After downloading the software, unzip the file on your local machine.

Build Docker Image
Spark on Kubernetes requires you to specify an image for its driver and executors. I could get a Spark image from somewhere, but I like to build the image myself so that I can easily customize it in the future. There is a Dockerfile under the spark-2.3.0-bin-hadoop2.7/kubernetes/dockerfiles/spark directory.

[root@docker1 spark]# cat Dockerfile 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM openjdk:8-alpine

ARG spark_jars=jars
ARG img_path=kubernetes/dockerfiles

# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apk upgrade --no-cache && \
    apk add --no-cache bash tini libc6-compat && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/work-dir \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd

COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY data /opt/spark/data

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

Pay attention to the line COPY examples /opt/spark/examples. The jar file for the Pi example is in the examples directory. Remember to use this path, /opt/spark/examples, instead of the path on the local machine from which you submit the job. I ran into a SparkPi class not found issue; it was caused by the fact that I included the local path to the jar file on my computer instead of the path inside the docker image.
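
If you are not sure of the path inside the image, you can list it directly by overriding the entrypoint once the image is built (see below). This is just a quick check; adjust the image name to whatever you tag yours as:

docker run --rm --entrypoint ls gcr.io/wz-gcptest-357812/spark:k8s-spark-2.3 /opt/spark/examples/jars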

I have a Docker VM that I use for all Docker-related operations. Log on to the Docker VM and run the following to download and unzip the software:

[root@docker1 ]# mkdir spark-2.3
[root@docker1 ]# cd spark-2.3
[root@docker1 spark-2.3]# wget http://www-eu.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
--2018-04-24 19:11:09--  http://www-eu.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
Resolving www-eu.apache.org (www-eu.apache.org)... 195.154.151.36, 2001:bc8:2142:300::
Connecting to www-eu.apache.org (www-eu.apache.org)|195.154.151.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 226128401 (216M) [application/x-gzip]
Saving to: ‘spark-2.3.0-bin-hadoop2.7.tgz’

100%[===========================================================================================================================================>] 226,128,401 26.8MB/s   in 8.8s   

2018-04-24 19:11:18 (24.6 MB/s) - ‘spark-2.3.0-bin-hadoop2.7.tgz’ saved [226128401/226128401]

[root@docker1 spark-2.3]# ls -l
total 220856
-rw-r--r--. 1 root root     22860 Apr 24 19:10 spark-2.3.0-bin-hadoop2.7.tgz
-rw-r--r--. 1 root root 226128401 Feb 22 19:54 spark-2.3.0-bin-hadoop2.7.tgz.1
[root@docker1 spark-2.3]# tar -xzf spark-2.3.0-bin-hadoop2.7.tgz

Build the image and push it to my private Google Container Registry.

[root@docker1 spark-2.3.0-bin-hadoop2.7]# bin/docker-image-tool.sh -r gcr.io/wz-gcptest-357812 -t k8s-spark-2.3 build
Sending build context to Docker daemon  256.4MB
Step 1/14 : FROM openjdk:8-alpine
8-alpine: Pulling from library/openjdk
ff3a5c916c92: Pull complete 
5de5f69f42d7: Pull complete 
fd869c8b9b59: Pull complete 
Digest: 
. . . .
Step 13/14 : WORKDIR /opt/spark/work-dir
Removing intermediate container ed4b6fe3efd6
 ---> 69cd2dd1cae8
Step 14/14 : ENTRYPOINT [ "/opt/entrypoint.sh" ]
 ---> Running in 07da54b9fd34
Removing intermediate container 07da54b9fd34
 ---> 9c3bd46e026d
Successfully built 9c3bd46e026d
Successfully tagged gcr.io/wz-gcptest-357812/spark:k8s-spark-2.3

[root@docker1 spark-2.3.0-bin-hadoop2.7]# bin/docker-image-tool.sh -r gcr.io/wz-gcptest-357812 -t k8s-spark-2.3 push
The push refers to repository [gcr.io/wz-gcptest-357812/spark]
e7930b27b5e2: Pushed 
6f0480c071be: Pushed 
d7e218db3d89: Pushed 
8281f673b660: Pushed 
92e162ecfbe3: Pushed 
938ba54601ba: Pushed 
dc1345b437d9: Pushed 
4e3f1d639db8: Pushed 
685fdd7e6770: Layer already exists 
c9b26f41504c: Layer already exists 
cd7100a72410: Layer already exists 
k8s-spark-2.3: digest: sha256:2f865bf17985317909c866d036ba7988e1dbfc5fe10440a95f366264ceee0518 size: 2624

[root@docker1 ~]# docker image ls
REPOSITORY                                       TAG                 IMAGE ID            CREATED             SIZE
gcr.io/wz-gcptest-357812/spark                 k8s-spark-2.3       9c3bd46e026d        3 days ago          346MB
ubuntu                                           16.04               c9d990395902        2 weeks ago         113MB
hello-world                                      latest              e38bc07ac18e        2 weeks ago         1.85kB
openjdk                                          8-alpine            224765a6bdbe        3 months ago        102MB

Check Google Container Registry. It shows the image with the correct tag k8s-spark-2.3.
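
You can also verify the image from the command line with gcloud; it should list the k8s-spark-2.3 tag:

gcloud container images list-tags gcr.io/wz-gcptest-357812/spark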

Configure RBAC
I already have a Kubernetes cluster up and running with 3 nodes. I have to set up Role-Based Access Control (RBAC) to allow Spark on Kubernetes to work. Otherwise it will throw the following error during job execution:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://kubernetes.default.svc/api/v1/namespaces/default/pods/spark-pi-449efacd5a4a386ca31177faddb8eab4-driver. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-449efacd5a4a386ca31177faddb8eab4-driver" is forbidden: User "system:serviceaccount:default:default" cannot get pods in the namespace "default": Unknown user "system:serviceaccount:default:default".

Check service account and clusterrolebinding.

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get serviceaccount
NAME      SECRETS   AGE
default   1         5m
weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get clusterrolebinding
NAME                                           AGE
cluster-admin                                  5m
event-exporter-rb                              5m
gce:beta:kubelet-certificate-bootstrap         5m
gce:beta:kubelet-certificate-rotation          5m
heapster-binding                               5m
kube-apiserver-kubelet-api-admin               5m
kubelet-cluster-admin                          5m
npd-binding                                    5m
system:basic-user                              5m
system:controller:attachdetach-controller      5m
. . . .
system:controller:statefulset-controller       5m
system:controller:ttl-controller               5m
system:discovery                               5m
system:kube-controller-manager                 5m
system:kube-dns                                5m
system:kube-dns-autoscaler                     5m
system:kube-scheduler                          5m
system:node                                    5m
system:node-proxier                            5m

Create the spark service account and cluster role binding.

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl create serviceaccount spark
serviceaccount "spark" created
weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
clusterrolebinding "spark-role" created

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get serviceaccount
NAME      SECRETS   AGE
default   1         1h
spark     1         56m
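
A quick way to confirm the new spark service account can actually manage pods is kubectl auth can-i with service-account impersonation; both commands should print yes if the binding is in place:

kubectl auth can-i get pods --as=system:serviceaccount:default:spark
kubectl auth can-i create pods --as=system:serviceaccount:default:spark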

Run Spark Application
You might need to set SPARK_LOCAL_IP. You also need to find out the master URL by running kubectl cluster-info | grep master | awk '{print $6}'. Use the following commands to set up the environment.

export PROJECT_ID="wz-gcptest-357812"
export ZONE="us-east1-b"
export KUBE_CLUSTER_NAME="wz-kube1"

gcloud config set project ${PROJECT_ID}
gcloud config set compute/zone ${ZONE}
gcloud container clusters get-credentials ${KUBE_CLUSTER_NAME}
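
For convenience, the master URL mentioned above can be captured in a variable and reused in spark-submit. A small sketch, assuming the kubectl cluster-info output format shown earlier:

export MASTER_URL=$(kubectl cluster-info | grep master | awk '{print $6}')
echo $MASTER_URL     # e.g. https://104.136.128.109
# then submit with --master k8s://$MASTER_URL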

Finally I can run the job. I intentionally passed a parameter of 1000000 to make the job run for a long time.

bin/spark-submit \
    --master k8s://https://104.136.128.109 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --conf spark.app.name=spark-pi \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark  \
    --conf spark.kubernetes.container.image=gcr.io/wz-gcptest-357812/spark:k8s-spark-2.3 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar 1000000

If you check GCP's Kubernetes Workloads screen, you will see one Spark driver and two executors running.

Monitor the Spark Job
If the job runs for a while, you will see the screen below when checking the pod details. It shows CPU, memory, and disk usage, which is usually good enough for monitoring purposes.
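
If the cluster has Heapster or the metrics server available (GKE clusters usually do), you can also get a quick resource snapshot from the command line:

kubectl top pods
kubectl top nodes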

But how do I check the Spark UI? There is no resource manager like YARN in the picture. At this moment I need to use port forwarding to access the Spark UI. Find the driver pod and then set up the port forwarding.

weidong.zhou:@macpro ~ > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          7m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          7m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          7m
weidong.zhou:@macpro ~ > kubectl port-forward spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver 4040:4040
Forwarding from 127.0.0.1:4040 -> 4040

Find out the IP addresses of the pods.

weidong.zhou:@macpro mytest_gcp > kubectl get pod -o wide
NAME                                               READY     STATUS    RESTARTS   AGE       IP          NODE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          10m       10.44.0.8   gke-wz-kube1-default-pool-2aac262a-thw0
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          10m       10.44.2.8   gke-wz-kube1-default-pool-2aac262a-09vt
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          10m       10.44.1.6   gke-wz-kube1-default-pool-2aac262a-23gk

Now we can see the familiar Spark UI.

If you want to check the logs from the driver pod, just run the following:

weidong.zhou:@macpro mytest_gcp > kubectl -n=default logs -f spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380242.0 in stage 0.0 (TID 380242, 10.44.1.6, executor 2, partition 380242, PROCESS_LOCAL, 7865 bytes)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Finished task 380240.0 in stage 0.0 (TID 380240) in 3 ms on 10.44.1.6 (executor 2) (380241/1000000)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380243.0 in stage 0.0 (TID 380243, 10.44.2.8, executor 1, partition 380243, PROCESS_LOCAL, 7865 bytes)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Finished task 380241.0 in stage 0.0 (TID 380241) in 5 ms on 10.44.2.8 (executor 1) (380242/1000000)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380244.0 in stage 0.0 (TID 380244, 10.44.1.6, executor 2, partition 380244, PROCESS_LOCAL, 7865 bytes)

Killing Executor and Driver
What happens if I kill one of the executors?

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          23m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          23m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          23m
weidong.zhou:@macpro mytest_gcp > kubectl delete pod spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1
pod "spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1" deleted
 
weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          25m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          25m

After 30 seconds, check again. A new executor starts.

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          26m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          25m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-3   1/1       Running   0          19s

The Spark UI shows the executor change.


This is actually what I expected. Ok, what happens if I kill the driver?

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          31m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          31m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-3   1/1       Running   0          5m
weidong.zhou:@macpro mytest_gcp > kubectl delete pod spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver
pod "spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver" deleted
weidong.zhou:@macpro mytest_gcp > kubectl get pods
No resources found, use --show-all to see completed objects.

So killing the driver pod is the way to stop a Spark application during execution.
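
The driver and executor pods created by the Kubernetes scheduler also carry labels such as spark-role, so instead of typing the long generated pod name you can select on the label. A small example, assuming the default spark-role label applied by Spark 2.3:

kubectl get pods -l spark-role=driver
kubectl delete pods -l spark-role=driver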

The nice thing about Spark on Kubernetes is that all pods disappear whether the Spark job completes by itself or is killed. This frees the resources automatically. Overall, Spark on Kubernetes is an easy way to quickly run a Spark application on Kubernetes.


Sparkling Water Shell: Cloud size under 12 Exception

In my last blog, I compared Sparkling Water and H2O. Before I got sparkling-shell to work, I ran into a lot of issues. One of the annoying errors was the runtime exception: Cloud size under xx. I searched the internet and found many people have similar problems. There are many recommendations, ranging from downloading the latest and matching versions to setting certain parameters during startup. Unfortunately none of them worked for me. I finally figured out the issue and would like to share my solution in this blog.

After I downloaded Sparkling Water, I unzipped the file and ran the sparkling-shell command as shown at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. It looked good initially.

[sparkling-water-2.2.2]$ bin/sparkling-shell --conf "spark.executor.memory=1g"

-----
  Spark master (MASTER)     : local[*]
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1343).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

But when I ran the command val h2oContext = H2OContext.getOrCreate(spark), it gave me many errors as follows:

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Increasing 'spark.locality.wait' to value 30000
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Due to non-deterministic behavior of Spark broadcast-based joins
We recommend to disable them by configuring `spark.sql.autoBroadcastJoinThreshold` variable to value `-1`:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
17/11/05 10:07:48 WARN internal.InternalH2OBackend: The property 'spark.scheduler.minRegisteredResourcesRatio' is not specified!
We recommend to pass `--conf spark.scheduler.minRegisteredResourcesRatio=1`
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Unsupported options spark.dynamicAllocation.enabled detected!
17/11/05 10:07:48 WARN internal.InternalH2OBackend:
The application is going down, since the parameter (spark.ext.h2o.fail.on.unsupported.spark.param,true) is true!
If you would like to skip the fail call, please, specify the value of the parameter to false.

java.lang.IllegalArgumentException: Unsupported argument: (spark.dynamicAllocation.enabled,true)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:46)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:38)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$class.checkUnsupportedSparkOptions(InternalBackendUtils.scala:38)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkUnsupportedSparkOptions(InternalH2OBackend.scala:30)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkAndUpdateConf(InternalH2OBackend.scala:60)
  at org.apache.spark.h2o.H2OContext.<init>(H2OContext.scala:90)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

You can see I need to pass in more parameters when starting sparkling-shell. Change the parameters as follows:

bin/sparkling-shell \
--master yarn \
--conf spark.executor.memory=1g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

Ok, this time it looked better; at least the warning messages disappeared. But I got the error message java.lang.RuntimeException: Cloud size under 2.

[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1344).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
java.lang.RuntimeException: Cloud size under 2
  at water.H2O.waitForCloudSize(H2O.java:1689)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.init(InternalH2OBackend.scala:117)
  at org.apache.spark.h2o.H2OContext.init(H2OContext.scala:121)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

No big deal. Anyway, I didn't specify the number of executors and the executor memory, so it may be complaining that the size of the H2O cluster is too small. Add the following three parameters.

--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \

Rerunning the whole thing got the same error, with the size number changing to 12. It did not look right to me. I then checked the H2O error logfile and found tons of messages as follows:

11-06 10:23:16.355 10.132.110.145:54325  30452  #09:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused
11-06 10:23:16.790 10.132.110.145:54325  30452  #06:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused

It looks like Sparkling Water cannot connect to the Spark cluster. After some investigation, I realized I had installed and run sparkling-shell from an edge node of the BDA. The H2O cluster runs inside a Spark application, and the Spark cluster on the BDA communicates over the BDA's private InfiniBand network; an edge node cannot directly communicate with the IB network on the BDA. With this assumption in mind, I installed and ran Sparkling Water on one of the BDA nodes, and it worked perfectly without any issue. Problem solved!

H2O vs Sparkling Water

People working in Hadoop environments are familiar with many products that make you feel like you're in a zoo: Pig, Hive, Beeswax, and ZooKeeper are some of them. As machine learning becomes more popular, products that sound like water have come out, such as H2O and Sparkling Water. I am not going to rule out the possibility that we will see some big data products that sound like wine. Anyway, people like to give their products new names to make them sound cool, although many of them are similar.

In this blog, I am going to discuss H2O and Sparkling Water at a high level.

H2O
H2O is a fast, open-source machine learning tool for big data analysis. It was launched in Silicon Valley in 2011 by a company called H2O.ai, formerly known as 0xdata. The company is led by a few of the top data scientists in the world and backed by a few mathematics professors at Stanford University on the company's scientific advisory board.

H2O uses in-memory compression and can handle billions of rows in memory. It allows companies to use all of their data without sampling to get predictions faster. It includes built-in advanced algorithms such as deep learning, boosting, and bagging ensembles, and it allows organizations to build powerful domain-specific predictive engines for recommendations, customer churn, propensity to buy, dynamic pricing, and fraud detection for insurance and credit card companies.

H2O has interfaces to R, Scala, Python, and Java. Not only can it run on Hadoop and cloud environments (AWS, Google Cloud, and Azure), it can also run on Linux, Mac, and Windows. The following shows the H2O architecture.

For your own testing, you could install H2O on your laptop. For big datasets, or to access Spark clusters installed somewhere else, use Sparkling Water, which I will discuss in the later part of the blog. The installation of H2O on your laptop is super easy. Here are the steps:
1. Download the H2O Zip File
Go to the H2O download page at http://h2o.ai/download and choose the Latest Stable Release under the H2O block.

2. Run H2O
Just unzip the file and then run the jar file. It will start a web server on your local machine.

unzip h2o-3.14.0.7.zip
cd h2o-3.14.0.7
java -jar h2o.jar

3. Use H2O Flow UI
Enter the link http://localhost:54321/flow/index.html in your browser and the H2O Flow UI shows up as follows. You can start your data analysis work right away.

Sparkling Water
Ok, let's see what Sparkling Water is. In short, Sparkling Water = H2O + Spark. Basically, Sparkling Water combines the fast machine learning algorithms of H2O with the popular in-memory platform Spark to provide a fast and scalable solution for data analytics. Sparkling Water supports Scala, R, and Python, and can use the H2O Flow UI to provide a machine learning platform for data scientists and application developers.

Sparkling Water can run on top of Spark in the following ways:

  • Local cluster
  • Standalone cluster
  • Spark cluster in a YARN environment

Sparkling Water is designed as a Spark application. When it executes and you check the Spark UI, it is just a regular Spark application. When it is launched, it first starts the Spark executors. Then H2O starts services such as the key-value store and memory manager inside the executors. The following shows the relationship between Sparkling Water, Spark, and H2O.

To share data between Spark and H2O, Sparkling Water uses H2O's H2OFrame. Converting an RDD/DataFrame to an H2OFrame requires data duplication because it transfers the data from RDD storage into the H2OFrame. But data in an H2OFrame is stored in a compressed format and does not need to be preserved in the RDD.

A typical use case for Sparkling Water is to build a data model. A model is constructed based on estimated metrics and test data, and its predictions can be used in the rest of the data pipeline.

The installation of Sparkling Water takes a few more steps than H2O. As of now, Sparkling Water is at version 2.2.2. The detailed installation steps are shown at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. However, I don't like the installation instructions for the following reasons:

  • It uses a local Spark cluster as the example. As far as I know, local Spark is not a typical way to use Spark. The environment variable settings that point to the local Spark cluster are confusing. The first step should be to verify whether spark-shell is working; if it works, skip the steps to install a Spark cluster or set SPARK-related environment variables.
  • It gives a simple instruction to run sparkling-shell --conf "spark.executor.memory=1g". It misses a lot of other parameters, and without them you will get tons of warning messages.

Ok, here are what I believe are the correct steps:
1. Verify Your Spark Environment
Of course, you should have a Spark cluster up and running before even considering a Sparkling Water installation. Installing Sparkling Water without knowing whether the cluster is working will be a nightmare when you have to trace a Sparkling Water or H2O issue that actually comes from the Spark cluster. Also make sure to install Spark2. Sparkling Water and H2O have a very strict rule in terms of version compatibility among Spark, Sparkling Water, and H2O. Here is a brief overview of the matrix.

For detailed information please check out https://github.com/h2oai/rsparkling/blob/master/README.md

One easy way to verify the Spark cluster is healthy is to make sure you can start spark2-shell and run a simple example program without issues.
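
For example, a quick smoke test is to submit the bundled SparkPi example with spark2-submit. The parcel path below is what a CDH install like mine uses; adjust it for your environment:

spark2-submit --master yarn --deploy-mode client \
  --class org.apache.spark.examples.SparkPi \
  /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_*.jar 10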

2. Download the Sparkling Water zip file
wget http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/sparkling-water-2.2.2.zip

3. Unzip the file and run

unzip sparkling-water-2.2.2.zip
cd sparkling-water-2.2.2

bin/sparkling-shell \
--master yarn \
--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

The above sample command creates an H2O cluster based on 12 executors with 10GB of memory for each executor. You also have to add a bunch of parameters like spark.sql.autoBroadcastJoinThreshold, spark.dynamicAllocation.enabled, and many more. Setting spark.dynamicAllocation.enabled to false makes sense, as you probably want the H2O cluster to use resources in a predictable way and not suck up all the cluster memory when processing a large amount of data. Adding these parameters helps you avoid many annoying warning messages when sparkling-shell starts.

If you are in a Kerberized environment, make sure to run kinit before executing sparkling-shell.

4. Create an H2O cluster inside Spark Cluster
Run the following code.

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark) 
import h2oContext._ 

The following is sample output from the run.

[install]$ cd sparkling-water-2.2.2
[sparkling-water-2.2.2]$ kinit wzhou
Password for wzhou@enkitec.com:
[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=12 \
> --conf spark.executor.memory=10g \
> --conf spark.driver.memory=8g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) :
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/anaconda2/lib/python2.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/S                   taticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/St                   aticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/05 07:46:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes w                   here applicable
17/11/05 07:46:05 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cann                   ot be loaded.
17/11/05 07:46:06 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under                    SPARK_HOME.
Spark context Web UI available at http://192.168.10.45:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1362).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
h2oContext: org.apache.spark.h2o.H2OContext =

Sparkling Water Context:
 * H2O name: sparkling-water-wzhou_application_3608740912046_1362
 * cluster size: 12
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (8,enkbda1node07.enkitec.com,54321)
  (10,enkbda1node04.enkitec.com,54321)
  (6,enkbda1node08.enkitec.com,54321)
  (1,enkbda1node18.enkitec.com,54323)
  (9,enkbda1node18.enkitec.com,54321)
  (7,enkbda1node10.enkitec.com,54321)
  (4,enkbda1node17.enkitec.com,54321)
  (12,enkbda1node17.enkitec.com,54323)
  (3,enkbda1node16.enkitec.com,54323)
  (11,enkbda1node16.enkitec.com,54321)
  (2,enkbda1node04.enkitec.com,54323)
  (5,enkbda1node05.enkitec.com,54323)
  ------------------------

  Open H2O Flow in browser: http://192.168.10.45:54325 (CMD + ...
scala> import h2oContext._
import h2oContext._

You can see the H2O cluster is indeed running inside a Spark application.

5. Access H2O cluster
You can access the H2O cluster in many ways. One is the H2O Flow UI, exactly the same UI I showed you before, e.g. http://192.168.10.45:54325.

Another way is to access it inside RStudio by using h2o.init(your_h2o_host, port). Using the cluster above as the example, here are the init command and other useful commands to check the H2O cluster.

> library(sparklyr)
> library(h2o)
> library(dplyr)
> options(rsparkling.sparklingwater.version = "2.2.2")
> library(rsparkling)
> h2o.init(ip="enkbda1node05.enkitec.com", port=54325)
 Connection successful!

R is connected to the H2O cluster (in client mode): 
    H2O cluster uptime:         1 hours 7 minutes 
    H2O cluster version:        3.14.0.7 
    H2O cluster version age:    16 days  
    H2O cluster name:           sparkling-water-wzhou_application_3608740912046_1362 
    H2O cluster total nodes:    12 
    H2O cluster total memory:   93.30 GB 
    H2O cluster total cores:    384 
    H2O cluster allowed cores:  384 
    H2O cluster healthy:        TRUE 
    H2O Connection ip:          enkbda1node05.enkitec.com 
    H2O Connection port:        54325 
    H2O Connection proxy:       NA 
    H2O Internal Security:      FALSE 
    H2O API Extensions:         Algos, AutoML, Core V3, Core V4 
    R Version:                  R version 3.3.2 (2016-10-31) 

> h2o.clusterIsUp()
[1] TRUE


> h2o.clusterStatus()
Version: 3.14.0.7 
Cluster name: sparkling-water-wzhou_application_3608740912046_1362
Cluster size: 12 
Cluster is locked

                                             h2o healthy
1  enkbda1node04.enkitec.com/192.168.10.44:54321    TRUE
2  enkbda1node04.enkitec.com/192.168.10.44:54323    TRUE
3  enkbda1node05.enkitec.com/192.168.10.45:54323    TRUE
4  enkbda1node07.enkitec.com/192.168.10.47:54321    TRUE
5  enkbda1node08.enkitec.com/192.168.10.48:54321    TRUE
6  enkbda1node10.enkitec.com/192.168.10.50:54321    TRUE
7  enkbda1node16.enkitec.com/192.168.10.56:54321    TRUE
8  enkbda1node16.enkitec.com/192.168.10.56:54323    TRUE
9  enkbda1node17.enkitec.com/192.168.10.57:54321    TRUE
10 enkbda1node17.enkitec.com/192.168.10.57:54323    TRUE
11 enkbda1node18.enkitec.com/192.168.10.58:54321    TRUE
12 enkbda1node18.enkitec.com/192.168.10.58:54323    TRUE
      last_ping num_cpus sys_load mem_value_size   free_mem
1  1.509977e+12       32     1.21              0 8296753152
2  1.509977e+12       32     1.21              0 8615325696
3  1.509977e+12       32     0.63              0 8611375104
4  1.509977e+12       32     1.41              0 8290978816
5  1.509977e+12       32     0.33              0 8372216832
6  1.509977e+12       32     0.09              0 8164569088
7  1.509977e+12       32     0.23              0 8182391808
8  1.509977e+12       32     0.23              0 8579495936
9  1.509977e+12       32     0.15              0 8365195264
10 1.509977e+12       32     0.15              0 8131736576
11 1.509977e+12       32     0.63              0 8291118080
12 1.509977e+12       32     0.63              0 8243695616
     pojo_mem swap_mem    free_disk    max_disk   pid num_keys
1  1247908864        0 395950686208 4.91886e+11  4590        0
2   929336320        0 395950686208 4.91886e+11  4591        0
3   933286912        0 420010262528 4.91886e+11  3930        0
4  1253683200        0  4.25251e+11 4.91886e+11 16370        0
5  1172445184        0 425796304896 4.91886e+11 11998        0
6  1380092928        0 426025943040 4.91886e+11 20374        0
7  1362270208        0  4.21041e+11 4.91886e+11  4987        0
8   965166080        0  4.21041e+11 4.91886e+11  4988        0
9  1179466752        0 422286721024 4.91886e+11  6951        0
10 1412925440        0 422286721024 4.91886e+11  6952        0
11 1253543936        0 425969319936 4.91886e+11 24232        0
12 1300966400        0 425969319936 4.91886e+11 24233        0
   tcps_active open_fds rpcs_active
1            0      453           0
2            0      452           0
3            0      453           0
4            0      453           0
5            0      452           0
6            0      453           0
7            0      452           0
8            0      452           0
9            0      452           0
10           0      453           0
11           0      453           0
12           0      452           0

> h2o.networkTest()
Network Test: Launched from enkbda1node05.enkitec.com/192.168.10.45:54325
                                            destination
1                         all - collective bcast/reduce
2  remote enkbda1node04.enkitec.com/192.168.10.40:54321
3  remote enkbda1node04.enkitec.com/192.168.10.40:54323
4  remote enkbda1node05.enkitec.com/192.168.10.45:54323
5  remote enkbda1node07.enkitec.com/192.168.10.43:54321
6  remote enkbda1node08.enkitec.com/192.168.10.44:54321
7  remote enkbda1node10.enkitec.com/192.168.10.46:54321
8  remote enkbda1node16.enkitec.com/192.168.10.52:54321
9  remote enkbda1node16.enkitec.com/192.168.10.52:54323
10 remote enkbda1node17.enkitec.com/192.168.10.53:54321
11 remote enkbda1node17.enkitec.com/192.168.10.53:54323
12 remote enkbda1node18.enkitec.com/192.168.10.54:54321
13 remote enkbda1node18.enkitec.com/192.168.10.54:54323
                   1_bytes               1024_bytes
1   38.212 msec,  628  B/S     6.790 msec, 3.5 MB/S
2    4.929 msec,  405  B/S       752 usec, 2.6 MB/S
3    7.089 msec,  282  B/S       710 usec, 2.7 MB/S
4    5.687 msec,  351  B/S       634 usec, 3.1 MB/S
5    6.623 msec,  301  B/S       784 usec, 2.5 MB/S
6    6.277 msec,  318  B/S   2.680 msec, 746.0 KB/S
7    6.469 msec,  309  B/S       840 usec, 2.3 MB/S
8    6.595 msec,  303  B/S       801 usec, 2.4 MB/S
9    5.155 msec,  387  B/S       793 usec, 2.5 MB/S
10   5.204 msec,  384  B/S       703 usec, 2.8 MB/S
11   5.511 msec,  362  B/S       782 usec, 2.5 MB/S
12   6.784 msec,  294  B/S       927 usec, 2.1 MB/S
13   6.001 msec,  333  B/S       711 usec, 2.7 MB/S
               1048576_bytes
1   23.800 msec, 1008.4 MB/S
2     6.997 msec, 285.8 MB/S
3     5.576 msec, 358.6 MB/S
4     4.056 msec, 493.0 MB/S
5     5.066 msec, 394.8 MB/S
6     5.272 msec, 379.3 MB/S
7     5.176 msec, 386.3 MB/S
8     6.831 msec, 292.8 MB/S
9     5.772 msec, 346.4 MB/S
10    5.125 msec, 390.2 MB/S
11    5.274 msec, 379.2 MB/S
12    5.065 msec, 394.9 MB/S
13    4.960 msec, 403.2 MB/S

Resolve Sparklyr Not Responding Issue on Port 8880

Recently I was approached by one of my clients to help them investigate a weird sparklyr issue. sparklyr is an interface between R and Spark introduced by RStudio about a year ago. The following is the sparklyr architecture.

When trying to do sc <- spark_connect in RStudio, we got two errors as follows:

  • Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
  • Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream

  Here is the detailed message:

    > library(sparklyr)
    > library(dplyr) 
    > sc <- spark_connect(master = "yarn-client", config=spark_config(), version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
     
    Error in force(code) :
    Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
    Path: /opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/lib/spark/bin/spark-submit
    Parameters: --class, sparklyr.Shell, --jars, '/usr/lib64/R/library/sparklyr/java/spark-csv_2.11-1.3.0.jar','/usr/lib64/R/library/sparklyr/java/commons-csv-1.1.jar','/usr/lib64/R/library/sparklyr/java/univocity-parsers-1.5.1.jar', '/usr/lib64/R/library/sparklyr/java/sparklyr-1.6-2.10.jar', 8880, 3859
    Log: /tmp/RtmzpSIMln/file9e23246605df7_spark.log
     
    ....
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
                   at org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:394)
                   at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
                   at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:97)
                   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:114)
                   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
                   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                   ... 5 more
    

    I did some research and found many people having similar issues. Ok, let's try their recommendations one by one.

  • Set the SPARK_HOME environment variable
  • Try running Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/"). No, not working.

  • Install the latest version of sparklyr
  • My client installed sparklyr less than one month ago, so I don't see why this option makes sense. I didn't even pursue this path.

  • Check the Java installation
  • R on the same server uses the same version of Java without any issue, so I don't see why the Java installation would be a major concern here. Ignore this one.

  • No Hadoop installation
  • Someone said a Spark installation alone is not enough and you need a Hadoop installation as well. Clearly that does not fit our situation: the server is an edge node and has a Hadoop installation.

  • Do not have a valid Kerberos ticket
  • Running system2('klist') did show no Kerberos ticket. Ok, I then opened up a shell within RStudio Server by clicking Tools -> Shell, then issued the kinit command.
    Rerunning system2('klist') showed I have a valid Kerberos ticket. Tried again: still not working.
    Note: even though it is not working, this step is necessary for further action once the issue is fixed, so still run this one no matter what the result is.

  • Create a different config and pass it to spark_connect
  • Someone recommended creating a new config and passing it in. It looked like a good idea. Unfortunately, it just doesn't work.

    wzconfig <- spark_config()
    wzconfig$`sparklyr.shell.deploy-mode` <- "client"
    wzconfig$spark.driver.cores <- 1
    wzconfig$spark.executor.cores <- 2
    wzconfig$spark.executor.memory <- "4G"
    sc <- spark_connect(master = "yarn-client", config=wzconfig, version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
    

    Actually this recommendation is missing another key parameter. By default the total number of executors launched is 2. I would usually bump up this number a little to get better performance. You can set the total number of executors in the following way.

    wzconfig$spark.executor.instances <- 3
    

    Although this approach looks promising, it is still not working. But this approach is definitely worth using for other purposes, to better control Spark resource usage.

  • Add remote address
  • Someone mentioned setting the remote address. I thought this could be another potential option, as I have resolved Spark issues related to the local IP in the past. So I added the following code to the configuration from the previous example; note the parameter sparklyr.gateway.address is the hostname of the active Resource Manager.

    wzconfig$sparklyr.gateway.remote <- TRUE
    wzconfig$sparklyr.gateway.address <- "cdhcluster01n03.mycompany.com" 
    

    Not working for this case.

  • Change deployment mode to yarn-cluster
  • This is probably the most unrealistic one. If we connect with master = "yarn-cluster", the Spark driver will be somewhere inside the Spark cluster. For our current case, I don't believe this is the right solution. Don't even try it.

  • Run a Spark example
  • Someone recommended running spark-submit to verify SparkPi can be run from the environment. This looks reasonable. The good thing is I figured out the issue before executing this one, but this is definitely a valid test to verify spark-submit.

    /opt/cloudera/parcels/SPARK2/lib/spark2/bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_2.11-2.1.0.jar 10
    
  • HA for yarn-cluster
  • There is an interesting post, Add support for `yarn-cluster` with high availability #905, discussing that the issue might relate to multiple resource managers. We use HA, so this post is an interesting one, but it might not fit our case because I feel we have not even reached the HA part yet with the class-not-found message.

  • Need to set JAVA_HOME
  • Verified it and we have it. So this is not the issue.

  • My Solution
  • After reviewing or trying out some of the above solutions, I went back to my own way of thinking. I must say I am not an expert in R or RStudio and have very limited knowledge of how they work, but I do have an extensive background in Spark tuning and troubleshooting.

    I know the error message Gateway in port (8880) did not respond is always the first message to show up and looks like the cause of the issue. But I thought differently: I believed the second error, NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream, looked more suspicious than the first one. Earlier this year I helped another client with a weird Spark job issue which, in the end, was caused by an incorrect path. It seemed to me the path might not be right, causing the Spark issue, which then caused the first error about the port not responding.

    With this idea in mind, I focused on path verification. Run the command Sys.getenv() to get the environment, as follows.

    > Sys.getenv()
    DISPLAY                 :0
    EDITOR                  vi
    GIT_ASKPASS             rpostback-askpass
    HADOOP_CONF_DIR         /etc/hadoop/conf.cloudera.hdfs
    HADOOP_HOME             /opt/cloudera/parcels/CDH
    HOME                    /home/wzhou
    JAVA_HOME               /usr/java/jdk1.8.0_144/jre
    LANG                    en_US.UTF-8
    LD_LIBRARY_PATH         /usr/lib64/R/lib::/lib:/usr/java/jdk1.8.0_92/jre/lib/amd64/server
    LN_S                    ln -s
    LOGNAME                 wzhou
    MAKE                    make
    PAGER                   /usr/bin/less
    PATH                    /usr/local/sbin:/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin
    R_BROWSER               /usr/bin/xdg-open
    R_BZIPCMD               /usr/bin/bzip2
    R_DOC_DIR               /usr/share/doc/R-3.3.2
    R_GZIPCMD               /usr/bin/gzip
    R_HOME                  /usr/lib64/R
    R_INCLUDE_DIR           /usr/include/R
    R_LIBS_SITE             /usr/local/lib/R/site-library:/usr/local/lib/R/library:/usr/lib64/R/library:/usr/share/R/library
    R_LIBS_USER             ~/R/x86_64-redhat-linux-gnu-library/3.3
    R_PAPERSIZE             a4
    R_PDFVIEWER             /usr/bin/xdg-open
    R_PLATFORM              x86_64-redhat-linux-gnu
    R_PRINTCMD              lpr
    R_RD4PDF                times,hyper
    R_SESSION_TMPDIR        /tmp/RtmpZf9YMN
    R_SHARE_DIR             /usr/share/R
    R_SYSTEM_ABI            linux,gcc,gxx,gfortran,?
    R_TEXI2DVICMD           /usr/bin/texi2dvi
    R_UNZIPCMD              /usr/bin/unzip
    R_ZIPCMD                
    RMARKDOWN_MATHJAX_PATH
    						/usr/lib/rstudio-server/resources/mathjax-26
    RS_RPOSTBACK_PATH       /usr/lib/rstudio-server/bin/rpostback
    RSTUDIO                 1
    RSTUDIO_HTTP_REFERER    http://hadoop-edge06.mycompany.com:8787/
    RSTUDIO_PANDOC          /usr/lib/rstudio-server/bin/pandoc
    RSTUDIO_SESSION_STREAM
    						wzhou-d
    RSTUDIO_USER_IDENTITY   wzhou
    RSTUDIO_WINUTILS        bin/winutils
    SED                     /bin/sed
    SPARK_HOME              /opt/cloudera/parcels/SPARK2/lib/spark2
    SSH_ASKPASS             rpostback-askpass
    TAR                     /bin/gtar
    USER                    wzhou
    YARN_CONF_DIR           /etc/hadoop/conf.cloudera.yarn
    

    Ahhh, I noticed the environment was missing the SPARK_DIST_CLASSPATH environment variable. So I set it using the command below just before sc <- spark_connect.

    Sys.setenv(SPARK_DIST_CLASSPATH = '/etc/hadoop/con:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*')
    

    Ok, try it again. Fantastic, it works!

    Cause
    Ok, here is the real cause of the issue. It is unnecessary to specify a Java path for sparklyr, as it does not require one. However, it does depend on spark-submit. When spark-submit is executed, it reads the Java path and then submits the jar files to Spark accordingly. The cause of the issue is that if SPARK_DIST_CLASSPATH is not set, spark-submit does not work and the Spark executors cannot be launched.
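
    Rather than hard-coding that long list, one common approach (assuming the hadoop CLI is on the PATH of the RStudio Server host) is to let Hadoop generate it, then pass the resulting value to Sys.setenv as above:

    export SPARK_DIST_CLASSPATH=$(hadoop classpath)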

    Other Notes
    The following are some useful commands:

    ls()
    spark_installed_versions()
    sessionInfo()
    spark_home_dir() or spark_home
    path.expand("~")
    Sys.getenv("SPARK_HOME")
    spark_home_dir()
    character(0)
    config <- spark_config()
    spark_install_dir()
    sc
    backend
    monitor
    output_file
    spark_context
    java_context
    hive_context

    master
    method
    app_name
    config
    config$sparklyr.cores.local
    config$spark.sql.shuffle.partitions.local
    config$spark.env.SPARK_LOCAL_IP.local
    config$sparklyr.csv.embedded
    config$`sparklyr.shell.driver-class-path`

    Also, there are a few useful articles about sparklyr and RStudio:
    RStudio’s R Interface to Spark on Amazon EMR
    How to Install RStudio Server on CentOS 7
    Using R with Apache Spark
    sparklyr: a test drive on YARN
    Analyzing a billion NYC taxi trips in Spark

Mysterious Time Gap in Spark Job Timeline

Some time ago one of my clients asked me a question when reviewing a Spark job: why is there a time gap in the event timeline, sometimes as long as one minute? If it were a few seconds, it would make sense that it could relate to Spark's overhead between job runs. But one minute seems too long for any overhead activity, because the whole job takes only 8~9 minutes. I didn't have a good answer to the question. Recently I did some benchmarking of a Spark job on an X3 full rack Oracle BDA in our lab and noticed the same behavior. I tracked down the issue and finally figured out the cause of the timeline gap. I am going to share my findings in this blog.

My benchmark is on an X3 full rack BDA with Spark version 1.6 and CDH 5.7. The Spark testing script is a pretty simple one, and the important lines related to this timeline gap are listed as follows:

    line 42: val myDF = hiveContext.sql("select * from wzdb.sales_part ")
    line 44: myDF.show()
    line 47: val rawDF = hiveContext.sql("select * from wzdb.sales_raw limit 100 ")
    line 48: rawDF.show()

Line 42 pulls all data from the wzdb.sales_part table, which is a Hive partitioned table using Parquet and SNAPPY compression. The table has about 1.3 billion rows and 1,680 partitions. Line 44 just shows the DataFrame myDF; by default it shows 20 rows. Similarly, line 47 pulls 100 rows from the wzdb.sales_raw table and line 48 shows the first 20 rows from that table. Ok, the code cannot be simpler than that.

After starting the Spark job, it finished in 40 seconds. However, when I checked the event timeline, it showed a time gap between Job (or stage) Id 1 and Job Id 2. Job Id 1 started at 18:13:24 and completed at 18:13:26, but Job Id 2 started at 18:13:35, so there was a 9-second gap, about 25% of the total execution time. 25% seems a lot to me. Job Id 1 executed line 42 while Job Id 2 executed line 44, and there is no executable code at line 43. Things become interesting.

    spark_event_timeline_1

Then I checked the Executors page. It shows there are two executors and each took about 6~7 seconds of task time. Then I clicked the link to the stdout logs for each executor, paying more attention to the timeline between 18:13:24 and 18:13:35.

    spark_executors_2

Here is part of the output from Executor 1:

    16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 27032@enkbda1node10.enkitec.com
    16/12/02 18:13:14 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
    16/12/02 18:13:16 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
    16/12/02 18:13:17 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
    16/12/02 18:13:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
    16/12/02 18:13:18 INFO spark.SecurityManager: Changing view acls to: yarn,oracle
    16/12/02 18:13:18 INFO spark.SecurityManager: Changing modify acls to: yarn,oracle
    16/12/02 18:13:18 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, oracle); users with modify permissions: Set(yarn, oracle)
    16/12/02 18:13:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
    16/12/02 18:13:18 INFO Remoting: Starting remoting
    16/12/02 18:13:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
    16/12/02 18:13:19 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@enkbda1node10.enkitec.com:38184]
    16/12/02 18:13:19 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 38184.
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-7af60bfd-9e27-45c8-bf30-a4bf126681f0
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-1c6099f9-37f2-4b4e-8b60-5c209bffc924
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ec991001-9dc1-4017-ba55-314b54dd9109
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-c0df794d-5ee6-46fe-ad57-cf6186cd5ba7
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-673f1d0b-7e44-47e7-b36e-eed65c656c87
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-ab27950b-7dfd-48eb-a33c-7fbd02d29137
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fe6697c4-d64a-47b3-9781-27d583370710
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-e0a928ab-5895-46b6-8b10-ad883e895632
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-4d39319c-b6be-4a17-8755-89477f81e899
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-cfd8fd9c-22cd-443f-8a1d-99b9867c8507
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-fff46796-d06a-45af-816b-c46d356be447
    16/12/02 18:13:19 INFO storage.DiskBlockManager: Created local directory at /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/blockmgr-6cf05120-f651-4615-8abe-14631c5aadb1
    16/12/02 18:13:19 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.12.105:33339
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
    16/12/02 18:13:19 INFO executor.Executor: Starting executor ID 2 on host enkbda1node10.enkitec.com
    16/12/02 18:13:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45880.
    16/12/02 18:13:19 INFO netty.NettyBlockTransferService: Server created on 45880
    16/12/02 18:13:19 INFO storage.BlockManager: external shuffle service port = 7337
    16/12/02 18:13:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    16/12/02 18:13:19 INFO storage.BlockManagerMaster: Registered BlockManager
    16/12/02 18:13:19 INFO storage.BlockManager: Registering executor with local external shuffle service.
    16/12/02 18:13:19 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
    16/12/02 18:13:19 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
    16/12/02 18:13:20 INFO executor.Executor: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar with timestamp 1480723975104
    16/12/02 18:13:20 INFO util.Utils: Fetching spark://192.168.12.105:33339/jars/scalawztest1_2.10-1.0.jar to /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/fetchFileTemp8581640132534419761.tmp
    16/12/02 18:13:20 INFO util.Utils: Copying /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a/8566654191480723975104_cache to /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar
    16/12/02 18:13:20 INFO executor.Executor: Adding file:/u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/container_e81_1480516695248_0056_01_000003/./scalawztest1_2.10-1.0.jar to class loader
    16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
    16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.6 KB, free 25.6 KB)
    16/12/02 18:13:20 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 101 ms
    16/12/02 18:13:20 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 72.5 KB, free 98.1 KB)
    16/12/02 18:13:22 INFO client.FusionCommon: Initialized FusionHdfs with URI: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1, FileSystem: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_468103070_110, ugi=oracle (auth:SIMPLE)]], instance: 426700950
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=1
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=10
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=11
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=12
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=13
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=14
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=15
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=16
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=17
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=18
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=19
    16/12/02 18:13:22 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2014/month=4/day=2
    ....
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8
    16/12/02 18:13:24 INFO sources.HadoopFsRelation: Listing hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9
    16/12/02 18:13:24 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 727069 bytes result sent to driver
    16/12/02 18:13:24 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 3
    16/12/02 18:13:24 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
    16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 123.6 KB)
    16/12/02 18:13:24 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 24 ms
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 196.0 KB)
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-hadoop-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-format-2.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/parquet-pig-bundle-1.5.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-exec-1.1.0-cdh5.7.0.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p1464.1349/jars/hive-jdbc-1.1.0-cdh5.7.0-standalone.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [shaded.parquet.org.slf4j.helpers.NOPLoggerFactory]
    16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver
    16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
    16/12/02 18:13:35 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 93.1 KB)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3 took 9 ms
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 777.6 KB)
    16/12/02 18:13:35 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0 start: 0 end: 2028037 length: 2028037 hosts: []}
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 2
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 801.6 KB)
    16/12/02 18:13:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 7 ms
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 334.5 KB, free 1136.1 KB)
    16/12/02 18:13:35 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 148.139655 ms
    16/12/02 18:13:36 INFO codegen.GenerateUnsafeProjection: Code generated in 21.996524 ms
    16/12/02 18:13:36 INFO codegen.GenerateSafeProjection: Code generated in 15.975923 ms
    16/12/02 18:13:36 WARN parquet.CorruptStatistics: Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297
    16/12/02 18:13:36 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 5365 bytes result sent to driver
    16/12/02 18:13:40 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
    16/12/02 18:13:40 WARN executor.CoarseGrainedExecutorBackend: An unknown (enkbda1node05.enkitec.com:33339) driver disconnected.
    16/12/02 18:13:40 ERROR executor.CoarseGrainedExecutorBackend: Driver 192.168.12.105:33339 disassociated! Shutting down.
    16/12/02 18:13:40 INFO storage.DiskBlockManager: Shutdown hook called
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Shutdown hook called
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u04/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-47de642a-8665-4853-a4f7-a5ba3ece4295
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u03/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f1aaa12c-c0d5-4c75-b1e6-e222ea9112b6
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u01/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-764ee07d-b082-4bc1-8b4d-3834d6cd14cd
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u02/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-f79cf620-4754-41e2-bb4c-6e242f8e16ad
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u11/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-ae63ef76-400a-4e8f-b68b-3a34ed25414e
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u09/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-37543a1b-c5ec-4f06-887d-9357bea573e4
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u05/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-9a5eb5c9-4009-49ff-aca6-64103429b245
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u07/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-6ac13e05-62ec-485f-8016-3bb4d1830492
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u10/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-15e0aa40-4a67-46c7-8eb0-fe8c8f2d0c6e
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u06/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-112c8eb2-e039-4f73-beae-e3c05a04c3c6
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u08/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-752c1bcc-ce1f-41a1-b779-347f77b04227
    16/12/02 18:13:40 INFO util.ShutdownHookManager: Deleting directory /u12/hadoop/yarn/nm/usercache/oracle/appcache/application_1480516695248_0056/spark-4647d3e3-6655-4da1-b75b-94f3d872c71a
    

Executor 1 handled 840 Hive partitions. There is no additional logging between 18:13:26 and 18:13:35; the log jumps straight from 16/12/02 18:13:26 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1503 bytes result sent to driver to 16/12/02 18:13:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4.

Executor 2 shows similar behavior and processed exactly 840 partitions as well. The interesting log lines are shown below:
    16/12/02 18:13:25 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 931 bytes result sent to driver
    16/12/02 18:13:38 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5

The executor logs did not tell me anything useful. Ok, let me check the Stages page. I am most interested in Stage Id 1 and Stage Id 2. Here is the Stages summary screen.
    spark_stage_summary_3

Let's check out the Stage Id 1 screen shown below.
    spark_stage_stage1_4

The duration is only between 0.2 and 1 seconds for the two executors. Another interesting statistic is that the Peak Execution Memory is 0 bytes for both. I don't believe this stage can load 1.3 billion rows of data without any memory usage. In other words, I believe it does not do any IO related work at this stage, even though the stage corresponds to select * from wzdb.sales_part.

Ok, let me check out the next stage, Stage 2. The DAG chart is so huge that it takes 20 seconds to show up on the screen. There are literally 3,365 RDD partitions unioned together to provide the result for the show() function.

    spark_stage_stage2_5

    spark_stage_stage2_6

The metrics for this stage give another interesting result.
    spark_stage_stage2_7

The total duration is unbelievably fast: 1 second, with an input size of 1980 KB and 21 records. Remember, by default the show() function prints only 20 rows, so this 1980 KB and 21 records are clearly related to the 20-row show() result. But with 3,365 RDD partitions unioned together, 1 second still seems unbelievably fast. Please note the block size is 256 MB in our environment. I just don't believe it is physically possible to perform the stage 1 operation (select * from wzdb.sales_part against a 1.3 billion row Hive Parquet table) in 1 second and then show 20 rows of result in the following 1 second. Yes, Spark is in-memory processing and super fast, but according to the DAG it goes through all 1.3 billion rows; that can't finish in 2 seconds, even with a full rack BDA. Something else must be missing from the picture.
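As a side note on show(): for Spark 1.6 DataFrames the default is 20 rows, and a larger row count or a no-truncation flag can be passed explicitly, for example:

myDF.show()          // first 20 rows, long values truncated
myDF.show(100)       // first 100 rows
myDF.show(20, false) // 20 rows without truncating long column values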

Luckily, for this test I used client mode as the deployment mode, so all of the log output was sent to my driver, the executing session. That is where I found out where the missing time went.

    16/12/02 18:13:23 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4178 ms on enkbda1node09.enkitec.com (1/2)
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4458 ms on enkbda1node10.enkitec.com (2/2)
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: ResultStage 0 (sql at test2.scala:42) finished in 20.648 s
    16/12/02 18:13:24 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Job 0 finished: sql at test2.scala:42, took 21.026555 s
    16/12/02 18:13:24 INFO spark.SparkContext: Starting job: sql at test2.scala:42
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Got job 1 (sql at test2.scala:42) with 2 output partitions
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (sql at test2.scala:42)
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42), which has no missing parents
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 72.4 KB, free 170.5 KB)
    16/12/02 18:13:24 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.5 KB, free 196.0 KB)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.12.105:14015 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:24 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
    16/12/02 18:13:24 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sql at test2.scala:42)
    16/12/02 18:13:24 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, enkbda1node09.enkitec.com, partition 0,PROCESS_LOCAL, 2044 bytes)
    16/12/02 18:13:24 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, enkbda1node10.enkitec.com, partition 1,PROCESS_LOCAL, 2143 bytes)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node09.enkitec.com:19013 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:24 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:25 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 294 ms on enkbda1node09.enkitec.com (1/2)
    16/12/02 18:13:26 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 1614 ms on enkbda1node10.enkitec.com (2/2)
    16/12/02 18:13:26 INFO scheduler.DAGScheduler: ResultStage 1 (sql at test2.scala:42) finished in 1.620 s
    16/12/02 18:13:26 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
    16/12/02 18:13:26 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.665575 s
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:14015 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:14015 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:19013 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 1
    16/12/02 18:13:26 INFO spark.ContextCleaner: Cleaned accumulator 2
    16/12/02 18:13:26 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
    16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
    16/12/02 18:13:26 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
    16/12/02 18:13:26 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:14015 (size: 24.0 KB, free: 530.0 MB)
    16/12/02 18:13:26 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
    16/12/02 18:13:27 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000022_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=1/000284_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000031_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000137_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=10/000293_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000032_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000138_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=11/000294_0
    16/12/02 18:13:27 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000033_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000139_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=12/000295_0
    16/12/02 18:13:28 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000034_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000140_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2012/month=1/day=13/000296_0
    
    ....
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000076_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000144_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=19/000266_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000059_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000127_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=2/000249_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000077_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000145_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=20/000267_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000000_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000146_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=21/000268_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000001_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000147_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=22/000269_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000002_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000148_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=23/000270_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000003_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000149_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=24/000271_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000004_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000150_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=25/000272_0
    16/12/02 18:13:34 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000005_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000151_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=26/000273_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000006_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000152_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=27/000274_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000007_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000153_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=28/000275_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000060_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000128_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=3/000250_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000061_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000129_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=4/000251_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000062_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000130_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=5/000252_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000063_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000131_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=6/000253_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000064_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000132_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=7/000254_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000065_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000133_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=8/000255_0
    16/12/02 18:13:35 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000066_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000134_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2016/month=9/day=9/000256_0
    16/12/02 18:13:35 INFO spark.SparkContext: Starting job: show at test2.scala:44
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44), which has no missing parents
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 684.5 KB, free 816.9 KB)
    16/12/02 18:13:35 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 93.1 KB, free 909.9 KB)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:14015 (size: 93.1 KB, free: 529.9 MB)
    16/12/02 18:13:35 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
    16/12/02 18:13:35 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[3367] at show at test2.scala:44)
    16/12/02 18:13:35 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
    16/12/02 18:13:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node10.enkitec.com, partition 0,RACK_LOCAL, 2384 bytes)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 93.1 KB, free: 529.9 MB)
    16/12/02 18:13:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node10.enkitec.com:45880 (size: 24.0 KB, free: 529.9 MB)
    16/12/02 18:13:36 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1336 ms on enkbda1node10.enkitec.com (1/1)
    16/12/02 18:13:36 INFO scheduler.DAGScheduler: ResultStage 2 (show at test2.scala:44) finished in 1.336 s
    16/12/02 18:13:36 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 
    16/12/02 18:13:36 INFO scheduler.DAGScheduler: Job 2 finished: show at test2.scala:44, took 1.604959 s
    16/12/02 18:13:36 INFO spark.ContextCleaner: Cleaned accumulator 3
    16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.12.105:14015 in memory (size: 93.1 KB, free: 530.0 MB)
    16/12/02 18:13:36 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on enkbda1node10.enkitec.com:45880 in memory (size: 93.1 KB, free: 530.0 MB)
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    |      tran_id|  tran_timestamp|store_id|           item_name|item_category|dept_id|       dept_name|tran_price|cashier_name|year|month|day|
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    |1479564513475|2012-01-01 15:47|       4|rxkexrwwrnohuenpm...|          891|     26|             Toy|    185.17|       Maria|2012|    1|  1|
    |1479564513608|2012-01-01 10:11|      27|                  zz|          790|     26|   Auto and Tire|     68.55|         Von|2012|    1|  1|
    |1479564513748|2012-01-01 16:53|      26|fqzlqxvmpktwjwwgg...|          279|     10|Home Improvement|    233.47|         Von|2012|    1|  1|
    |1479564513750|2012-01-01 21:10|      22|               ndmeu|          487|     35|           Photo|     92.42|       Ileen|2012|    1|  1|
    |1479564526973|2012-01-01 07:52|       6|sbzmvrnxrvbohorbp...|          632|     18|         Jewelry|    164.34|        Keri|2012|    1|  1|
    |1479564469852|2012-01-01 18:54|      27|ptcilplqfvednxmmh...|          416|      3|    Baby Toddler|    144.86|      Michel|2012|    1|  1|
    |1479564523772|2012-01-01 11:07|       2|gvjrsdgidzunbbmfi...|          269|     17|          Sports|    231.67|       Penny|2012|    1|  1|
    |1479564524666|2012-01-01 08:51|       6|rfskpcezchhbhzsbd...|          595|     19|            Food|    175.85|         Rus|2012|    1|  1|
    |1479564470133|2012-01-01 18:36|      17|wzswebdjowfjjbslh...|          679|     10|   Health Beauty|    350.13|        Keri|2012|    1|  1|
    |1479564537634|2012-01-01 07:52|      12|             bhxoevw|          281|     34|    Baby Toddler|    352.02|        Keri|2012|    1|  1|
    |1479564470197|2012-01-01 06:04|       5|plqxmnrcuqisfygkl...|          152|     19|             Toy|     53.67|     Dorothy|2012|    1|  1|
    |1479564470201|2012-01-01 08:23|      13|frcatrjwwrbomxmnj...|           74|     20|   Auto and Tire|    359.81|       Ileen|2012|    1|  1|
    |1479564470386|2012-01-01 10:16|      15|cevezkxpsrzszshen...|          814|     13|   Auto and Tire|     27.92|     Sherril|2012|    1|  1|
    |1479564470724|2012-01-01 01:44|      26|jjiqfklffyzxzkyiz...|          248|      5|   Auto and Tire|    219.66|       Lidia|2012|    1|  1|
    |1479564470799|2012-01-01 23:26|      18|     voakgmajahxfgbq|          769|     17|          Sports|    251.07|       Susan|2012|    1|  1|
    |1479564470941|2012-01-01 13:28|      14|axkytaxghyujudtaw...|          207|      5|   Auto and Tire|    168.34|  Christoper|2012|    1|  1|
    |1479564471016|2012-01-01 15:37|       3|sdcnxhosatucnwwqk...|          192|     23|         Jewelry|       2.5|      Michel|2012|    1|  1|
    |1479564471049|2012-01-01 23:27|       9|zoppybkpqpgitrwlo...|          120|     32|          Sports|    147.28|     Dorothy|2012|    1|  1|
    |1479564471063|2012-01-01 23:51|      24|zknmvfsrsdxdysmdw...|          169|      6|         Jewelry|    292.59|   Broderick|2012|    1|  1|
    |1479564471113|2012-01-01 19:42|      20|uaqmjikgtisidskzm...|          388|     36|            Food|      3.55|       Maria|2012|    1|  1|
    +-------------+----------------+--------+--------------------+-------------+-------+----------------+----------+------------+----+-----+---+
    only showing top 20 rows
    
    16/12/02 18:13:36 INFO datasources.DataSourceStrategy: Selected 1680 partitions out of 1680, pruned 0.0% partitions.
    16/12/02 18:13:36 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 282.1 KB, free 414.5 KB)
    

The above log shows that IO operations like parquet.ParquetRelation: Reading Parquet file(s) happen completely outside the timelines of Job (/stage) 1 and Job 2. This is where the missing time goes. It is actually pretty good to take only 9~10 seconds to go through all 1.3 billion rows. Mystery solved.

With the above findings in mind, I expected that if I applied partition pruning and limited the number of rows scanned in the line 42 query, the timeline gap should shrink because less IO is needed to read the data. So I added partition pruning to the query on line 42: select * from wzdb.sales_part where year=2013 and month=11 and day=13, and reran the test. The result was exactly what I expected.
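In code, the only change is to the line 42 query (year, month and day are the partition columns of wzdb.sales_part, as seen in the HDFS paths above); this is a sketch of the revised lines, not a copy of the actual script:

// line 42: same query with a partition-key filter, so only 1 of the
// 1,680 partitions has to be listed and read
val myDF = hiveContext.sql(
  "select * from wzdb.sales_part where year=2013 and month=11 and day=13")
// line 44: unchanged
myDF.show()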

    Here is the new timeline:
    spark_event_timeline_8

As you can see, there is only a 1 second gap between Job Id 1 and Job Id 2. Here is the execution log; only one partition of data was read.

    16/12/03 11:11:10 INFO scheduler.DAGScheduler: Job 1 finished: sql at test2.scala:42, took 1.322394 s
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.12.105:46755 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.5 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 2
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.12.105:46755 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node09.enkitec.com:37048 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on enkbda1node03.enkitec.com:19685 in memory (size: 25.6 KB, free: 530.0 MB)
    16/12/03 11:11:10 INFO spark.ContextCleaner: Cleaned accumulator 1
    16/12/03 11:11:11 INFO datasources.DataSourceStrategy: Selected 1 partitions out of 1680, pruned 99.94047619047619% partitions.
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 108.4 KB, free 108.4 KB)
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.0 KB, free 132.3 KB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.12.105:46755 (size: 24.0 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 2 from show at test2.scala:44
    16/12/03 11:11:11 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    16/12/03 11:11:11 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_2, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000057_0_copy_3, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000165_0_copy_1, hdfs://enkbda-ns/user/hive/warehouse/wzdb.db/sales_part/year=2013/month=11/day=13/000191_0
    16/12/03 11:11:11 INFO spark.SparkContext: Starting job: show at test2.scala:44
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Got job 2 (show at test2.scala:44) with 1 output partitions
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (show at test2.scala:44)
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Missing parents: List()
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44), which has no missing parents
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 143.0 KB, free 275.3 KB)
    16/12/03 11:11:11 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 16.5 KB, free 291.8 KB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.12.105:46755 (size: 16.5 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
    16/12/03 11:11:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[9] at show at test2.scala:44)
    16/12/03 11:11:11 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks
    16/12/03 11:11:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, enkbda1node09.enkitec.com, partition 0,NODE_LOCAL, 2386 bytes)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 16.5 KB, free: 530.0 MB)
    16/12/03 11:11:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on enkbda1node09.enkitec.com:37048 (size: 24.0 KB, free: 530.0 MB)
    16/12/03 11:11:12 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 1523 ms on enkbda1node09.enkitec.com (1/1)
    

The lesson learned from this test is that Spark metrics are helpful for identifying the bottleneck of a Spark application, but they may not tell the complete story. In this case, if we focused only on the 1 or 2 second operations, it would seem that nothing needs to be tuned. On the contrary, the real win is reducing the IO needed to access the 1+ billion row table by filtering on partition keys and limiting the total number of rows scanned.
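To make that concrete, here is a small sketch combining both ideas, a partition-key filter plus an explicit row limit, in the same style as line 47 of the test script (the column list is taken from the sample output above):

// Partition filter keeps the metadata listing and IO to a single partition;
// the limit bounds how many rows are actually returned to the driver.
val smallDF = hiveContext.sql(
  "select tran_id, tran_timestamp, store_id, tran_price " +
  "from wzdb.sales_part " +
  "where year=2013 and month=11 and day=13 limit 100")
smallDF.show()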