Running Spark on Kubernetes

If 2017 was the year of Docker, 2018 is the year of Kubernetes. Kubernetes makes container management easy. It does not manage containers directly, but pods: a pod is the basic deployable unit and groups one or more tightly coupled containers. Kubernetes also supports horizontal autoscaling of pods; when an application is accessed by a large number of users, you can instruct Kubernetes to replicate your pods to balance the load. As expected, Spark can be deployed on Kubernetes, and currently there are a few ways to do it.
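
As an aside on the autoscaling just mentioned, turning it on is a single kubectl command. This is only a sketch; the deployment name and thresholds below are made up for illustration:

# hypothetical deployment; keep between 2 and 10 replicas based on CPU load
kubectl autoscale deployment my-web-app --min=2 --max=10 --cpu-percent=80
# inspect the resulting HorizontalPodAutoscaler
kubectl get hpa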

1. Standalone Spark Cluster
Spark Standalone Mode is a nice way to quickly start a Spark cluster without using YARN or Mesos. This way, you don't have to use HDFS to store huge datasets; instead you can use cloud storage and decouple the Spark cluster from its storage. For such a cluster, you will have one pod for the Spark Master and multiple pods for Spark workers. When you want to run a job, just deploy the Spark Master and create a Master service, then deploy multiple Spark workers. Once the job completes, delete all the pods from the Kubernetes Workload.
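
As a rough sketch of that workflow (the manifest file names and replica count below are hypothetical, not from a real deployment):

# deploy the Spark Master pod and expose it through a Master service
kubectl apply -f spark-master-deployment.yaml
kubectl apply -f spark-master-service.yaml
# add workers for the duration of the job
kubectl apply -f spark-worker-deployment.yaml
kubectl scale deployment spark-worker --replicas=8
# once the job completes, tear everything down
kubectl delete -f spark-worker-deployment.yaml -f spark-master-service.yaml -f spark-master-deployment.yaml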

Actually this is the recommended way to run jobs against big datasets in the cloud. You don't need a 200-node Spark cluster running all the time; spin it up only when you need to run the job. This saves significantly on cloud cost. The standalone Spark cluster is not the topic of this blog and I may cover it in a different post.

2. Spark on Kubernetes
Spark on Kubernetes is another interesting way to run a Spark cluster. It uses the native Kubernetes scheduler for the resource management of the Spark cluster. Here is the architecture of Spark on Kubernetes.

There is a blog, Apache Spark 2.3 with Native Kubernetes Support, which goes through the steps to run a basic Pi example. However, I followed the steps and it did not work; many steps and details are missing. After some research, I figured out the correct steps to run it on Google Cloud Platform (GCP). This blog walks through how to run the Pi example on Kubernetes.

Download Apache Spark 2.3
One of the major changes in this release is the inclusion of the new Kubernetes scheduler backend. The software can be downloaded from http://spark.apache.org/releases/spark-release-2-3-0.html or http://spark.apache.org/downloads.html. After downloading the software, unpack the file on your local machine.

Build Docker Image
Spark on Kubernetes requires an image for its driver and executors. I could get a prebuilt Spark image from somewhere, but I prefer to build the image myself so I can easily customize it in the future. There is a Dockerfile under the spark-2.3.0-bin-hadoop2.7/kubernetes/dockerfiles/spark directory.

[root@docker1 spark]# cat Dockerfile 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM openjdk:8-alpine

ARG spark_jars=jars
ARG img_path=kubernetes/dockerfiles

# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apk upgrade --no-cache && \
    apk add --no-cache bash tini libc6-compat && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/work-dir \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd

COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY data /opt/spark/data

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

Pay attention to the line COPY examples /opt/spark/examples. The jar file for the Pi example is in the examples directory. Remember to use the path /opt/spark/examples instead of the path on the local machine where you run the job submission. I ran into a SparkPi class not found issue, which was caused by the fact that I pointed to the jar file on my local computer instead of the path inside the Docker image.
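
In other words, the jar argument passed to spark-submit has to use the local:// scheme with the path baked into the image, not a filesystem path on the submitting machine. A minimal illustration (the first path is just an example of what not to do):

# wrong: path on the machine running spark-submit -> SparkPi class not found
#   /home/me/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
# right: path created inside the image by "COPY examples /opt/spark/examples"
#   local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar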

I have a Docker VM and use it for all Docker-related operations. Log on to the Docker VM and run the following to download and unpack the software:

[root@docker1 ]# mkdir spark-2.3
[root@docker1 ]# cd spark-2.3
[root@docker1 spark-2.3]# wget http://www-eu.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
--2018-04-24 19:11:09--  http://www-eu.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
Resolving www-eu.apache.org (www-eu.apache.org)... 195.154.151.36, 2001:bc8:2142:300::
Connecting to www-eu.apache.org (www-eu.apache.org)|195.154.151.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 226128401 (216M) [application/x-gzip]
Saving to: ‘spark-2.3.0-bin-hadoop2.7.tgz’

100%[===========================================================================================================================================>] 226,128,401 26.8MB/s   in 8.8s   

2018-04-24 19:11:18 (24.6 MB/s) - ‘spark-2.3.0-bin-hadoop2.7.tgz’ saved [226128401/226128401]

[root@docker1 spark-2.3]# ls -l
total 220856
-rw-r--r--. 1 root root     22860 Apr 24 19:10 spark-2.3.0-bin-hadoop2.7.tgz
-rw-r--r--. 1 root root 226128401 Feb 22 19:54 spark-2.3.0-bin-hadoop2.7.tgz.1
[root@docker1 spark-2.3]# tar -xzf spark-2.3.0-bin-hadoop2.7.tgz

Build the image and push it to my private Google Container Registry.

[root@docker1 spark-2.3.0-bin-hadoop2.7]# bin/docker-image-tool.sh -r gcr.io/wz-gcptest-357812 -t k8s-spark-2.3 build
Sending build context to Docker daemon  256.4MB
Step 1/14 : FROM openjdk:8-alpine
8-alpine: Pulling from library/openjdk
ff3a5c916c92: Pull complete 
5de5f69f42d7: Pull complete 
fd869c8b9b59: Pull complete 
Digest: 
. . . .
Step 13/14 : WORKDIR /opt/spark/work-dir
Removing intermediate container ed4b6fe3efd6
 ---> 69cd2dd1cae8
Step 14/14 : ENTRYPOINT [ "/opt/entrypoint.sh" ]
 ---> Running in 07da54b9fd34
Removing intermediate container 07da54b9fd34
 ---> 9c3bd46e026d
Successfully built 9c3bd46e026d
Successfully tagged gcr.io/wz-gcptest-357812/spark:k8s-spark-2.3

[root@docker1 spark-2.3.0-bin-hadoop2.7]# bin/docker-image-tool.sh -r gcr.io/wz-gcptest-357812 -t k8s-spark-2.3 push
The push refers to repository [gcr.io/wz-gcptest-357812/spark]
e7930b27b5e2: Pushed 
6f0480c071be: Pushed 
d7e218db3d89: Pushed 
8281f673b660: Pushed 
92e162ecfbe3: Pushed 
938ba54601ba: Pushed 
dc1345b437d9: Pushed 
4e3f1d639db8: Pushed 
685fdd7e6770: Layer already exists 
c9b26f41504c: Layer already exists 
cd7100a72410: Layer already exists 
k8s-spark-2.3: digest: sha256:2f865bf17985317909c866d036ba7988e1dbfc5fe10440a95f366264ceee0518 size: 2624

[root@docker1 ~]# docker image ls
REPOSITORY                                       TAG                 IMAGE ID            CREATED             SIZE
gcr.io/wz-gcptest-357812/spark                 k8s-spark-2.3       9c3bd46e026d        3 days ago          346MB
ubuntu                                           16.04               c9d990395902        2 weeks ago         113MB
hello-world                                      latest              e38bc07ac18e        2 weeks ago         1.85kB
openjdk                                          8-alpine            224765a6bdbe        3 months ago        102MB

Check Google Container Registry. It shows the image with the correct tag k8s-spark-2.3.
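
You can do the same check from the command line (assuming the gcloud SDK is already configured for this project):

gcloud container images list-tags gcr.io/wz-gcptest-357812/spark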

Configure RBAC
I already had a Kubernetes cluster up and running with 3 nodes. I had to set up Role-Based Access Control (RBAC) to allow Spark on Kubernetes to work. Otherwise it throws an error like the following during job execution:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://kubernetes.default.svc/api/v1/namespaces/default/pods/spark-pi-449efacd5a4a386ca31177faddb8eab4-driver. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-449efacd5a4a386ca31177faddb8eab4-driver" is forbidden: User "system:serviceaccount:default:default" cannot get pods in the namespace "default": Unknown user "system:serviceaccount:default:default".

Check service account and clusterrolebinding.

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get serviceaccount
NAME      SECRETS   AGE
default   1         5m
weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get clusterrolebinding
NAME                                           AGE
cluster-admin                                  5m
event-exporter-rb                              5m
gce:beta:kubelet-certificate-bootstrap         5m
gce:beta:kubelet-certificate-rotation          5m
heapster-binding                               5m
kube-apiserver-kubelet-api-admin               5m
kubelet-cluster-admin                          5m
npd-binding                                    5m
system:basic-user                              5m
system:controller:attachdetach-controller      5m
. . . .
system:controller:statefulset-controller       5m
system:controller:ttl-controller               5m
system:discovery                               5m
system:kube-controller-manager                 5m
system:kube-dns                                5m
system:kube-dns-autoscaler                     5m
system:kube-scheduler                          5m
system:node                                    5m
system:node-proxier                            5m

Create the spark service account and cluster role binding.

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl create serviceaccount spark
serviceaccount "spark" created
weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
clusterrolebinding "spark-role" created

weidong.zhou:@macpro spark-2.3.0-bin-hadoop2.7 > kubectl get serviceaccount
NAME      SECRETS   AGE
default   1         1h
spark     1         56m

Run Spark Application
You might need to set SPARK_LOCAL_IP. You also need to find the master endpoint (MASTER_IP) by running kubectl cluster-info | grep master | awk '{print $6}'. Use the following commands to set up the gcloud environment:

export PROJECT_ID="wz-gcptest-357812"
export ZONE="us-east1-b"
export KUBE_CLUSTER_NAME="wz-kube1"

gcloud config set project ${PROJECT_ID}
gcloud config set compute/zone ${ZONE}
gcloud container clusters get-credentials ${KUBE_CLUSTER_NAME}
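
The two values mentioned above can then be captured roughly like this (a sketch; the SPARK_LOCAL_IP address is just an example and should be the address of the machine you submit from):

# the Kubernetes master endpoint, used later in the k8s:// master URL
MASTER_IP=$(kubectl cluster-info | grep master | awk '{print $6}')
echo ${MASTER_IP}
# optionally pin the address spark-submit binds to on the local machine
export SPARK_LOCAL_IP=10.128.0.2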

Finally I can run the job. I intentionally passed a parameter of 1000000 to make the job run for a long time.

bin/spark-submit \
    --master k8s://https://104.136.128.109 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --conf spark.app.name=spark-pi \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark  \
    --conf spark.kubernetes.container.image=gcr.io/wz-gcptest-357812/spark:k8s-spark-2.3 \
    local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar 1000000

If you check GCP's Kubernetes Workloads screen, you will see one Spark driver and two executors running.

Monitor the Spark Job
If the job runs for a while, you will see the screen below when checking the pod details. It shows CPU, memory and disk usage, which is usually good enough for monitoring purposes.

But how do I get to the Spark UI? There is no resource manager like YARN in the picture, so for now I need to use port forwarding to access the Spark UI. Find the driver pod and then set up the port forwarding.

weidong.zhou:@macpro ~ > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          7m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          7m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          7m
weidong.zhou:@macpro ~ > kubectl port-forward spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver 4040:4040
Forwarding from 127.0.0.1:4040 -> 4040

Find out the IP for the pod.

weidong.zhou:@macpro mytest_gcp > kubectl get pod -o wide
NAME                                               READY     STATUS    RESTARTS   AGE       IP          NODE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          10m       10.44.0.8   gke-wz-kube1-default-pool-2aac262a-thw0
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          10m       10.44.2.8   gke-wz-kube1-default-pool-2aac262a-09vt
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          10m       10.44.1.6   gke-wz-kube1-default-pool-2aac262a-23gk

Now we can see the familiar Spark UI.

If you want to check the logs from the driver pod, just run the following:

weidong.zhou:@macpro mytest_gcp > kubectl -n=default logs -f spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380242.0 in stage 0.0 (TID 380242, 10.44.1.6, executor 2, partition 380242, PROCESS_LOCAL, 7865 bytes)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Finished task 380240.0 in stage 0.0 (TID 380240) in 3 ms on 10.44.1.6 (executor 2) (380241/1000000)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380243.0 in stage 0.0 (TID 380243, 10.44.2.8, executor 1, partition 380243, PROCESS_LOCAL, 7865 bytes)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Finished task 380241.0 in stage 0.0 (TID 380241) in 5 ms on 10.44.2.8 (executor 1) (380242/1000000)
2018-04-27 20:40:02 INFO  TaskSetManager:54 - Starting task 380244.0 in stage 0.0 (TID 380244, 10.44.1.6, executor 2, partition 380244, PROCESS_LOCAL, 7865 bytes)

Killing Executor and Driver
What happens if I kill one of the executors?

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          23m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1   1/1       Running   0          23m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          23m
weidong.zhou:@macpro mytest_gcp > kubectl delete pod spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1
pod "spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-1" deleted
 
weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          25m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          25m

After 30 seconds, check again. A new executor starts.

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          26m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          25m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-3   1/1       Running   0          19s

The Spark UI shows the executor change.


This is actually what I expected. OK, what happens if I kill the driver?

weidong.zhou:@macpro mytest_gcp > kubectl get pods
NAME                                               READY     STATUS    RESTARTS   AGE
spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver   1/1       Running   0          31m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-2   1/1       Running   0          31m
spark-pi-6e2c3b5d707531689031d3259f57b2ea-exec-3   1/1       Running   0          5m
weidong.zhou:@macpro mytest_gcp > kubectl delete pod spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver
pod "spark-pi-6e2c3b5d707531689031d3259f57b2ea-driver" deleted
weidong.zhou:@macpro mytest_gcp > kubectl get pods
No resources found, use --show-all to see completed objects.

So killing the driver pod is actually the way to stop the Spark application during execution.

The nice thing about Spark on Kubernetes is that all pods disappear whether the Spark job completes by itself or is killed, so resources are freed automatically. Overall, Spark on Kubernetes is an easy way to quickly run a Spark application on Kubernetes.

Parquet File Cannot Be Read in Sparkling Water H2O

For the past few months, I wrote several blogs related to H2O topic:
Use Python for H2O
H2O vs Sparkling Water
Sparkling Water Shell: Cloud size under 12 Exception
Access Sparkling Water via R Studio
Running H2O Cluster in Background and at Specific Port Number
Weird Ref-count mismatch Message from H2O

Sparkling Water and H2O perform very well for data science projects. But when something works beautifully in one environment and not in another, it can be the most annoying thing in the world. This is exactly what happened at one of my clients.

They used to run Sparkling Water and H2O in an Oracle BDA environment and it worked great. However, even a full-rack BDA (18 nodes) is not enough to run their big datasets on H2O. Recently they moved to a much bigger CDH cluster (a non-BDA environment) with CDH 5.13 installed. Sparkling Water still worked, but there was one major issue: Parquet files could not be read correctly. There is no issue reading the same Parquet files from spark-shell and pyspark. This is really annoying, as Parquet is one of the data formats heavily used by the client. After many failed tries and much investigation, I was finally able to figure out the issue and implement a workaround. This blog discusses the Parquet reading issue and the workaround in Sparkling Water.

Create Test Data Set
I did the test in my CDH cluster (CDH 5.13). I first created a small test data set, stock.csv, and uploaded it to the /user/root directory on HDFS.

date,close,volume,open,high,low
9/23/16,24.05,56837,24.13,24.22,23.88
9/22/16,24.1,56675,23.49,24.18,23.49
9/21/16,23.38,70925,23.21,23.58,23.025
9/20/16,23.07,35429,23.17,23.264,22.98
9/19/16,23.12,34257,23.22,23.27,22.96
9/16/16,23.16,83309,22.96,23.21,22.96
9/15/16,23.01,43258,22.7,23.25,22.53
9/14/16,22.69,33891,22.81,22.88,22.66
9/13/16,22.81,59871,22.75,22.89,22.53
9/12/16,22.85,109145,22.9,22.95,22.74
9/9/16,23.03,115901,23.53,23.53,23.02
9/8/16,23.6,32717,23.8,23.83,23.55
9/7/16,23.85,143635,23.69,23.89,23.69
9/6/16,23.68,43577,23.78,23.79,23.43
9/2/16,23.84,31333,23.45,23.93,23.41
9/1/16,23.42,49547,23.45,23.48,23.26
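
The upload itself is a one-liner (assuming stock.csv sits in the current directory and /user/root already exists):

hdfs dfs -put stock.csv /user/root/
hdfs dfs -ls /user/root/stock.csv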

Create a Parquet File
Run the following in spark2-shell to create a Parquet file and make sure it can be read back.

scala> val myTest=spark.read.format("csv").load("/user/root/stock.csv")
myTest: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 4 more fields]
 
scala> myTest.show(3)
+-------+-----+------+-----+-----+-----+
|    _c0|  _c1|   _c2|  _c3|  _c4|  _c5|
+-------+-----+------+-----+-----+-----+
|   date|close|volume| open| high|  low|
|9/23/16|24.05| 56837|24.13|24.22|23.88|
|9/22/16| 24.1| 56675|23.49|24.18|23.49|
+-------+-----+------+-----+-----+-----+
only showing top 3 rows
 
scala> myTest.write.format("parquet").save("/user/root/mytest.parquet")
 
scala> val readTest = spark.read.format("parquet").load("/user/root/mytest.parquet")
readTest: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 4 more fields]
 
scala> readTest.show(3)
+-------+-----+------+-----+-----+-----+
|    _c0|  _c1|   _c2|  _c3|  _c4|  _c5|
+-------+-----+------+-----+-----+-----+
|   date|close|volume| open| high|  low|
|9/23/16|24.05| 56837|24.13|24.22|23.88|
|9/22/16| 24.1| 56675|23.49|24.18|23.49|
+-------+-----+------+-----+-----+-----+
only showing top 3 rows

Start a Sparkling Water H2O Cluster
I started a Sparkling Water cluster with 2 nodes.

[root@a84-master--2df67700-f9d1-46f3-afcf-ba27a523e143 sparkling-water-2.2.7]# . /etc/spark2/conf.cloudera.spark2_on_yarn/spark-env.sh
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
[root@a84-master--2df67700-f9d1-46f3-afcf-ba27a523e143 sparkling-water-2.2.7]# bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=2 \
> --conf spark.executor.memory=1g \
> --conf spark.driver.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.16.0.4 (wheeler)
  Spark build version       : 2.2.1
  Scala version             : 2.11
----

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.128.0.5:4040
Spark context available as 'sc' (master = yarn, app id = application_1518097883047_0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark) 
h2oContext: org.apache.spark.h2o.H2OContext =                                   

Sparkling Water Context:
 * H2O name: sparkling-water-root_application_1518097883047_0001
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (1,a84-worker--6e693a0a-d92c-4172-81b7-f2f07b6d5d7c.c.cdh-director-194318.internal,54321)
  (2,a84-worker--c0ecccae-cead-44f2-9f75-39aadb1d024a.c.cdh-director-194318.internal,54321)
  ------------------------

  Open H2O Flow in browser: http://10.128.0.5:54321 (CMD + click in Mac OSX)

scala> import h2oContext._
import h2oContext._
scala> 

Read Parquet File from H2O Flow UI
Open the H2O Flow UI and read the same Parquet file.

After clicking Parse these files, I got a corrupted file.

Obviously, the Parquet file was not read correctly. At this point there are no error messages in the H2O console. If I continue to import the file, the H2O Flow UI throws an error and the H2O console shows the following:

scala> 18/02/08 09:23:59 WARN servlet.ServletHandler: Error for /3/Parse
java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/api/ReadSupport
	at water.parser.parquet.ParquetParser.correctTypeConversions(ParquetParser.java:104)
	at water.parser.parquet.ParquetParserProvider.createParserSetup(ParquetParserProvider.java:48)
	at water.parser.ParseSetup.getFinalSetup(ParseSetup.java:213)
	at water.parser.ParseDataset.forkParseDataset(ParseDataset.java:119)
	at water.parser.ParseDataset.parse(ParseDataset.java:43)
	at water.api.ParseHandler.parse(ParseHandler.java:36)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at water.api.Handler.handle(Handler.java:63)
	at water.api.RequestServer.serve(RequestServer.java:451)
	at water.api.RequestServer.doGeneric(RequestServer.java:296)
	at water.api.RequestServer.doPost(RequestServer.java:222)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:503)
	at ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:429)
	at ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
	at ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
	at water.JettyHTTPD$LoginHandler.handle(JettyHTTPD.java:192)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
	at ai.h2o.org.eclipse.jetty.server.Server.handle(Server.java:370)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
	at ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handleRequest(BlockingHttpConnection.java:53)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:982)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:1043)
	at ai.h2o.org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:865)
	at ai.h2o.org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:240)
	at ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
	at ai.h2o.org.eclipse.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
	at ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
	at ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.api.ReadSupport
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 39 more

Solution
Since the BDA had no issue with the same Sparkling Water H2O deployment and the BDA used CDH 5.10, I initially focused on the CDH version difference. I built three CDH clusters using three different CDH versions: 5.13, 5.12 and 5.10. All of them showed exactly the same error. This ruled out the CDH version difference and shifted my focus to environment differences, especially the classpath and jar files. I tried setting JAVA_HOME, SPARK_HOME and SPARK_DIST_CLASSPATH, and unfortunately none of them worked.

I noticed /etc/spark2/conf.cloudera.spark2_on_yarn/classpath.txt seemed to have far fewer entries than classpath.txt under Spark 1.6. I tried adding back the missing entries. Still no luck.

I added two more parameters to get more information in the H2O logs:

--conf spark.ext.h2o.node.log.level=INFO \
--conf spark.ext.h2o.client.log.level=INFO \

This gave a little more useful information: it complained that the class ParquetFileWriter was not found.

$ cat h2o_10.54.225.9_54000-5-error.log
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR: DistributedException from /192.168.10.54:54005: 'org/apache/parquet/hadoop/ParquetFileWriter', caused by java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetFileWriter
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.getResult(MRTask.java:478)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.getResult(MRTask.java:486)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.doAll(MRTask.java:402)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.parser.ParseSetup.guessSetup(ParseSetup.java:283)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.api.ParseSetupHandler.guessSetup(ParseSetupHandler.java:40)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

My client found a temporary solution using h2odriver.jar, following the instructions from Using H2O on Hadoop. The command used is shown below:

cd /opt/h2o-3
hadoop jar h2odriver.jar -nodes 70 -mapperXmx 40g -output hdfs://PROD01ns/user/mytestuser/log24

Although this solution provides functionality similar to Sparkling Water, it has some critical performance issues:
1. The above command creates a 70-node H2O cluster. With Sparkling Water, the work would be evenly distributed across all available nodes, but the h2odriver.jar approach heavily uses only a few Hadoop nodes. For a big dataset, the majority of the activity happened on only 3~4 nodes, which pushed those nodes' CPU utilization close to 100%. For one big test dataset it never finished parsing the file and failed after a 20-minute run.
2. Unlike Sparkling Water, it actually reads the files during the parsing phase, not in the importing phase.
3. The performance is pretty bad compared with Sparkling Water. I guess Sparkling Water uses the underlying Spark cluster to distribute the load evenly.

Anyway, this hadoop jar h2odriver.jar approach is not an ideal workaround for the issue.

Then I happened to read the blog Incorrect results with corrupt binary statistics with the new Parquet reader. The article has nothing to do with my issue, but it mentioned Parquet v1.8. I remembered seeing a note from a Sparkling Water developer discussing integrating with Parquet v1.8 in the future for a certain Parquet issue in H2O; unfortunately I could not find the link to that discussion any more. But it inspired the thought that maybe Sparkling Water depends on a certain Parquet library that the current environment does not have. The standard CDH distribution and Spark2 seem to use Parquet v1.5. An Oracle BDA has a lot more software installed and maybe it happened to have the right library somewhere. The H2O jar file might contain this library, so what would happen if I included the H2O jar somewhere in Sparkling Water?

With this idea in mind, I downloaded H2O from http://h2o-release.s3.amazonaws.com/h2o/rel-wheeler/4/index.html. After unzipping the file, h2o.jar is the one I need. I then modified sparkling-shell, changing the last line as follows to add the h2o.jar file to the --jars parameter.
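
The download and a quick sanity check looked roughly like this. The direct zip URL below follows the usual H2O release naming and is an assumption; the package path inside the jar may also be shaded differently:

# rel-wheeler build 4 corresponds to H2O 3.16.0.4 (assumed direct download link)
wget http://h2o-release.s3.amazonaws.com/h2o/rel-wheeler/4/h2o-3.16.0.4.zip
unzip h2o-3.16.0.4.zip
# confirm the jar actually bundles Parquet classes before wiring it into sparkling-shell
unzip -l h2o-3.16.0.4/h2o.jar | grep -i parquet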

#spark-shell --jars "$FAT_JAR_FILE" --driver-memory "$DRIVER_MEMORY" --conf spark.driver.extraJavaOptions="$EXTRA_DRIVER_PROPS" "$@"
H2O_JAR_FILE="/home/mytestuser/install/h2o-3.16.0.4/h2o.jar"
spark-shell --jars "$FAT_JAR_FILE,$H2O_JAR_FILE" --driver-memory "$DRIVER_MEMORY" --conf spark.driver.extraJavaOptions="$EXTRA_DRIVER_PROPS" "$@"

I restarted my H2O cluster. It worked!

Finally, after many days of work, Sparkling Water works again in the new cluster. Reloading the big test dataset took less than 1 minute with only 24 H2O nodes, and the load was evenly distributed across the cluster. Problem solved!

Access Sparkling Water via R Studio

In the last few blogs, I discussed the following topics related to Sparkling Water.
Sparkling Water Shell: Cloud size under 12 Exception
H2O vs Sparkling Water
The H2O Flow UI is nice, but it takes some mouse clicks to get what you want. In this blog, I am going to discuss how to access Sparkling Water via R Studio. You can access the same H2O frame from both Sparkling Water and R Studio.

Data Preparation
Create the following store transaction file, store_trans.txt

[~]# cat store_trans.txt 
3000251,20171111,1321,Austin,TX,1234,Food,Pizza,34910,8.10,Angla
3000252,20171111,1321,Austin,TX,7812,Food,Milk,16920,3.99,Rosina
3000753,20171112,2010,Houston,TX,3190,Food,Pizza,34910,8.10,Broderick
3000954,20171112,1442,Austin,TX,1234,Food,Pizza,34910,8.10,Jeanne
3008255,20171112,1651,Austin,TX,5134,Sports,Shoes,12950,45.99,Angla
3026256,20171112,1632,Austin,TX,1234,Food,Pizza,34910,8.10,Jeanne

Upload the file to HDFS.

[~]# hdfs dfs -put store_trans.txt /user/wzhou/test/data/
[~]# hdfs dfs -ls /user/wzhou/test/data
Found 2 items
-rw-r--r--   3 wzhou supergroup       2495 2017-11-14 09:58 /user/wzhou/test/data/stock.csv
-rw-r--r--   3 wzhou supergroup        400 2017-11-16 04:50 /user/wzhou/test/data/store_trans.txt

Start Sparkling Shell
Run the following to start sparkling-shell. I used only 2 executor instances here; you can specify more and the work will spread across them.

kinit wzhou
bin/sparkling-shell \
--master yarn \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=1g \
--conf spark.driver.memory=1g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

Here is the execution result.

[root@worker--908ba3bf-d4af-4db1-bc3e-f7a9ba5372fa sparkling-water-2.2.2]# bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=2 \
> --conf spark.executor.memory=1g \
> --conf spark.driver.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : 
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

/usr/java/jdk1.8.0_131
lib_dir=/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/bin/../lib
bin_dir=/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/bin
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.142.0.8:4040
Spark context available as 'sc' (master = yarn, app id = application_1510827150310_0003).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

Start H2O Cluster
The H2O Cluster is running inside the Spark cluster.

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import h2oContext._

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark) 
h2oContext: org.apache.spark.h2o.H2OContext =                                   

Sparkling Water Context:
 * H2O name: sparkling-water-root_application_1510827150310_0003
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (2,worker--c7e86f76-a1fa-49e5-9074-d7bef4dbb43e.c.cdh-director-173715.internal,54321)
  (1,worker--908ba3bf-d4af-4db1-bc3e-f7a9ba5372fa.c.cdh-director-173715.internal,54321)
  ------------------------
  Open H2O Flow in browser: http://10.142.0.8:54323 (CMD + click in Mac OSX)

scala> import h2oContext._ 
import h2oContext._

Access H2O Flow UI
Open a browser and go to http://10.142.0.8:54323. Port 54321 is the default port number; the reason it is 54323 here is that I have another instance running on 54321 right now.
Click importFiles

Find the store_trans.txt file and click Import

Click Parse these files

You should see the following and then click Parse

Click View

Right now we have the nice store_trans.hex H2O Frame.

Run the getFrames command and we should be able to see the store_trans.hex frame.

You can then do tons of stuff, like model building, prediction and so on. I am not going to discuss more of that in this blog. Now let me switch to R Studio.

Load Required Libraries in R Studio
Run the following:

options(rsparkling.sparklingwater.version = "2.2.2")
library(rsparkling)
library(sparklyr)
library(h2o)
library(dplyr)

Here is the result

> options(rsparkling.sparklingwater.version = "2.2.2")
> library(rsparkling)
> library(sparklyr)
> library(h2o)
----------------------------------------------------------------------
Your next step is to start H2O:
    > h2o.init()
For H2O package documentation, ask for help:
    > ??h2o
After starting H2O, you can use the Web UI at http://localhost:54321
For more information visit http://docs.h2o.ai

----------------------------------------------------------------------
Attaching package: ‘h2o’
The following objects are masked from ‘package:stats’:
    cor, sd, var
The following objects are masked from ‘package:base’:
    &&, %*%, %in%, ||, apply, as.factor, as.numeric, colnames, colnames<-, ifelse, is.character, is.factor, is.numeric, log, log10, log1p, log2, round, signif, trunc 
> library(dplyr)
Attaching package: ‘dplyr’
The following objects are masked from ‘package:stats’:
    filter, lag
The following objects are masked from ‘package:base’:
    intersect, setdiff, setequal, union

Connect to H2O Cluster
Do not run h2o.init() as the messages above suggest; that would start a one-node H2O cluster on your local machine. I want to reuse the existing H2O cluster that spans multiple nodes. Run the following:
h2o.init(ip="10.142.0.8", port=54323)
h2o.clusterIsUp()

You should see that the connection to the existing H2O cluster is successful. Also, ignore the version mismatch message.

Check the cluster status; it should be healthy on all nodes.

Let me see whether I can access the H2O frame I created from the H2O Flow UI.

Excellent, I can see it. Let me do some operations against this frame.

OK, as you can see, it is quite easy to access H2O from R Studio.

Sparkling Water Shell: Cloud size under 12 Exception

In my last blog, I compared Sparkling Water and H2O. Before I got sparkling-shell to work, I ran into a lot of issues. One of the annoying errors was the runtime exception Cloud size under xx. I searched the internet and found many people with similar problems. There are many recommendations, ranging from downloading the latest matching version to setting certain parameters during startup. Unfortunately none of them worked for me, but I finally figured out the issue and would like to share my solution in this blog.

I downloaded Sparkling Water, unzipped the file, and ran the sparkling-shell command as shown at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. It looked good initially.

[sparkling-water-2.2.2]$ bin/sparkling-shell --conf "spark.executor.memory=1g"

-----
  Spark master (MASTER)     : local[*]
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1343).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

But when I ran the command val h2oContext = H2OContext.getOrCreate(spark), it gave me many errors:

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Increasing 'spark.locality.wait' to value 30000
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Due to non-deterministic behavior of Spark broadcast-based joins
We recommend to disable them by configuring `spark.sql.autoBroadcastJoinThreshold` variable to value `-1`:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
17/11/05 10:07:48 WARN internal.InternalH2OBackend: The property 'spark.scheduler.minRegisteredResourcesRatio' is not specified!
We recommend to pass `--conf spark.scheduler.minRegisteredResourcesRatio=1`
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Unsupported options spark.dynamicAllocation.enabled detected!
17/11/05 10:07:48 WARN internal.InternalH2OBackend:
The application is going down, since the parameter (spark.ext.h2o.fail.on.unsupported.spark.param,true) is true!
If you would like to skip the fail call, please, specify the value of the parameter to false.

java.lang.IllegalArgumentException: Unsupported argument: (spark.dynamicAllocation.enabled,true)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:46)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:38)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$class.checkUnsupportedSparkOptions(InternalBackendUtils.scala:38)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkUnsupportedSparkOptions(InternalH2OBackend.scala:30)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkAndUpdateConf(InternalH2OBackend.scala:60)
  at org.apache.spark.h2o.H2OContext.<init>(H2OContext.scala:90)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

You can see I need to pass more parameters when starting sparkling-shell. Change the parameters as follows:

bin/sparkling-shell \
--master yarn \
--conf spark.executor.memory=1g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

OK, this time it looked better; at least the warning messages disappeared. But I got the error java.lang.RuntimeException: Cloud size under 2.

[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1344).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
java.lang.RuntimeException: Cloud size under 2
  at water.H2O.waitForCloudSize(H2O.java:1689)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.init(InternalH2OBackend.scala:117)
  at org.apache.spark.h2o.H2OContext.init(H2OContext.scala:121)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

No big deal. I hadn't specified the number of executors or the executor memory, so maybe it was complaining that the H2O cluster was too small. Add the following three parameters:

--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \

Rerunning the whole thing got the same error, with the size number changing to 12. That did not look right to me. Then I checked the H2O error logfile and found tons of messages like these:

11-06 10:23:16.355 10.132.110.145:54325  30452  #09:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused
11-06 10:23:16.790 10.132.110.145:54325  30452  #06:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused

It looks like Sparkling Water could not connect to the Spark cluster. After some investigation, I realized I had installed and run sparkling-shell from an edge node of the BDA. When the H2O cluster runs inside a Spark application, communication with the Spark cluster on the BDA goes through the BDA's private InfiniBand network, and the edge node cannot communicate directly with the IB network on the BDA. With this assumption in mind, I installed and ran Sparkling Water on one of the BDA nodes, and it worked perfectly without any issue. Problem solved!

H2O vs Sparkling Water

People working in Hadoop environments are familiar with products that make you feel like you're in a zoo: Pig, Hive, Beeswax, and ZooKeeper, for example. As machine learning becomes more popular, products that sound like water have come out, such as H2O and Sparkling Water. I won't rule out the possibility that we will see some big data products that sound like wine. Anyway, people like to give their products catchy names to make them sound cool, although many of them are similar.

In this blog, I am going to discuss H2O and Sparkling Water at a high level.

H2O
H2O is a fast, open-source machine learning tool for big data analysis. It was launched in Silicon Valley in 2011 by a company called H2O.ai, formerly known as 0xdata. The company is led by some of the top data scientists in the world and is backed by a few mathematics professors at Stanford University who sit on the company's scientific advisory board.

H2O uses in-memory compression and can handle billions of rows in memory. It allows companies to use all of their data without sampling and get predictions faster. It includes built-in advanced algorithms such as deep learning, boosting, and bagging ensembles, and it lets organizations build powerful domain-specific predictive engines for recommendations, customer churn, propensity to buy, dynamic pricing, and fraud detection for insurance and credit card companies.

H2O has interfaces to R, Scala, Python, and Java. Not only can it run on Hadoop and cloud environments (AWS, Google Cloud, and Azure), it also runs on Linux, Mac, and Windows. The following shows the H2O architecture.

For your own testing, you can install H2O on your laptop. For big datasets, or to use a Spark cluster installed somewhere, use Sparkling Water, which I discuss later in this blog. Installing H2O on your laptop is super easy. Here are the steps:
1. Download the H2O Zip File
Go to the H2O download page at http://h2o.ai/download and choose Latest Stable Release under the H2O block.

2. Run H2O
Just unzip the file and then run the jar file. It will start a web server on your local machine.

unzip h2o-3.14.0.7.zip
cd h2o-3.14.0.7
java -jar h2o.jar

3. Use H2O Flow UI
Enter http://localhost:54321/flow/index.html in your browser and the H2O Flow UI shows up as follows. You can start your data analysis work right away.

Sparkling Water
OK, let's see what Sparkling Water is. In short, Sparkling Water = H2O + Spark. Basically, Sparkling Water combines the fast machine learning algorithms of H2O with the popular in-memory platform Spark to provide a fast and scalable solution for data analytics. Sparkling Water supports Scala, R, and Python and can use the H2O Flow UI to provide a machine learning platform for data scientists and application developers.

Sparkling Water can run on top of Spark in the following ways:

  • Local cluster
  • Standalone cluster
  • Spark cluster in a YARN environment

Sparkling Water is designed as a Spark application. When it executes and you check the Spark UI, it is just a regular Spark application. When launched, it first starts the Spark executors; H2O then starts services such as the key-value store and memory manager inside those executors. The following shows the relationship between Sparkling Water, Spark and H2O.

To share data between Spark and H2O, Sparkling Water uses H2O's H2OFrame. Converting an RDD/DataFrame to an H2OFrame requires data duplication, because the data is transferred from RDD storage into the H2OFrame; however, the data in an H2OFrame is stored in a compressed format and the RDD no longer needs to be preserved.

A typical use case for Sparkling Water is building a data model. A model is constructed, its metrics are estimated against test data, and its predictions can then be used in the rest of the data pipeline.

Installing Sparkling Water takes a few more steps than H2O. As of now, Sparkling Water is at version 2.2.2. The detailed installation steps are shown at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. However, I don't like the installation instructions, for the following reasons:

  • It uses a local Spark cluster as the example. As far as I know, local Spark is not the typical way to use Spark, and the environment variable settings pointing to a local Spark cluster are confusing. The easiest first step should be to verify whether spark-shell works; if it does, skip the steps to install a Spark cluster or set SPARK-related environment variables.
  • It gives the simple instruction to run sparkling-shell --conf "spark.executor.memory=1g", which misses a lot of other parameters; without them you will get tons of warning messages.

OK, here are the steps I believe are correct:
1. Verify Your Spark Environment
Of course, you should have a Spark cluster up and running before even considering a Sparkling Water installation. Installing a new Spark cluster without knowing whether it works will make it a nightmare to trace Sparkling Water and H2O issues if they actually come from the Spark cluster. Also make sure to install Spark2. Sparkling Water and H2O have very strict rules about version compatibility among Spark, Sparkling Water, and H2O. Here is a brief overview of the matrix.

For detailed information, please check out https://github.com/h2oai/rsparkling/blob/master/README.md

One easy way to verify that the Spark cluster is good is to make sure you can start spark2-shell and run a simple example program without issues (see the sketch below).
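
For instance, submitting the bundled SparkPi example is a quick smoke test. This is only a sketch; the examples jar path below is the usual CDH parcel location and may differ on your cluster:

spark2-submit \
  --master yarn \
  --class org.apache.spark.examples.SparkPi \
  /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_*.jar 100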

2. Download the Sparkling Water zip file
wget http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/sparkling-water-2.2.2.zip

3. Unzip the file and run

unzip sparkling-water-2.2.2.zip
cd sparkling-water-2.2.2

bin/sparkling-shell \
--master yarn \
--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

The above sample creates an H2O cluster on 12 executors with 10GB of memory each. You also have to add a bunch of parameters like spark.sql.autoBroadcastJoinThreshold, spark.dynamicAllocation.enabled and many more. Setting spark.dynamicAllocation.enabled to false makes sense, as you probably want the H2O cluster to use resources in a predictable way and not suck up all of the cluster memory when processing a large amount of data. Adding these parameters helps you avoid many annoying warning messages when sparkling-shell starts.

In a Kerberized environment, make sure to run kinit before executing sparkling-shell.

4. Create an H2O cluster inside Spark Cluster
Run the following code.

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark) 
import h2oContext._ 

The following is sample output from the run.

[install]$ cd sparkling-water-2.2.2
[sparkling-water-2.2.2]$ kinit wzhou
Password for wzhou@enkitec.com:
[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=12 \
> --conf spark.executor.memory=10g \
> --conf spark.driver.memory=8g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) :
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/anaconda2/lib/python2.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/S                   taticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/St                   aticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/05 07:46:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes w                   here applicable
17/11/05 07:46:05 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cann                   ot be loaded.
17/11/05 07:46:06 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under                    SPARK_HOME.
Spark context Web UI available at http://192.168.10.45:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1362).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
h2oContext: org.apache.spark.h2o.H2OContext =

Sparkling Water Context:
 * H2O name: sparkling-water-wzhou_application_3608740912046_1362
 * cluster size: 12
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (8,enkbda1node07.enkitec.com,54321)
  (10,enkbda1node04.enkitec.com,54321)
  (6,enkbda1node08.enkitec.com,54321)
  (1,enkbda1node18.enkitec.com,54323)
  (9,enkbda1node18.enkitec.com,54321)
  (7,enkbda1node10.enkitec.com,54321)
  (4,enkbda1node17.enkitec.com,54321)
  (12,enkbda1node17.enkitec.com,54323)
  (3,enkbda1node16.enkitec.com,54323)
  (11,enkbda1node16.enkitec.com,54321)
  (2,enkbda1node04.enkitec.com,54323)
  (5,enkbda1node05.enkitec.com,54323)
  ------------------------

  Open H2O Flow in browser: http://192.168.10.45:54325 (CMD + ...
scala> import h2oContext._
import h2oContext._

You can see that the H2O cluster is indeed inside a Spark application.

5. Access H2O cluster
You can access the H2O cluster in many ways. One is the H2O Flow UI, exactly the same UI I showed you before, e.g. http://192.168.10.45:54325.

Another way is to access it from R Studio using h2o.init(your_h2o_host, port). Using the cluster above as the example, here are the init command and some other useful commands to check the H2O cluster.

> library(sparklyr)
> library(h2o)
> library(dplyr)
> options(rsparkling.sparklingwater.version = "2.2.2")
> library(rsparkling)
> h2o.init(ip="enkbda1node05.enkitec.com", port=54325)
 Connection successful!

R is connected to the H2O cluster (in client mode): 
    H2O cluster uptime:         1 hours 7 minutes 
    H2O cluster version:        3.14.0.7 
    H2O cluster version age:    16 days  
    H2O cluster name:           sparkling-water-wzhou_application_3608740912046_1362 
    H2O cluster total nodes:    12 
    H2O cluster total memory:   93.30 GB 
    H2O cluster total cores:    384 
    H2O cluster allowed cores:  384 
    H2O cluster healthy:        TRUE 
    H2O Connection ip:          enkbda1node05.enkitec.com 
    H2O Connection port:        54325 
    H2O Connection proxy:       NA 
    H2O Internal Security:      FALSE 
    H2O API Extensions:         Algos, AutoML, Core V3, Core V4 
    R Version:                  R version 3.3.2 (2016-10-31) 

> h2o.clusterIsUp()
[1] TRUE


> h2o.clusterStatus()
Version: 3.14.0.7 
Cluster name: sparkling-water-wzhou_application_3608740912046_1362
Cluster size: 12 
Cluster is locked

                                             h2o healthy
1  enkbda1node04.enkitec.com/192.168.10.44:54321    TRUE
2  enkbda1node04.enkitec.com/192.168.10.44:54323    TRUE
3  enkbda1node05.enkitec.com/192.168.10.45:54323    TRUE
4  enkbda1node07.enkitec.com/192.168.10.47:54321    TRUE
5  enkbda1node08.enkitec.com/192.168.10.48:54321    TRUE
6  enkbda1node10.enkitec.com/192.168.10.50:54321    TRUE
7  enkbda1node16.enkitec.com/192.168.10.56:54321    TRUE
8  enkbda1node16.enkitec.com/192.168.10.56:54323    TRUE
9  enkbda1node17.enkitec.com/192.168.10.57:54321    TRUE
10 enkbda1node17.enkitec.com/192.168.10.57:54323    TRUE
11 enkbda1node18.enkitec.com/192.168.10.58:54321    TRUE
12 enkbda1node18.enkitec.com/192.168.10.58:54323    TRUE
      last_ping num_cpus sys_load mem_value_size   free_mem
1  1.509977e+12       32     1.21              0 8296753152
2  1.509977e+12       32     1.21              0 8615325696
3  1.509977e+12       32     0.63              0 8611375104
4  1.509977e+12       32     1.41              0 8290978816
5  1.509977e+12       32     0.33              0 8372216832
6  1.509977e+12       32     0.09              0 8164569088
7  1.509977e+12       32     0.23              0 8182391808
8  1.509977e+12       32     0.23              0 8579495936
9  1.509977e+12       32     0.15              0 8365195264
10 1.509977e+12       32     0.15              0 8131736576
11 1.509977e+12       32     0.63              0 8291118080
12 1.509977e+12       32     0.63              0 8243695616
     pojo_mem swap_mem    free_disk    max_disk   pid num_keys
1  1247908864        0 395950686208 4.91886e+11  4590        0
2   929336320        0 395950686208 4.91886e+11  4591        0
3   933286912        0 420010262528 4.91886e+11  3930        0
4  1253683200        0  4.25251e+11 4.91886e+11 16370        0
5  1172445184        0 425796304896 4.91886e+11 11998        0
6  1380092928        0 426025943040 4.91886e+11 20374        0
7  1362270208        0  4.21041e+11 4.91886e+11  4987        0
8   965166080        0  4.21041e+11 4.91886e+11  4988        0
9  1179466752        0 422286721024 4.91886e+11  6951        0
10 1412925440        0 422286721024 4.91886e+11  6952        0
11 1253543936        0 425969319936 4.91886e+11 24232        0
12 1300966400        0 425969319936 4.91886e+11 24233        0
   tcps_active open_fds rpcs_active
1            0      453           0
2            0      452           0
3            0      453           0
4            0      453           0
5            0      452           0
6            0      453           0
7            0      452           0
8            0      452           0
9            0      452           0
10           0      453           0
11           0      453           0
12           0      452           0

> h2o.networkTest()
Network Test: Launched from enkbda1node05.enkitec.com/192.168.10.45:54325
                                            destination
1                         all - collective bcast/reduce
2  remote enkbda1node04.enkitec.com/192.168.10.40:54321
3  remote enkbda1node04.enkitec.com/192.168.10.40:54323
4  remote enkbda1node05.enkitec.com/192.168.10.45:54323
5  remote enkbda1node07.enkitec.com/192.168.10.43:54321
6  remote enkbda1node08.enkitec.com/192.168.10.44:54321
7  remote enkbda1node10.enkitec.com/192.168.10.46:54321
8  remote enkbda1node16.enkitec.com/192.168.10.52:54321
9  remote enkbda1node16.enkitec.com/192.168.10.52:54323
10 remote enkbda1node17.enkitec.com/192.168.10.53:54321
11 remote enkbda1node17.enkitec.com/192.168.10.53:54323
12 remote enkbda1node18.enkitec.com/192.168.10.54:54321
13 remote enkbda1node18.enkitec.com/192.168.10.54:54323
                   1_bytes               1024_bytes
1   38.212 msec,  628  B/S     6.790 msec, 3.5 MB/S
2    4.929 msec,  405  B/S       752 usec, 2.6 MB/S
3    7.089 msec,  282  B/S       710 usec, 2.7 MB/S
4    5.687 msec,  351  B/S       634 usec, 3.1 MB/S
5    6.623 msec,  301  B/S       784 usec, 2.5 MB/S
6    6.277 msec,  318  B/S   2.680 msec, 746.0 KB/S
7    6.469 msec,  309  B/S       840 usec, 2.3 MB/S
8    6.595 msec,  303  B/S       801 usec, 2.4 MB/S
9    5.155 msec,  387  B/S       793 usec, 2.5 MB/S
10   5.204 msec,  384  B/S       703 usec, 2.8 MB/S
11   5.511 msec,  362  B/S       782 usec, 2.5 MB/S
12   6.784 msec,  294  B/S       927 usec, 2.1 MB/S
13   6.001 msec,  333  B/S       711 usec, 2.7 MB/S
               1048576_bytes
1   23.800 msec, 1008.4 MB/S
2     6.997 msec, 285.8 MB/S
3     5.576 msec, 358.6 MB/S
4     4.056 msec, 493.0 MB/S
5     5.066 msec, 394.8 MB/S
6     5.272 msec, 379.3 MB/S
7     5.176 msec, 386.3 MB/S
8     6.831 msec, 292.8 MB/S
9     5.772 msec, 346.4 MB/S
10    5.125 msec, 390.2 MB/S
11    5.274 msec, 379.2 MB/S
12    5.065 msec, 394.9 MB/S
13    4.960 msec, 403.2 MB/S