Running H2O Cluster in Background and at Specific Port Number

In a few of my previous blogs, I discussed H2O vs Sparkling Water, Sparkling Water Shell: Cloud size under 12 Exception, and Access Sparkling Water via R Studio.

When using H2O Sparkling Water, there are two common issues. The first is the port number. The default is 54321, but if the H2O cluster has been bounced multiple times, it may come up on the next available port instead, such as 54323. If the cluster is used by many data analysts, it is inconvenient to notify all of them every time the port number changes; you want users to remember only one port number.

The second issue is that the sparkling-shell session cannot run in the background. If you close the PuTTY session running sparkling-shell, the H2O cluster is terminated.

This blog discusses how to work around these two issues.

For the port number, two parameters are involved: spark.ext.h2o.client.port.base and spark.ext.h2o.node.port.base. spark.ext.h2o.client.port.base is the port for the H2O UI, while spark.ext.h2o.node.port.base is the port H2O uses internally for communication among the H2O nodes. Make sure these two port numbers are different; setting them to the same value will cause problems. I also set spark.ext.h2o.cloud.name to name my H2O cluster.

I created two scripts: run_sparkling_shell.sh, which launches sparkling-shell, and sparkling-shell-init.scala, which contains the Scala commands to start up the H2O cluster.

[wzhou@enkbda1node05 ~]$ cat run_sparkling_shell.sh
bin/sparkling-shell \
--master yarn \
--conf spark.ext.h2o.cloud.name=WeidongH2O-Cluster \
--conf spark.ext.h2o.client.port.base=26000 \
--conf spark.ext.h2o.node.port.base=26005 \
--conf spark.executor.instances=10 \
--conf spark.executor.memory=12g \
--conf spark.driver.memory=8g \
--conf spark.executor.cores=4 \
--conf spark.yarn.executor.memoryOverhead=4g \
--conf spark.yarn.driver.memoryOverhead=4g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.yarn.queue=WZTestPool \
--conf spark.scheduler.minRegisteredResourcesRatio=1 \
-i  sparkling-shell-init.scala

Here is the code for sparkling-shell-init.scala:

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import h2oContext._

To run sparkling-shell in the background, my first attempt was nohup. It didn't work: when the sparkling-shell-init.scala script is invoked this way, a :quit command is automatically added at the end, which terminates the H2O cluster.

When I worked on Exadata, I used the screen command a lot. It is a very useful tool for protecting long-running critical jobs, like patching/upgrades and import/export. So I used the same screen trick to get around the background issue. Here are the steps.

1. Start screen session
Use the screen -ls command to check whether any screen session already exists.

[wzhou@enkbda1node05 ~]$ screen -ls
No Sockets found in /var/run/screen/S-wzhou.

Start a screen session.
[wzhou@enkbda1node05 ~]$ screen

2. Start H2O Cluster
Run the script to start H2O cluster.

[wzhou@enkbda1node05 ~]$ ./run_sparkling_shell.sh

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) :
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/anaconda2/lib/python2.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/09 10:12:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/09 10:12:58 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
17/12/09 10:12:59 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://192.168.10.14:4040
Spark context available as 'sc' (master = yarn, app id = application_2567590118914_1007).
Spark session available as 'spark'.
Loading sparkling-shell-init.scala...
import org.apache.spark.h2o._
h2oContext: org.apache.spark.h2o.H2OContext =

Sparkling Water Context:
 * H2O name: WeidongH2O-Cluster
 * cluster size: 10
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (2,enkbda1node08.enkitec.com,26005)
  (4,enkbda1node12.enkitec.com,26005)
  (9,enkbda1node17.enkitec.com,26005)
  (5,enkbda1node13.enkitec.com,26005)
  (7,enkbda1node15.enkitec.com,26005)
  (1,enkbda1node09.enkitec.com,26005)
  (8,enkbda1node04.enkitec.com,26005)
  (6,enkbda1node10.enkitec.com,26005)
  (3,enkbda1node11.enkitec.com,26005)
  (10,enkbda1node05.enkitec.com,26005)
  ------------------------

  Open H2O Flow in browser: http://192.168.10.14:26000 (CMD + click in Mac OSX)


import h2oContext._

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

3. Verify H2O Cluster
Now close the session and open a new one. Check for the existing screen session.

[wzhou@enkbda1node05 ~]$ screen -ls
There is a screen on:
        19044.pts-0.enkbda1node05       (Detached)
1 Socket in /var/run/screen/S-wzhou.

Now attach to the existing session:
[wzhou@enkbda1node05 ~]$ screen -x 19044

You should see the original session that was running sparkling-shell. The UI is still working as expected.
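
With the client port pinned at 26000, users only ever need that one number. For example, an analyst could connect from R roughly like this (a sketch using the Flow address printed above; the h2o package calls are the standard ones):

library(h2o)
# Connect to the existing cluster at its fixed client port
h2o.init(ip = "192.168.10.14", port = 26000)
h2o.clusterIsUp()   # should return TRUE when the cluster is reachable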

Access Sparkling Water via R Studio

In the last few blogs, I discussed the following topics related to Sparkling Water:
  • Sparkling Water Shell: Cloud size under 12 Exception
  • H2O vs Sparkling Water

The H2O Flow UI is nice, but it takes quite a few mouse clicks to get what you want. In this blog, I am going to discuss how to access Sparkling Water via R Studio. You can access the same H2O frame from both Sparkling Water and R Studio.

Data Preparation
Create the following store transaction file, store_trans.txt

[~]# cat store_trans.txt 
3000251,20171111,1321,Austin,TX,1234,Food,Pizza,34910,8.10,Angla
3000252,20171111,1321,Austin,TX,7812,Food,Milk,16920,3.99,Rosina
3000753,20171112,2010,Houston,TX,3190,Food,Pizza,34910,8.10,Broderick
3000954,20171112,1442,Austin,TX,1234,Food,Pizza,34910,8.10,Jeanne
3008255,20171112,1651,Austin,TX,5134,Sports,Shoes,12950,45.99,Angla
3026256,20171112,1632,Austin,TX,1234,Food,Pizza,34910,8.10,Jeanne

Upload the file to HDFS.

[~]# hdfs dfs -put store_trans.txt /user/wzhou/test/data/
[~]# hdfs dfs -ls /user/wzhou/test/data
Found 2 items
-rw-r--r--   3 wzhou supergroup       2495 2017-11-14 09:58 /user/wzhou/test/data/stock.csv
-rw-r--r--   3 wzhou supergroup        400 2017-11-16 04:50 /user/wzhou/test/data/store_trans.txt

Start Sparkling Shell
Run the following code to start sparkling-shell. I used only 2 executor instances here; you can specify more and the H2O cluster will spread across them.

kinit wzhou
bin/sparkling-shell \
--master yarn \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=1g \
--conf spark.driver.memory=1g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

Here is the execution result.

[root@worker--908ba3bf-d4af-4db1-bc3e-f7a9ba5372fa sparkling-water-2.2.2]# bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=2 \
> --conf spark.executor.memory=1g \
> --conf spark.driver.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : 
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

/usr/java/jdk1.8.0_131
lib_dir=/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/bin/../lib
bin_dir=/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/bin
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.142.0.8:4040
Spark context available as 'sc' (master = yarn, app id = application_1510827150310_0003).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

Start H2O Cluster
The H2O Cluster is running inside the Spark cluster.

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import h2oContext._

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark) 
h2oContext: org.apache.spark.h2o.H2OContext =                                   

Sparkling Water Context:
 * H2O name: sparkling-water-root_application_1510827150310_0003
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (2,worker--c7e86f76-a1fa-49e5-9074-d7bef4dbb43e.c.cdh-director-173715.internal,54321)
  (1,worker--908ba3bf-d4af-4db1-bc3e-f7a9ba5372fa.c.cdh-director-173715.internal,54321)
  ------------------------
  Open H2O Flow in browser: http://10.142.0.8:54323 (CMD + click in Mac OSX)

scala> import h2oContext._ 
import h2oContext._

Access H2O Flow UI
Open a browser and go to http://10.142.0.8:54323. Port 54321 is the default, but here it is 54323 because another H2O instance is already using 54321.
Click importFiles

Find the store_trans.txt file and click Import

Click Parse these files

You should see the following and then click Parse

Click View

We now have a nice store_trans.hex H2O frame.

Run the getFrames command and you should see the store_trans.hex frame.

You can then do tons of stuff, like model building, prediction, and so on. I am not going to discuss more of that in this blog. At this point, let me switch to R Studio.

Load Required Libraries in R Studio
Run the following:

options(rsparkling.sparklingwater.version = "2.2.2")
library(rsparkling)
library(sparklyr)
library(h2o)
library(dplyr)

Here is the result

> options(rsparkling.sparklingwater.version = "2.2.2")
> library(rsparkling)
> library(sparklyr)
> library(h2o)
----------------------------------------------------------------------
Your next step is to start H2O:
    > h2o.init()
For H2O package documentation, ask for help:
    > ??h2o
After starting H2O, you can use the Web UI at http://localhost:54321
For more information visit http://docs.h2o.ai

----------------------------------------------------------------------
Attaching package: ‘h2o’
The following objects are masked from ‘package:stats’:
    cor, sd, var
The following objects are masked from ‘package:base’:
    &&, %*%, %in%, ||, apply, as.factor, as.numeric, colnames, colnames<-, ifelse, is.character, is.factor, is.numeric, log, log10, log1p, log2, round, signif, trunc 
> library(dplyr)
Attaching package: ‘dplyr’
The following objects are masked from ‘package:stats’:
    filter, lag
The following objects are masked from ‘package:base’:
    intersect, setdiff, setequal, union

Connect to H2O Cluster
Do not run h2o.init() as the messages above suggest; it would start a one-node H2O cluster on your local node. I want to reuse the existing H2O cluster that spans multiple nodes. Run the following:
h2o.init(ip="10.142.0.8", port=54323)
h2o.clusterIsUp()

You should see that the connection to the existing H2O cluster is successful. You can also ignore the version mismatch message.

Check the cluster status; it should show healthy on all nodes.

Let me see whether I can see the H2O frame I created from H2O Flow UI.

Excellent, I can see it. Let me do some operations against this frame.
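
For reference, the commands behind those screenshots look roughly like this (a sketch; only the store_trans.hex name comes from the steps above):

h2o.ls()                                   # list the frames the cluster knows about
store <- h2o.getFrame("store_trans.hex")   # attach to the frame created in Flow
# h2o.importFile("hdfs:///user/wzhou/test/data/store_trans.txt") would build a
# similar frame without going through the Flow UI.
h2o.describe(store)                        # column types, min/max, missing counts
head(store)                                # first few rows of the H2O frame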

Ok, we can see it is quite easy to access H2O from R Studio.

Sparkling Water Shell: Cloud size under 12 Exception

In my last blog, I compared Sparkling Water and H2O. Before I got sparkling-shell to work, I ran into a lot of issues. One of the more annoying errors was the runtime exception: Cloud size under xx. I searched the internet and found many people with similar problems. There are many recommendations, ranging from downloading the latest, matching version to setting certain parameters during startup. Unfortunately, none of them worked for me. But I finally figured out the issue and would like to share my solution in this blog.

After I downloaded Sparkling Water, I unzipped the file and ran the sparkling-shell command as shown at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. It looked good initially.

[sparkling-water-2.2.2]$ bin/sparkling-shell --conf "spark.executor.memory=1g"

-----
  Spark master (MASTER)     : local[*]
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1343).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

But when I ran the command val h2oContext = H2OContext.getOrCreate(spark), it gave me many errors:

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Increasing 'spark.locality.wait' to value 30000
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Due to non-deterministic behavior of Spark broadcast-based joins
We recommend to disable them by configuring `spark.sql.autoBroadcastJoinThreshold` variable to value `-1`:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
17/11/05 10:07:48 WARN internal.InternalH2OBackend: The property 'spark.scheduler.minRegisteredResourcesRatio' is not specified!
We recommend to pass `--conf spark.scheduler.minRegisteredResourcesRatio=1`
17/11/05 10:07:48 WARN internal.InternalH2OBackend: Unsupported options spark.dynamicAllocation.enabled detected!
17/11/05 10:07:48 WARN internal.InternalH2OBackend:
The application is going down, since the parameter (spark.ext.h2o.fail.on.unsupported.spark.param,true) is true!
If you would like to skip the fail call, please, specify the value of the parameter to false.

java.lang.IllegalArgumentException: Unsupported argument: (spark.dynamicAllocation.enabled,true)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:46)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$$anonfun$checkUnsupportedSparkOptions$1.apply(InternalBackendUtils.scala:38)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$class.checkUnsupportedSparkOptions(InternalBackendUtils.scala:38)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkUnsupportedSparkOptions(InternalH2OBackend.scala:30)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.checkAndUpdateConf(InternalH2OBackend.scala:60)
  at org.apache.spark.h2o.H2OContext.<init>(H2OContext.scala:90)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

You can see I need to pass in more parameters when starting sparkling-shell. Change the parameters as follows:

bin/sparkling-shell \
--master yarn \
--conf spark.executor.memory=1g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

Ok, this time it looked better; at least the warning messages disappeared. But then I got the error java.lang.RuntimeException: Cloud size under 2.

[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.132.110.145:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1344).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
java.lang.RuntimeException: Cloud size under 2
  at water.H2O.waitForCloudSize(H2O.java:1689)
  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.init(InternalH2OBackend.scala:117)
  at org.apache.spark.h2o.H2OContext.init(H2OContext.scala:121)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:355)
  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:383)
  ... 50 elided

No big deal. I hadn't specified the number of executors or the executor memory, so maybe it was complaining that the H2O cluster was too small. Add the following three parameters:

--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \

Rerunning the whole thing got the same error, with the size number changing to 12. It did not look right to me. I then checked the H2O error logfile and found tons of messages like these:

11-06 10:23:16.355 10.132.110.145:54325  30452  #09:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused
11-06 10:23:16.790 10.132.110.145:54325  30452  #06:54321 ERRR: Got IO error when sending batch UDP bytes: java.net.ConnectException: Connection refused

It looks like Sparkling Water could not connect to the Spark cluster. After some investigation, I realized I had installed and run sparkling-shell from an edge node on the BDA. When the H2O cluster runs inside a Spark application, the Spark cluster on the BDA communicates over the BDA's private InfiniBand network, and the edge node cannot communicate directly with the IB network on the BDA. With this assumption in mind, I installed and ran Sparkling Water on one of the BDA nodes, and it worked perfectly without any issue. Problem solved!

H2O vs Sparkling Water

People working in Hadoop environments are familiar with many products that make you feel like you're in a zoo: Pig, Hive, Beeswax, and ZooKeeper, for example. As machine learning becomes more popular, products that sound like water have come out, such as H2O and Sparkling Water. I won't rule out the possibility that we will see big data products that sound like wine. Anyway, people like to give their products new names to make them sound cool, even though many of them are similar.

In this blog, I am going to discuss H2O and Sparkling Water at a high level.

H2O
H2O is a fast, open-source machine learning tool for big data analysis. It was launched in Silicon Valley in 2011 by H2O.ai, formerly called 0xdata. The company is led by some of the top data scientists in the world and backed by several mathematics professors at Stanford University who sit on the company's scientific advisory board.

H2O uses in-memory compression and can handle billions of rows in memory. It allows companies to use all of their data, without sampling, to get predictions faster. It includes built-in advanced algorithms such as deep learning, boosting, and bagging ensembles, and it lets organizations build powerful domain-specific predictive engines for recommendations, customer churn, propensity to buy, dynamic pricing, and fraud detection for insurance and credit card companies.

H2O has interfaces to R, Scala, Python, and Java. It can run not only on Hadoop and cloud environments (AWS, Google Cloud, and Azure), but also on Linux, Mac, and Windows. The following shows the H2O architecture.

For your own testing, you can install H2O on your laptop. For big datasets, or to use a Spark cluster that is already installed somewhere, use Sparkling Water, which I discuss later in this blog. Installing H2O on your laptop is super easy. Here are the steps:
1. Download the H2O Zip File
Go to the H2O download page at http://h2o.ai/download and choose Latest Stable Release under the H2O block.

2. Run H2O
Just unzip the file and then run the jar file. It will start a web server on your local machine.

unzip h2o-3.14.0.7.zip
cd h2o-3.14.0.7
java -jar h2o.jar

3. Use H2O Flow UI
Open http://localhost:54321/flow/index.html in your browser and the H2O Flow UI shows up as follows. You can start your data analysis work right away.
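
If you prefer R over the Flow UI for this local test, the same one-node instance can be started and checked from the h2o R package (standard package behaviour, not a step from the walkthrough above):

library(h2o)
h2o.init()                      # starts, or connects to, a local one-node H2O on port 54321
h2o.clusterStatus()             # confirm the node is up and healthy
h2o.shutdown(prompt = FALSE)    # stop the local cluster when you are done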

Sparkling Water
Ok, let's see what Sparkling Water is. In short, Sparkling Water = H2O + Spark. Basically, Sparkling Water combines the fast machine learning algorithms of H2O with the popular in-memory platform Spark to provide a fast and scalable solution for data analytics. Sparkling Water supports Scala, R, and Python and can use the H2O Flow UI to provide a machine learning platform for data scientists and application developers.

Sparkling Water can run on top of Spark in the following ways:

  • Local cluster
  • Standalone cluster
  • Spark cluster in a YARN environment

Sparkling Water is designed as a Spark application. When it executes and you check the Spark UI, it is just a regular Spark application. When it is launched, it first starts the Spark executors. H2O then starts services such as the key-value store and memory manager inside the executors. The following shows the relationship between Sparkling Water, Spark, and H2O.

To share data between Spark and H2O, Sparkling Water uses H2O's H2OFrame. Converting an RDD/DataFrame to an H2OFrame requires data duplication, because data is transferred from RDD storage into the H2OFrame. However, data in an H2OFrame is stored in a compressed format and the original RDD does not need to be preserved.
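
The same conversion is exposed to R through the rsparkling package; here is a minimal sketch, assuming a working sparklyr connection (the mtcars copy and the yarn-client master are just placeholders for illustration):

options(rsparkling.sparklingwater.version = "2.2.2")  # pick the version matching your install
library(rsparkling)
library(sparklyr)
library(h2o)
library(dplyr)

sc <- spark_connect(master = "yarn-client")    # assumes a working sparklyr setup
cars_tbl <- copy_to(sc, mtcars, "mtcars_tbl")  # a small Spark DataFrame to convert

cars_hf   <- as_h2o_frame(sc, cars_tbl)        # Spark DataFrame -> H2OFrame (data is copied)
cars_tbl2 <- as_spark_dataframe(sc, cars_hf)   # H2OFrame -> Spark DataFrame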

A typical use case for Sparkling Water is building a data model. A model is constructed from training data, evaluated with metrics on test data, and its predictions can then be used in the rest of the data pipeline.
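
Continuing the sketch above, the model-building step might look like this (a GBM on the toy mtcars columns, purely for illustration):

splits <- h2o.splitFrame(cars_hf, ratios = 0.8, seed = 42)  # train/test split
train  <- splits[[1]]
test   <- splits[[2]]

model <- h2o.gbm(x = c("cyl", "disp", "hp", "wt"), y = "mpg",
                 training_frame = train)                    # fit a GBM on the training frame
h2o.performance(model, newdata = test)                      # metrics on held-out data
pred <- h2o.predict(model, test)                            # predictions for downstream use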

Installing Sparkling Water takes a few more steps than H2O. As of now, Sparkling Water is at version 2.2.2. The detailed installation steps are at http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/index.html. However, I don't like the installation instructions, for the following reasons:

  • It uses a local Spark cluster as the example. As far as I know, local Spark is not a typical way to use Spark, and the environment variable settings that point to a local Spark cluster are confusing. The easiest first step should be to verify whether spark-shell works. If it does, skip the steps that install a Spark cluster or set SPARK-related environment variables.
  • It gives a simple instruction to run sparkling-shell --conf "spark.executor.memory=1g". This misses a lot of other parameters; without them you will get tons of warning messages.

Ok, here are the steps I believe are correct:
1. Verify Your Spark Environment
Of course, a Spark cluster should be up and running before you even consider installing Sparkling Water. If you install a new Spark cluster without knowing whether it works, it will be a nightmare to trace Sparkling Water and H2O issues that actually come from the Spark cluster. Also make sure to install Spark 2. Sparkling Water and H2O have very strict rules about version compatibility among Spark, Sparkling Water, and H2O. Here is a brief overview of the matrix.

For detailed information, please check out https://github.com/h2oai/rsparkling/blob/master/README.md
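
If you plan to reach the cluster from R with rsparkling, as I do later in this post, the same compatibility rule applies; the Sparkling Water version is pinned with an option that has to be set before the package is loaded:

# Pin the Sparkling Water release that matches Spark 2.2.0 and H2O 3.14.0.7
options(rsparkling.sparklingwater.version = "2.2.2")
library(rsparkling)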

One easy way to verify that the Spark cluster is good is to make sure you can open spark2-shell and run a few simple example programs without issues.

2. Download the Sparkling Water zip file
wget http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.2/2/sparkling-water-2.2.2.zip

3. Unzip the file and run

unzip sparkling-water-2.2.2.zip
cd sparkling-water-2.2.2

bin/sparkling-shell \
--master yarn \
--conf spark.executor.instances=12 \
--conf spark.executor.memory=10g \
--conf spark.driver.memory=8g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.scheduler.minRegisteredResourcesRatio=1

The sample code above creates an H2O cluster based on 12 executors with 10GB of memory each. You also have to add a bunch of parameters like spark.sql.autoBroadcastJoinThreshold, spark.dynamicAllocation.enabled, and many more. Setting spark.dynamicAllocation.enabled to false makes sense: you probably want the H2O cluster to use resources in a predictable way and not suck in all of the cluster memory when processing a large amount of data. Adding these parameters also helps you avoid many annoying warning messages when sparkling-shell starts.

In a kerberized environment, make sure to run kinit before executing sparkling-shell.

4. Create an H2O cluster inside Spark Cluster
Run the following code.

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark) 
import h2oContext._ 

The following is sample output from the run.

[install]$ cd sparkling-water-2.2.2
[sparkling-water-2.2.2]$ kinit wzhou
Password for wzhou@enkitec.com:
[sparkling-water-2.2.2]$ bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=12 \
> --conf spark.executor.memory=10g \
> --conf spark.driver.memory=8g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) :
  H2O build version         : 3.14.0.7 (weierstrass)
  Spark build version       : 2.2.0
  Scala version             : 2.11
----

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/anaconda2/lib/python2.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/05 07:46:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/05 07:46:05 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
17/11/05 07:46:06 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://192.168.10.45:4040
Spark context available as 'sc' (master = yarn, app id = application_3608740912046_1362).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark)
h2oContext: org.apache.spark.h2o.H2OContext =

Sparkling Water Context:
 * H2O name: sparkling-water-wzhou_application_3608740912046_1362
 * cluster size: 12
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (8,enkbda1node07.enkitec.com,54321)
  (10,enkbda1node04.enkitec.com,54321)
  (6,enkbda1node08.enkitec.com,54321)
  (1,enkbda1node18.enkitec.com,54323)
  (9,enkbda1node18.enkitec.com,54321)
  (7,enkbda1node10.enkitec.com,54321)
  (4,enkbda1node17.enkitec.com,54321)
  (12,enkbda1node17.enkitec.com,54323)
  (3,enkbda1node16.enkitec.com,54323)
  (11,enkbda1node16.enkitec.com,54321)
  (2,enkbda1node04.enkitec.com,54323)
  (5,enkbda1node05.enkitec.com,54323)
  ------------------------

  Open H2O Flow in browser: http://192.168.10.45:54325 (CMD + ...
scala> import h2oContext._
import h2oContext._

You can see that the H2O cluster is indeed inside a Spark application.

5. Access H2O cluster
You can access the H2O cluster in many ways. One is the H2O Flow UI, exactly the same UI I showed you before, e.g. http://192.168.10.45:54325.

Another way is to access it from R Studio using h2o.init(your_h2o_host, port). Using the cluster above as the example, here are the init command and other useful commands to check the H2O cluster.

> library(sparklyr)
> library(h2o)
> library(dplyr)
> options(rsparkling.sparklingwater.version = "2.2.2")
> library(rsparkling)
> h2o.init(ip="enkbda1node05.enkitec.com", port=54325)
 Connection successful!

R is connected to the H2O cluster (in client mode): 
    H2O cluster uptime:         1 hours 7 minutes 
    H2O cluster version:        3.14.0.7 
    H2O cluster version age:    16 days  
    H2O cluster name:           sparkling-water-wzhou_application_3608740912046_1362 
    H2O cluster total nodes:    12 
    H2O cluster total memory:   93.30 GB 
    H2O cluster total cores:    384 
    H2O cluster allowed cores:  384 
    H2O cluster healthy:        TRUE 
    H2O Connection ip:          enkbda1node05.enkitec.com 
    H2O Connection port:        54325 
    H2O Connection proxy:       NA 
    H2O Internal Security:      FALSE 
    H2O API Extensions:         Algos, AutoML, Core V3, Core V4 
    R Version:                  R version 3.3.2 (2016-10-31) 

> h2o.clusterIsUp()
[1] TRUE


> h2o.clusterStatus()
Version: 3.14.0.7 
Cluster name: sparkling-water-wzhou_application_3608740912046_1362
Cluster size: 12 
Cluster is locked

                                             h2o healthy
1  enkbda1node04.enkitec.com/192.168.10.44:54321    TRUE
2  enkbda1node04.enkitec.com/192.168.10.44:54323    TRUE
3  enkbda1node05.enkitec.com/192.168.10.45:54323    TRUE
4  enkbda1node07.enkitec.com/192.168.10.47:54321    TRUE
5  enkbda1node08.enkitec.com/192.168.10.48:54321    TRUE
6  enkbda1node10.enkitec.com/192.168.10.50:54321    TRUE
7  enkbda1node16.enkitec.com/192.168.10.56:54321    TRUE
8  enkbda1node16.enkitec.com/192.168.10.56:54323    TRUE
9  enkbda1node17.enkitec.com/192.168.10.57:54321    TRUE
10 enkbda1node17.enkitec.com/192.168.10.57:54323    TRUE
11 enkbda1node18.enkitec.com/192.168.10.58:54321    TRUE
12 enkbda1node18.enkitec.com/192.168.10.58:54323    TRUE
      last_ping num_cpus sys_load mem_value_size   free_mem
1  1.509977e+12       32     1.21              0 8296753152
2  1.509977e+12       32     1.21              0 8615325696
3  1.509977e+12       32     0.63              0 8611375104
4  1.509977e+12       32     1.41              0 8290978816
5  1.509977e+12       32     0.33              0 8372216832
6  1.509977e+12       32     0.09              0 8164569088
7  1.509977e+12       32     0.23              0 8182391808
8  1.509977e+12       32     0.23              0 8579495936
9  1.509977e+12       32     0.15              0 8365195264
10 1.509977e+12       32     0.15              0 8131736576
11 1.509977e+12       32     0.63              0 8291118080
12 1.509977e+12       32     0.63              0 8243695616
     pojo_mem swap_mem    free_disk    max_disk   pid num_keys
1  1247908864        0 395950686208 4.91886e+11  4590        0
2   929336320        0 395950686208 4.91886e+11  4591        0
3   933286912        0 420010262528 4.91886e+11  3930        0
4  1253683200        0  4.25251e+11 4.91886e+11 16370        0
5  1172445184        0 425796304896 4.91886e+11 11998        0
6  1380092928        0 426025943040 4.91886e+11 20374        0
7  1362270208        0  4.21041e+11 4.91886e+11  4987        0
8   965166080        0  4.21041e+11 4.91886e+11  4988        0
9  1179466752        0 422286721024 4.91886e+11  6951        0
10 1412925440        0 422286721024 4.91886e+11  6952        0
11 1253543936        0 425969319936 4.91886e+11 24232        0
12 1300966400        0 425969319936 4.91886e+11 24233        0
   tcps_active open_fds rpcs_active
1            0      453           0
2            0      452           0
3            0      453           0
4            0      453           0
5            0      452           0
6            0      453           0
7            0      452           0
8            0      452           0
9            0      452           0
10           0      453           0
11           0      453           0
12           0      452           0

> h2o.networkTest()
Network Test: Launched from enkbda1node05.enkitec.com/192.168.10.45:54325
                                            destination
1                         all - collective bcast/reduce
2  remote enkbda1node04.enkitec.com/192.168.10.40:54321
3  remote enkbda1node04.enkitec.com/192.168.10.40:54323
4  remote enkbda1node05.enkitec.com/192.168.10.45:54323
5  remote enkbda1node07.enkitec.com/192.168.10.43:54321
6  remote enkbda1node08.enkitec.com/192.168.10.44:54321
7  remote enkbda1node10.enkitec.com/192.168.10.46:54321
8  remote enkbda1node16.enkitec.com/192.168.10.52:54321
9  remote enkbda1node16.enkitec.com/192.168.10.52:54323
10 remote enkbda1node17.enkitec.com/192.168.10.53:54321
11 remote enkbda1node17.enkitec.com/192.168.10.53:54323
12 remote enkbda1node18.enkitec.com/192.168.10.54:54321
13 remote enkbda1node18.enkitec.com/192.168.10.54:54323
                   1_bytes               1024_bytes
1   38.212 msec,  628  B/S     6.790 msec, 3.5 MB/S
2    4.929 msec,  405  B/S       752 usec, 2.6 MB/S
3    7.089 msec,  282  B/S       710 usec, 2.7 MB/S
4    5.687 msec,  351  B/S       634 usec, 3.1 MB/S
5    6.623 msec,  301  B/S       784 usec, 2.5 MB/S
6    6.277 msec,  318  B/S   2.680 msec, 746.0 KB/S
7    6.469 msec,  309  B/S       840 usec, 2.3 MB/S
8    6.595 msec,  303  B/S       801 usec, 2.4 MB/S
9    5.155 msec,  387  B/S       793 usec, 2.5 MB/S
10   5.204 msec,  384  B/S       703 usec, 2.8 MB/S
11   5.511 msec,  362  B/S       782 usec, 2.5 MB/S
12   6.784 msec,  294  B/S       927 usec, 2.1 MB/S
13   6.001 msec,  333  B/S       711 usec, 2.7 MB/S
               1048576_bytes
1   23.800 msec, 1008.4 MB/S
2     6.997 msec, 285.8 MB/S
3     5.576 msec, 358.6 MB/S
4     4.056 msec, 493.0 MB/S
5     5.066 msec, 394.8 MB/S
6     5.272 msec, 379.3 MB/S
7     5.176 msec, 386.3 MB/S
8     6.831 msec, 292.8 MB/S
9     5.772 msec, 346.4 MB/S
10    5.125 msec, 390.2 MB/S
11    5.274 msec, 379.2 MB/S
12    5.065 msec, 394.9 MB/S
13    4.960 msec, 403.2 MB/S

Resolve Sparklyr Not Responding Issue on Port 8880

Recently I was approached by one of my clients to help investigate a weird sparklyr issue. sparklyr is an interface between R and Spark, introduced by RStudio about a year ago. The following is the sparklyr architecture.

When trying to run sc <- spark_connect in RStudio, we got two errors:

  • Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
  • Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream

Here is the detailed message.

    > library(sparklyr)
    > library(dplyr) 
    > sc <- spark_connect(master = "yarn-client", config=spark_config(), version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
     
    Error in force(code) :
    Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
    Path: /opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/lib/spark/bin/spark-submit
    Parameters: --class, sparklyr.Shell, --jars, '/usr/lib64/R/library/sparklyr/java/spark-csv_2.11-1.3.0.jar','/usr/lib64/R/library/sparklyr/java/commons-csv-1.1.jar','/usr/lib64/R/library/sparklyr/java/univocity-parsers-1.5.1.jar', '/usr/lib64/R/library/sparklyr/java/sparklyr-1.6-2.10.jar', 8880, 3859
    Log: /tmp/RtmzpSIMln/file9e23246605df7_spark.log
     
    ....
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
                   at org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:394)
                   at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
                   at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:97)
                   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:114)
                   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
                   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                   ... 5 more
    

I did some research and found many people having similar issues. Ok, let's try their recommendations one by one.

  • Set SPARK_HOME environment
    Tried Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/"). No, not working.

  • Install the latest version of sparklyr
    My client installed sparklyr less than a month ago. I don't see why this option makes sense, so I didn't even pursue this path.

  • Check the Java installation
    R on the same server uses the same version of Java without any issue. I don't see why the Java installation would be a major concern here. Ignore this one.

  • No Hadoop installation
    Someone said that having only a Spark installation, without a Hadoop installation, is not enough. Clearly that does not fit our situation: the server is an edge node and has a Hadoop installation.

  • No valid Kerberos ticket
    Running system2('klist') did show there was no Kerberos ticket. Ok, I then opened a shell within RStudio Server by clicking Tools -> Shell and issued the kinit command.
    Rerunning system2('klist') showed I had a valid Kerberos ticket. Tried again: still not working.
    Note: even though it did not fix the problem, this step is still necessary once the issue is fixed, so run it regardless of the result.

  • Create a different config and pass it to spark_connect
    Someone recommended creating a new config and passing it in. It looked like a good idea. Unfortunately, it just doesn't work.

    wzconfig <- spark_config()
    wzconfig$`sparklyr.shell.deploy-mode` <- "client"
    wzconfig$spark.driver.cores <- 1
    wzconfig$spark.executor.cores <- 2
    wzconfig$spark.executor.memory <- "4G"
    sc <- spark_connect(master = "yarn-client", config=wzconfig, version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
    

    Actually, this recommendation is missing another key parameter. By default, the total number of executors launched is 2. I would usually bump this number up a little to get better performance. You can set the total number of executors as follows.

    wzconfig$spark.executor.instances <- 3
    

    Although this approach looked promising, it still didn't work. But it is definitely worth keeping for other purposes, to better control Spark resource usage.

  • Add remote address
    Someone mentioned setting the remote address. I thought this could be another potential option, as I had resolved Spark issues related to the local IP in the past. So I added the following code to the configuration from the previous example; note that the parameter sparklyr.gateway.address is the hostname of the active Resource Manager.

    wzconfig$sparklyr.gateway.remote <- TRUE
    wzconfig$sparklyr.gateway.address <- "cdhcluster01n03.mycompany.com" 
    

    Not working for this case.

  • Change deployment mode to yarn-cluster
    This is probably the most unrealistic one. If you connect with master = "yarn-cluster", the Spark driver will be somewhere inside the Spark cluster. For our case, I don't believe this is the right solution, so I didn't even try it.

  • Run a Spark example
    Someone recommended running spark-submit to verify that SparkPi works from this environment. That looks reasonable. The good thing is I figured out the issue before executing this one, but it is definitely a valid test to verify spark-submit.

    /opt/cloudera/parcels/SPARK2/lib/spark2/bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_2.11-2.1.0.jar 10
    
  • HA for yarn-cluster
    There is an interesting post, Add support for `yarn-cluster` with high availability #905, discussing how the issue might relate to multiple Resource Managers. We use HA, so the post is interesting, but it probably does not fit our case: with the ClassNotFound message, I don't believe we had even reached the HA part yet.

  • Need to set JAVA_HOME
    Verified it, and we have it. So this is not the issue.

  • My Solution
    After reviewing or trying some of the above solutions, I went back to my own way of thinking. I must say I am not an expert in R or RStudio, with very limited knowledge of how they work internally, but I do have an extensive background in Spark tuning and troubleshooting.

    I know the error message Gateway in port (8880) did not respond is always the first to show up and looks like the cause of the issue. But I thought differently: the second error, NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream, looked more suspicious than the first. Early this year I helped another client with a weird Spark job issue that, in the end, was caused by an incorrect path. It seemed to me that a wrong path could break Spark and, in turn, cause the first error about the port not responding.

    With this idea in mind, I focused on path verification. Run the command Sys.getenv() to get the environment, as follows.

    > Sys.getenv()
    DISPLAY                 :0
    EDITOR                  vi
    GIT_ASKPASS             rpostback-askpass
    HADOOP_CONF_DIR         /etc/hadoop/conf.cloudera.hdfs
    HADOOP_HOME             /opt/cloudera/parcels/CDH
    HOME                    /home/wzhou
    JAVA_HOME               /usr/java/jdk1.8.0_144/jre
    LANG                    en_US.UTF-8
    LD_LIBRARY_PATH         /usr/lib64/R/lib::/lib:/usr/java/jdk1.8.0_92/jre/lib/amd64/server
    LN_S                    ln -s
    LOGNAME                 wzhou
    MAKE                    make
    PAGER                   /usr/bin/less
    PATH                    /usr/local/sbin:/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin
    R_BROWSER               /usr/bin/xdg-open
    R_BZIPCMD               /usr/bin/bzip2
    R_DOC_DIR               /usr/share/doc/R-3.3.2
    R_GZIPCMD               /usr/bin/gzip
    R_HOME                  /usr/lib64/R
    R_INCLUDE_DIR           /usr/include/R
    R_LIBS_SITE             /usr/local/lib/R/site-library:/usr/local/lib/R/library:/usr/lib64/R/library:/usr/share/R/library
    R_LIBS_USER             ~/R/x86_64-redhat-linux-gnu-library/3.3
    R_PAPERSIZE             a4
    R_PDFVIEWER             /usr/bin/xdg-open
    R_PLATFORM              x86_64-redhat-linux-gnu
    R_PRINTCMD              lpr
    R_RD4PDF                times,hyper
    R_SESSION_TMPDIR        /tmp/RtmpZf9YMN
    R_SHARE_DIR             /usr/share/R
    R_SYSTEM_ABI            linux,gcc,gxx,gfortran,?
    R_TEXI2DVICMD           /usr/bin/texi2dvi
    R_UNZIPCMD              /usr/bin/unzip
    R_ZIPCMD                
    RMARKDOWN_MATHJAX_PATH
    						/usr/lib/rstudio-server/resources/mathjax-26
    RS_RPOSTBACK_PATH       /usr/lib/rstudio-server/bin/rpostback
    RSTUDIO                 1
    RSTUDIO_HTTP_REFERER    http://hadoop-edge06.mycompany.com:8787/
    RSTUDIO_PANDOC          /usr/lib/rstudio-server/bin/pandoc
    RSTUDIO_SESSION_STREAM
    						wzhou-d
    RSTUDIO_USER_IDENTITY   wzhou
    RSTUDIO_WINUTILS        bin/winutils
    SED                     /bin/sed
    SPARK_HOME              /opt/cloudera/parcels/SPARK2/lib/spark2
    SSH_ASKPASS             rpostback-askpass
    TAR                     /bin/gtar
    USER                    wzhou
    YARN_CONF_DIR           /etc/hadoop/conf.cloudera.yarn
    

    Ahhh, I noticed the environment was missing the SPARK_DIST_CLASSPATH variable. I then set it using the command below, just before sc <- spark_connect.

    Sys.setenv(SPARK_DIST_CLASSPATH = '/etc/hadoop/con:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*')
    

    Ok, try it again. Fantastic, it works!

    Cause
    Ok, here is the real cause of the issue. It's unnecessary to specify a Java path for sparklyr itself, because it does not require one. However, sparklyr does depend on spark-submit. When spark-submit is executed, it reads the Java path and submits the jar files to Spark accordingly. The problem is that if SPARK_DIST_CLASSPATH is not set, spark-submit does not work and the Spark executors cannot be launched.
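
    To put the fix together in one place, here is a minimal sketch of the working sequence. Deriving the classpath from the hadoop classpath command is my own shortcut and an assumption on my part; the actual fix above pasted the full path list explicitly.

    # Assumption: `hadoop classpath` prints the same jar directories that were
    # pasted into SPARK_DIST_CLASSPATH manually above.
    Sys.setenv(SPARK_DIST_CLASSPATH = paste(system("hadoop classpath", intern = TRUE), collapse = ":"))
    library(sparklyr)
    sc <- spark_connect(master = "yarn-client",
                        config = spark_config(),
                        version = "1.6.0",
                        spark_home = "/opt/cloudera/parcels/CDH/lib/spark/")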

    Other Notes
    The following are some useful commands:

    ls()
    spark_installed_versions()
    sessionInfo()
    spark_home_dir() or spark_home
    path.expand("~")
    Sys.getenv("SPARK_HOME")
    spark_home_dir()
    character(0)
    config <- spark_config()
    spark_install_dir()
    sc
    backend
    monitor
    output_file
    spark_context
    java_context
    hive_context

    master
    method
    app_name
    config
    config$sparklyr.cores.local
    config$spark.sql.shuffle.partitions.local
    config$spark.env.SPARK_LOCAL_IP.local
    config$sparklyr.csv.embedded
    config$`sparklyr.shell.driver-class-path`

    Also, there are a few useful articles about sparklyr and RStudio:
    RStudio’s R Interface to Spark on Amazon EMR
    How to Install RStudio Server on CentOS 7
    Using R with Apache Spark
    sparklyr: a test drive on YARN
    Analyzing a billion NYC taxi trips in Spark