Use Python for H2O

I have written several blogs on H2O topics:
H2O vs Sparkling Water
Sparkling Water Shell: Cloud size under 12 Exception
Access Sparkling Water via R Studio
Running H2O Cluster in Background and at Specific Port Number
Weird Ref-count mismatch Message from H2O

While both the H2O Flow UI and R Studio can access an H2O cluster started by the Sparkling Water Shell, neither offers the data manipulation capabilities that Python does. In this blog, I discuss how to use Python with H2O.

There are two main topics related to Python for H2O. The first is starting an H2O cluster; the second is accessing an existing H2O cluster using Python.

1. Start H2O Cluster using pysparkling
I discussed starting an H2O cluster using sparkling-shell in the blog Sparkling Water Shell: Cloud size under 12 Exception. For Python, use pysparkling instead.

/opt/sparkling-water-2.2.2/bin/pysparkling \
--master yarn \
--conf spark.ext.h2o.cloud.name=WeidongH2O-Cluster \
--conf spark.ext.h2o.client.port.base=26000 \
--conf spark.executor.instances=6 \
--conf spark.executor.memory=12g \
--conf spark.driver.memory=8g \
--conf spark.yarn.executor.memoryOverhead=4g \
--conf spark.yarn.driver.memoryOverhead=4g \
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
--conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.locality.wait=30000 \
--conf spark.yarn.queue=HighPool \
--conf spark.scheduler.minRegisteredResourcesRatio=1
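
As an aside, a long list of --conf flags is easy to mistype. The sketch below keeps the same settings in a Python list, builds the argument list from it, and adds up the executor memory that YARN will be asked for (executor memory plus memory overhead, per executor). The helper names build_command and gigabytes are hypothetical, made up for this illustration; the settings themselves are copied from the command above.

```python
# Settings copied from the pysparkling command above.
PYSPARKLING = "/opt/sparkling-water-2.2.2/bin/pysparkling"
CONF = [
    ("spark.ext.h2o.cloud.name", "WeidongH2O-Cluster"),
    ("spark.ext.h2o.client.port.base", "26000"),
    ("spark.executor.instances", "6"),
    ("spark.executor.memory", "12g"),
    ("spark.driver.memory", "8g"),
    ("spark.yarn.executor.memoryOverhead", "4g"),
    ("spark.yarn.driver.memoryOverhead", "4g"),
    ("spark.scheduler.maxRegisteredResourcesWaitingTime", "1000000"),
    ("spark.ext.h2o.fail.on.unsupported.spark.param", "false"),
    ("spark.dynamicAllocation.enabled", "false"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    ("spark.locality.wait", "30000"),
    ("spark.yarn.queue", "HighPool"),
    ("spark.scheduler.minRegisteredResourcesRatio", "1"),
]


def build_command(binary, master, conf):
    """Return the launcher command as an argument list (hypothetical helper)."""
    args = [binary, "--master", master]
    for key, value in conf:
        args += ["--conf", "{}={}".format(key, value)]
    return args


def gigabytes(setting):
    """Convert a value such as '12g' to an integer number of gigabytes."""
    return int(setting.rstrip("g"))


settings = dict(CONF)
cmd = build_command(PYSPARKLING, "yarn", CONF)

# Each executor container is sized as executor memory plus memory overhead.
per_executor_gb = (gigabytes(settings["spark.executor.memory"])
                   + gigabytes(settings["spark.yarn.executor.memoryOverhead"]))
total_executor_gb = int(settings["spark.executor.instances"]) * per_executor_gb

print(" ".join(cmd))
print("Executor memory requested from YARN: {} GB".format(total_executor_gb))
```

With the values above, that is 6 x (12 + 4) = 96 GB for the executors, which is worth comparing against what the HighPool queue can actually grant. Either way, the command above is what starts the pysparkling shell.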

Then run the following Python commands.

from pysparkling import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import h2o
hc = H2OContext.getOrCreate(sc)

The following is the sample output.

[wzhou@enkbda1node05 sparkling-water-2.2.2]$ /opt/sparkling-water-2.2.2/bin/pysparkling \
> --master yarn \
> --conf spark.ext.h2o.cloud.name=WeidongH2O-Cluster \
> --conf spark.ext.h2o.client.port.base=26000 \
> --conf spark.executor.instances=6 \
> --conf spark.executor.memory=12g \
> --conf spark.driver.memory=8g \
> --conf spark.yarn.executor.memoryOverhead=4g \
> --conf spark.yarn.driver.memoryOverhead=4g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.yarn.queue=HighPool \
> --conf spark.scheduler.minRegisteredResourcesRatio=1
Python 2.7.13 |Anaconda 4.4.0 (64-bit)| (default, Dec 20 2016, 23:09:15)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/anaconda2/lib/python2.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/10 15:00:20 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/12/10 15:00:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/10 15:00:21 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
17/12/10 15:00:24 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
>>> from pysparkling import *
>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext
>>> import h2o
>>> hc = H2OContext.getOrCreate(sc)
/opt/sparkling-water-2.2.2/py/build/dist/h2o_pysparkling_2.2-2.2.2.zip/pysparkling/context.py:111: UserWarning: Method H2OContext.getOrCreate with argument of type SparkContext is deprecated and parameter of type SparkSession is preferred.
17/12/10 15:01:45 WARN h2o.H2OContext: Method H2OContext.getOrCreate with an argument of type SparkContext is deprecated and parameter of type SparkSession is preferred.
Connecting to H2O server at http://192.168.10.41:26000... successful.
--------------------------  -------------------------------
H2O cluster uptime:         12 secs
H2O cluster version:        3.14.0.7
H2O cluster version age:    1 month and 21 days
H2O cluster name:           WeidongH2O-Cluster
H2O cluster total nodes:    6
H2O cluster free memory:    63.35 Gb
H2O cluster total cores:    192
H2O cluster allowed cores:  192
H2O cluster status:         accepting new members, healthy
H2O connection url:         http://192.168.10.41:26000
H2O connection proxy:
H2O internal security:      False
H2O API Extensions:         Algos, AutoML, Core V3, Core V4
Python version:             2.7.13 final
--------------------------  -------------------------------

Sparkling Water Context:
 * H2O name: WeidongH2O-Cluster
 * cluster size: 6
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (3,enkbda1node12.enkitec.com,26000)
  (1,enkbda1node13.enkitec.com,26000)
  (6,enkbda1node10.enkitec.com,26000)
  (5,enkbda1node09.enkitec.com,26000)
  (4,enkbda1node08.enkitec.com,26000)
  (2,enkbda1node11.enkitec.com,26000)
  ------------------------

  Open H2O Flow in browser: http://192.168.10.41:26000 (CMD + click in Mac OSX)


>>>
>>> h2o
<module 'h2o' from '/opt/sparkling-water-2.2.2/py/build/dist/h2o_pysparkling_2.2-2.2.2.zip/h2o/__init__.py'>
>>> h2o.cluster_status()
[WARNING] in <stdin> line 1:
    >>> ????
        ^^^^ Deprecated, use ``h2o.cluster().show_status(True)``.
--------------------------  -------------------------------
H2O cluster uptime:         14 secs
H2O cluster version:        3.14.0.7
H2O cluster version age:    1 month and 21 days
H2O cluster name:           WeidongH2O-Cluster
H2O cluster total nodes:    6
H2O cluster free memory:    63.35 Gb
H2O cluster total cores:    192
H2O cluster allowed cores:  192
H2O cluster status:         locked, healthy
H2O connection url:         http://192.168.10.41:26000
H2O connection proxy:
H2O internal security:      False
H2O API Extensions:         Algos, AutoML, Core V3, Core V4
Python version:             2.7.13 final
--------------------------  -------------------------------
Nodes info:     Node 1                                         Node 2                                         Node 3                                         Node 4                                         Node 5                                         Node 6
--------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------
h2o             enkbda1node08.enkitec.com/192.168.10.44:26000  enkbda1node09.enkitec.com/192.168.10.45:26000  enkbda1node10.enkitec.com/192.168.10.46:26000  enkbda1node11.enkitec.com/192.168.10.47:26000  enkbda1node12.enkitec.com/192.168.10.48:26000  enkbda1node13.enkitec.com/192.168.10.49:26000
healthy         True                                           True                                           True                                           True                                           True                                           True
last_ping       1513108921427                                  1513108920821                                  1513108920602                                  1513108920876                                  1513108920718                                  1513108921149
num_cpus        32                                             32                                             32                                             32                                             32                                             32
sys_load        1.24                                           1.38                                           0.42                                           1.08                                           0.75                                           0.83
mem_value_size  0                                              0                                              0                                              0                                              0                                              0
free_mem        11339923456                                    11337239552                                    11339133952                                    11332368384                                    11331912704                                    11338491904
pojo_mem        113671168                                      116355072                                      114460672                                      121226240                                      121681920                                      115102720
swap_mem        0                                              0                                              0                                              0                                              0                                              0
free_disk       422764871680                                   389836439552                                   422211223552                                   418693251072                                   422237437952                                   422187106304
max_disk        491885953024                                   491885953024                                   491885953024                                   491885953024                                   491885953024                                   491885953024
pid             1172                                           801                                            15879                                          17866                                          28980                                          30818
num_keys        0                                              0                                              0                                              0                                              0                                              0
tcps_active     0                                              0                                              0                                              0                                              0                                              0
open_fds        441                                            441                                            441                                            441                                            441                                            441
rpcs_active     0                                              0                                              0                                              0                                              0                                              0
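
The sample output above contains two deprecation warnings: H2OContext.getOrCreate prefers a SparkSession over a SparkContext, and h2o.cluster_status() points to h2o.cluster().show_status(True). A minimal sketch that uses the recommended calls might look like the following. The function name start_h2o and its two optional arguments are my own additions for illustration; they only exist so the function can be exercised without a cluster.

```python
def start_h2o(spark, h2o_context_cls=None, h2o_module=None):
    """Start (or attach to) H2O using the calls the warnings recommend.

    `spark` is the SparkSession that the pysparkling shell exposes as `spark`.
    Leave the two optional arguments unset when running inside the shell.
    """
    if h2o_context_cls is None:
        from pysparkling import H2OContext as h2o_context_cls
    if h2o_module is None:
        import h2o as h2o_module
    # Pass the SparkSession instead of the SparkContext `sc`.
    hc = h2o_context_cls.getOrCreate(spark)
    # Replacement for the deprecated h2o.cluster_status().
    h2o_module.cluster().show_status(True)
    return hc
```

Inside the pysparkling shell this would be called as hc = start_h2o(spark).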

2. Accessing Existing H2O Cluster from an Edge Node
Accessing an existing H2O cluster from an edge node also requires the pysparkling command. But you don’t have to specify all the parameters used in the step above to start an H2O cluster. Running it without any parameters is fine for client purposes.

First, start pysparkling by running the /opt/sparkling-water-2.2.2/bin/pysparkling command.

Once at the Python prompt, run the following to connect to an existing cluster. The key is to run h2o.init(ip="enkbda1node05.enkitec.com", port=26000). This command connects to the existing H2O cluster.

from pysparkling import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import h2o

h2o.init(ip="enkbda1node05.enkitec.com", port=26000)
h2o.cluster_status()
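
Note that h2o.init() against a remote host prints a warning (visible in the sample output further down) asking whether h2o.connect() was intended. h2o.init() tries to connect and falls back to starting a local server if it cannot, while h2o.connect() only connects. A small sketch using h2o.connect() could look like this; the wrapper name connect_to_cluster and its h2o_module argument are hypothetical and only there to make the function easy to try without a cluster.

```python
def connect_to_cluster(ip, port, h2o_module=None):
    """Connect to an already running H2O cluster without a local fallback."""
    if h2o_module is None:
        import h2o as h2o_module
    # h2o.connect() only attaches to an existing server; unlike h2o.init(),
    # it does not try to launch a local one when the connection fails.
    return h2o_module.connect(ip=ip, port=port)
```

From the pysparkling prompt, the call would be connect_to_cluster("enkbda1node05.enkitec.com", 26000).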

Then run some test code.

print("Importing hdfs data")
stock_data = h2o.import_file("hdfs://ENKBDA1-ns/user/wzhou/work/test/stock_price.txt")
stock_data

print("Spliting data")
train,test = stock_data.split_frame(ratios=[0.9])

h2o.ls()

The following is the sample output.

[wzhou@ham-lnx-vs-0086 ~]$ /opt/sparkling-water-2.2.2/bin/pysparkling
Python 2.7.13 |Anaconda 4.4.0 (64-bit)| (default, Dec 20 2016, 23:09:15)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
>>> from pysparkling import *
>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext
>>> import h2o
>>>
>>> h2o.init(ip="enkbda1node05.enkitec.com", port=26000)
Warning: connecting to remote server but falling back to local... Did you mean to use `h2o.connect()`?
Checking whether there is an H2O instance running at http://enkbda1node05.enkitec.com:26000. connected.
--------------------------  ---------------------------------------
H2O cluster uptime:         5 mins 43 secs
H2O cluster version:        3.14.0.7
H2O cluster version age:    1 month and 21 days
H2O cluster name:           WeidongH2O-Cluster
H2O cluster total nodes:    6
H2O cluster free memory:    58.10 Gb
H2O cluster total cores:    192
H2O cluster allowed cores:  192
H2O cluster status:         locked, healthy
H2O connection url:         http://enkbda1node05.enkitec.com:26000
H2O connection proxy:
H2O internal security:      False
H2O API Extensions:         Algos, AutoML, Core V3, Core V4
Python version:             2.7.13 final
--------------------------  ---------------------------------------
>>> h2o.cluster_status()
[WARNING] in <stdin> line 1:
    >>> ????
        ^^^^ Deprecated, use ``h2o.cluster().show_status(True)``.
--------------------------  ---------------------------------------
H2O cluster uptime:         5 mins 43 secs
H2O cluster version:        3.14.0.7
H2O cluster version age:    1 month and 21 days
H2O cluster name:           WeidongH2O-Cluster
H2O cluster total nodes:    6
H2O cluster free memory:    58.10 Gb
H2O cluster total cores:    192
H2O cluster allowed cores:  192
H2O cluster status:         locked, healthy
H2O connection url:         http://enkbda1node05.enkitec.com:26000
H2O connection proxy:
H2O internal security:      False
H2O API Extensions:         Algos, AutoML, Core V3, Core V4
Python version:             2.7.13 final
--------------------------  ---------------------------------------
Nodes info:     Node 1                                         Node 2                                         Node 3                                         Node 4                                         Node 5                                         Node 6
--------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------  ---------------------------------------------
h2o             enkbda1node08.enkitec.com/192.168.10.44:26000  enkbda1node09.enkitec.com/192.168.10.45:26000  enkbda1node10.enkitec.com/192.168.10.46:26000  enkbda1node11.enkitec.com/192.168.10.47:26000  enkbda1node12.enkitec.com/192.168.10.48:26000  enkbda1node13.enkitec.com/192.168.10.49:26000
healthy         True                                           True                                           True                                           True                                           True                                           True
last_ping       1513109250525                                  1513109250725                                  1513109250400                                  1513109250218                                  1513109250709                                  1513109250536
num_cpus        32                                             32                                             32                                             32                                             32                                             32
sys_load        1.55                                           1.94                                           2.4                                            1.73                                           0.4                                            1.51
mem_value_size  0                                              0                                              0                                              0                                              0                                              0
free_mem        11339923456                                    10122031104                                    10076566528                                    10283636736                                    10280018944                                    10278896640
pojo_mem        113671168                                      1331563520                                     1377028096                                     1169957888                                     1173575680                                     1174697984
swap_mem        0                                              0                                              0                                              0                                              0                                              0
free_disk       422754385920                                   389839585280                                   422231146496                                   418692202496                                   422226952192                                   422176620544
max_disk        491885953024                                   491885953024                                   491885953024                                   491885953024                                   491885953024                                   491885953024
pid             1172                                           801                                            15879                                          17866                                          28980                                          30818
num_keys        0                                              0                                              0                                              0                                              0                                              0
tcps_active     0                                              0                                              0                                              0                                              0                                              0
open_fds        440                                            440                                            440                                            440                                            440                                            440
rpcs_active     0                                              0                                              0                                              0                                              0                                              0
>>> print("Importing hdfs data")
Importing hdfs data
>>> stock_data = h2o.import_file("hdfs://ENKBDA1-ns/user/wzhou/work/test/stock_price.txt")
Parse progress: | 100%

>>>
>>> stock_data
date                   close    volume    open    high     low
-------------------  -------  --------  ------  ------  ------
2016-09-23 00:00:00    24.05     56837   24.13  24.22   23.88
2016-09-22 00:00:00    24.1      56675   23.49  24.18   23.49
2016-09-21 00:00:00    23.38     70925   23.21  23.58   23.025
2016-09-20 00:00:00    23.07     35429   23.17  23.264  22.98
2016-09-19 00:00:00    23.12     34257   23.22  23.27   22.96
2016-09-16 00:00:00    23.16     83309   22.96  23.21   22.96
2016-09-15 00:00:00    23.01     43258   22.7   23.25   22.53
2016-09-14 00:00:00    22.69     33891   22.81  22.88   22.66
2016-09-13 00:00:00    22.81     59871   22.75  22.89   22.53
2016-09-12 00:00:00    22.85    109145   22.9   22.95   22.74

[65 rows x 6 columns]

>>> print("Spliting data")
Spliting data
>>> train,test = stock_data.split_frame(ratios=[0.9])
>>> h2o.ls()
                      key
0  py_1_sid_83d3_splitter
1         stock_price.hex
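
The split above produces frames with generated keys, which are harder to spot from other clients. As a sketch, split_frame also accepts destination_frames and seed, so the pieces can be given readable names and the split made repeatable. The frame names stock_train and stock_test and the seed value are assumptions chosen for this example, not something from the run above.

```python
def split_train_test(frame, train_ratio=0.9, seed=1234):
    """Split an H2OFrame into named train and test frames.

    The destination frame names are hypothetical; pick names that make
    sense for your data.
    """
    train, test = frame.split_frame(
        ratios=[train_ratio],
        destination_frames=["stock_train", "stock_test"],
        seed=seed,
    )
    return train, test
```

It would be used as train, test = split_train_test(stock_data). Keep in mind that split_frame assigns rows probabilistically, so the resulting sizes are close to, but not exactly, the requested ratio.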

Let’s go to R Studio. We should see this H2O frame.

Let’s check it out from H2O Flow UI.

Cool, it’s there as well and we’re good here.

Weird Ref-count mismatch Message from H2O

H2O is a nice, fast tool for data science work. I have discussed it in the following blogs:
H2O vs Sparkling Water
Sparkling Water Shell: Cloud size under 12 Exception
Access Sparkling Water via R Studio
Running H2O Cluster in Background and at Specific Port Number

Recently we ran into a weird issue when using H2O from R Studio. For unknown reasons, it threw the following error messages:

score <- as.data.frame(main[,c('id', 'my_test_score')])

ERROR: Unexpected HTTP Status code: 500 Server Error (url = http://enkbda1node05.enkitec.com:26000/99/Rapids)

java.lang.IllegalStateException
 [1] "java.lang.IllegalStateException: Ref-count mismatch for vec $04ffc52a0000ffffffff$hdfs://ENKBDA1-ns/user/wzhou/test/data/test1/part-00000-8516f9f8-3d5f-164b-cf7e-18ca7e8d467f-d000.snappy.parquet: REFCNT = 2, should be 1"
 [2] "    water.rapids.Session.sanity_check_refs(Session.java:341)"                                                                                                                                                                                                     
 [3] "    water.rapids.Session.exec(Session.java:83)"                                                                                                                                                                                                                   
 [4] "    water.rapids.Rapids.exec(Rapids.java:93)"                                                                                                                                                                                                                     
 [5] "    water.api.RapidsHandler.exec(RapidsHandler.java:38)"                                                                                                                                                                                                          
 [6] "    sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)"                                                                                                                                                                                                 
 [7] "    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)"                                                                                                                                                                        
 [8] "    java.lang.reflect.Method.invoke(Method.java:498)"                                                                                                                                                                                                             
 [9] "    water.api.Handler.handle(Handler.java:63)"                                                                                                                                                                                                                    
[10] "    water.api.RequestServer.serve(RequestServer.java:448)"                                                                                                                                                                                                        
[11] "    water.api.RequestServer.doGeneric(RequestServer.java:297)"                                                                                                                                                                                                    
[12] "    water.api.RequestServer.doPost(RequestServer.java:223)"                                                                                                                                                                                                       
[13] "    javax.servlet.http.HttpServlet.service(HttpServlet.java:707)"                                                                                                                                                                                                 
[14] "    javax.servlet.http.HttpServlet.service(HttpServlet.java:790)"                                                                                                                                                                                                 
[15] "    ai.h2o.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)"                                                                                                                                                                                
[16] "    ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:503)"                                                                                                                                                                            
[17] "    ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)"                                                                                                                                                                    
[18] "    ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:429)"                                                                                                                                                                             
[19] "    ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)"                                                                                                                                                                     
[20] "    ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)"                                                                                                                                                                         
[21] "    ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)"                                                                                                                                                                 
[22] "    ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)"                                                                                                                                                                       
[23] "    water.JettyHTTPD$LoginHandler.handle(JettyHTTPD.java:189)"                                                                                                                                                                                                    
[24] "    ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)"                                                                                                                                                                 
[25] "    ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)"                                                                                                                                                                       
[26] "    ai.h2o.org.eclipse.jetty.server.Server.handle(Server.java:370)"                                                                                                                                                                                               
[27] "    ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)"                                                                                                                                                        
[28] "    ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handleRequest(BlockingHttpConnection.java:53)"                                                                                                                                                         
[29] "    ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:982)"                                                                                                                                                              
[30] "    ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:1043)"                                                                                                                                              
[31] "    ai.h2o.org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:865)"                                                                                                                                                                                      
[32] "    ai.h2o.org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:240)"                                                                                                                                                                                 
[33] "    ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)"                                                                                                                                                                
[34] "    ai.h2o.org.eclipse.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)"                                                                                                                                                          
[35] "    ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)"                                                                                                                                                                      
[36] "    ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)"                                                                                                                                                                       
[37] "    java.lang.Thread.run(Thread.java:745)"                                                                                                                                                                                                                        

Error in .h2o.doSafeREST(h2oRestApiVersion = h2oRestApiVersion, urlSuffix = page,  : 
  
ERROR MESSAGE:

Ref-count mismatch for vec $04ffc52a0000ffffffff$hdfs://ENKBDA1-ns/user/wzhou/test/data/test1/part-00000-8516f9f8-3d5f-164b-cf7e-18ca7e8d467f-d000.snappy.parquet: REFCNT = 2, should be 1

I tried different approaches and still got the same error. I had to bounce the H2O cluster to get rid of it. Further investigation revealed how to reproduce the issue: deleting some or all H2O frames from the H2O UI can trigger it.

Basically, run the getFrames command, select the frames to be deleted, and click Delete selected frames at the bottom. Run getFrames again and it should show those frames as deleted. But if I go to R Studio and run h2o.ls(), it shows the exact Ref-count mismatch error. It seems the frames were deleted from the H2O UI’s perspective, but not from R Studio’s perspective.

Ok, we found out the cause. How do we resolve it? Check out the H2O source code at https://github.com/h2oai/h2o-3/blob/master/h2o-core/src/main/java/water/rapids/Session.java.

We can see that this error is thrown in the sanity_check_refs function. The function also has the following interesting description:

Check that ref counts are in a consistent state.
* This should only be called between calls to Rapids expressions (otherwise may blow false-positives).

It seems something is not handled well in another part of the code. If the object references are inconsistent, some other part of the code might fail. It looks like a bug to me.
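
To make the idea of a ref-count consistency check concrete, here is a purely illustrative toy in Python. It is not H2O's implementation: the function, the dictionaries, and the frame and vec names are all hypothetical, and only the wording of the error message is borrowed from the output above. It shows how a count tracked by a session goes stale when a frame is deleted behind the session's back.

```python
from collections import Counter


def sanity_check_refs(session_refcnts, live_frames):
    """Toy consistency check (hypothetical, not H2O code).

    session_refcnts: dict of vec key -> count the session believes.
    live_frames: dict of frame name -> list of vec keys the frame uses.
    Raises ValueError on the first mismatch found.
    """
    # Recount references from the frames that actually exist right now.
    actual = Counter(vec for vecs in live_frames.values() for vec in vecs)
    for vec in sorted(set(session_refcnts) | set(actual)):
        tracked = session_refcnts.get(vec, 0)
        if tracked != actual[vec]:
            raise ValueError(
                f"Ref-count mismatch for vec {vec}: "
                f"REFCNT = {tracked}, should be {actual[vec]}"
            )


# Two frames share one vec, so the session tracks a count of 2.
frames = {"frame_a": ["vec1"], "frame_b": ["vec1"]}
refcnts = {"vec1": 2}
sanity_check_refs(refcnts, frames)  # consistent, nothing raised

# Simulate a frame being deleted from another client without the
# session's bookkeeping being updated.
del frames["frame_b"]
try:
    sanity_check_refs(refcnts, frames)
    mismatch = None
except ValueError as err:
    mismatch = str(err)
print(mismatch)
```

In this toy, the mismatch only goes away once the tracked counts and the live frames agree again, which is roughly what clearing everything achieves.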

After some investigation, I found that running h2o.removeAll() from R Studio fixes this issue. Note that this removes every object in the H2O cluster, not just the affected frames.

Create Cloudera Hadoop Cluster Using Cloudera Director on Google Cloud

Several years ago I wrote a blog about how to install a Cloudera Hadoop cluster. It took at least half a day to complete the installation in my VM cluster. In my last post, I discussed an approach to deploy a Hadoop cluster using Dataproc on Google Cloud Platform. It literally took less than two minutes to create a Hadoop cluster. Although it is good to have a cluster launched in a very short time, it does not have a nice UI like Cloudera Manager, as the Hadoop distribution used by Dataproc is not CDH. I could repeat the steps in my blogs to build a Hadoop cluster using VM instances on Google Cloud Platform, but that would take some time and involve a lot of work. Actually there is another way to create a Hadoop cluster on the cloud. Cloudera has a product called Cloudera Director. It currently supports not only Google Cloud, but also AWS and Azure. It is designed to make it faster to deploy a CDH cluster and easier to scale the cluster on the cloud. Another important feature is that Cloudera Director allows you to move your deployment scripts or steps easily from one cloud provider to another, so you don’t have to be locked in to one cloud vendor. In this blog, I will show you how to create a CDH cluster using Cloudera Director.

The first step is to start my Cloudera Director instance. In my case, I have already installed Cloudera Director based on the instructions from Cloudera. It is a pretty straightforward process and I am not going to repeat it here. The Cloudera Director instance is where you launch your CDH cluster deployment.

Both the Cloudera Director and Cloudera Manager UIs are browser-based, so you have to set up a secure connection between your local machine and the VM instances on the cloud. To achieve this, configure a SOCKS proxy on your local machine that is used to connect to the Cloudera Director VM. It provides a secure way to connect to your VM on the cloud and lets you use the VM’s internal IP and hostname in the web browser. Google has a nice note about the steps, Securely Connecting to VM Instances. Following this note will help you set up the SOCKS proxy.

Ok, here are the steps.
Logon to Cloudera Director
Open a terminal session locally, and run the following code:

gcloud compute ssh cdh-director-1 \
    --project cdh-director-173715 \
    --zone us-central1-c \
    --ssh-flag="-D" \
    --ssh-flag="1080" \
    --ssh-flag="-N"    

cdh-director-1 is the name of my Cloudera Director instance on Google Cloud and cdh-director-173715 is my Google Cloud project ID. After executing the above command, it appears to hang and never completes. This is CORRECT behavior. Do not kill or exit this session. Open a browser configured to use the SOCKS proxy and type in the internal IP of the Cloudera Director instance with port number 7189. For my cdh-director-1 instance, the internal IP is 10.128.0.2.

After you enter the URL http://10.128.0.2:7189 for Cloudera Director, the login screen shows up. Log in as the admin user.
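
If you open this tunnel often, the command can be assembled from a small helper. The following is a minimal Python sketch, not an official tool: the function names socks_tunnel_command and director_url are made up for illustration, and the values are the ones from this post. The ssh flag -D opens a dynamic (SOCKS) port forward on the given local port and -N tells ssh not to run a remote command, which is why the session looks like it hangs.

```python
import shlex


def socks_tunnel_command(instance, project, zone, local_port=1080):
    """Build the argument list for the gcloud SSH tunnel shown above.

    Hypothetical helper; it only assembles the same flags as the
    command in this post and does not run anything.
    """
    return [
        "gcloud", "compute", "ssh", instance,
        "--project", project,
        "--zone", zone,
        "--ssh-flag=-D",               # dynamic (SOCKS) port forwarding
        f"--ssh-flag={local_port}",    # local port the proxy listens on
        "--ssh-flag=-N",               # no remote command, tunnel only
    ]


def director_url(internal_ip, port=7189):
    """URL of the Cloudera Director UI as reached through the proxy."""
    return f"http://{internal_ip}:{port}"


cmd = socks_tunnel_command("cdh-director-1", "cdh-director-173715", "us-central1-c")
print(" ".join(shlex.quote(arg) for arg in cmd))
print(director_url("10.128.0.2"))
```

Passing cmd to subprocess.run would block for as long as the tunnel is open, the same way the terminal session does.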

Deployment
After login, the initial setup wizard shows up. Click Let’s get started.

In the Add Environment screen, input the information as follows. The Client ID JSON Key is the file you create during the initial setup of your Google project, along with the SSH key.

In the next Add Cloudera Manager screen, I usually create the instance templates first. Click the Instance Template drop-down, then select Create a new instance template. I need at least three templates: one for Cloudera Manager, one for Master nodes, and one for Worker nodes. To save resources in my Google Cloud environment, I did not create a template for Edge nodes. Here are the configurations for all three templates.

Cloudera Manager Template

Master Node Template

Worker Node Template

Input the following for Cloudera Manager. For my test, I use the Embedded Database. For production use, you need to set up an external database first and register it here.

After you click Continue, the Add Cluster screen shows up. There is a gateway instance group, which I removed by clicking Delete Group because I don’t have an edge node here. Input the corresponding template and number of instances for masters and workers.

After you click Continue, the deployment starts.

After about 20 minutes, it completes. Click Continue.

Review Cluster
The nice Cloudera Director dashboard shows up.

You can also log in to Cloudera Manager from the link on Cloudera Director.

Nice and easy. Excellent product from Cloudera. For more information about deploying a CDH cluster on Google Cloud, you can also check out Cloudera’s documentation, Getting Started on Google Cloud Platform.

Resolve Sparklyr not Respond Issue on Port 8880

Recently I was approached by one of my clients to help them investigate a weird sparklyr issue. sparklyr is an interface between R and Spark introduced by RStudio about a year ago. The following is the sparklyr architecture.

When trying to do sc <- spark_connect in RStudio, we got two errors as follows:

  • Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
  • Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream

Here is the detailed message.

    > library(sparklyr)
    > library(dplyr) 
    > sc <- spark_connect(master = "yarn-client", config=spark_config(), version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
     
    Error in force(code) :
    Failed while connecting to sparklyr to port (8880) for sessionid (3859): Gateway in port (8880) did not respond.
    Path: /opt/cloudera/parcels/CDH-5.10.1-1.cdh5.10.1.p0.10/lib/spark/bin/spark-submit
    Parameters: --class, sparklyr.Shell, --jars, '/usr/lib64/R/library/sparklyr/java/spark-csv_2.11-1.3.0.jar','/usr/lib64/R/library/sparklyr/java/commons-csv-1.1.jar','/usr/lib64/R/library/sparklyr/java/univocity-parsers-1.5.1.jar', '/usr/lib64/R/library/sparklyr/java/sparklyr-1.6-2.10.jar', 8880, 3859
    Log: /tmp/RtmzpSIMln/file9e23246605df7_spark.log
     
    ....
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
                   at org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:394)
                   at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
                   at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:97)
                   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:114)
                   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
                   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
                   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                   ... 5 more
    

    I did some research and found many people having a similar issue. Ok, let me go through their recommendations one by one as follows.

  • Set SPARK_HOME environment
  • Tried running Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/"). No, not working.

  • Install the latest version of sparklyr
  • My client installed sparklyr less than one month ago. I don’t see why this option would help, so I did not pursue this path.

  • Check Java Installation
  • R on the same server uses the same version of Java without any issue. I don’t see why the Java installation would be a major concern here. Ignore this one.

  • No Hadoop Installation
  • Someone said a Spark installation alone is not enough and you need a Hadoop installation as well. Clearly this does not fit our situation. The server is an edge node and has a Hadoop installation.

  • Do not have a valid Kerberos ticket
  • Running system2('klist') does show no Kerberos ticket. Ok, I then opened a shell within RStudio Server by clicking Tools -> Shell and issued the kinit command.
    Rerunning system2('klist') shows I have a valid Kerberos ticket. Tried again. Still not working.
    Note: even though it did not fix the problem, this step is necessary once the real issue is fixed. So you still need to run it no matter what the result is.

  • Create a different configuration and pass it to spark_connect
  • Someone recommended creating a new configuration and passing it in. It looks like a good idea. Unfortunately, it just doesn’t work.

    wzconfig <- spark_config()
    wzconfig$`sparklyr.shell.deploy-mode` <- "client"
    wzconfig$spark.driver.cores <- 1
    wzconfig$spark.executor.cores <- 2
    wzconfig$spark.executor.memory <- "4G"
    sc <- spark_connect(master = "yarn-client", config=wzconfig, version="1.6.0", spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
    

    Actually this recommendation is missing another key parameter. By default the total number of executors launched is 2. I usually bump this number up a little to get better performance. You can set the total number of executors as follows.

    wzconfig$spark.executor.instances <- 3
    

    Although this approach looks promising, it still did not work. But it is definitely useful for other purposes, to better control Spark resource usage.

  • Add remote address
  • Someone mentioned setting the remote address. I thought this could be another potential option, as I have resolved Spark issues related to the local IP in the past. So I added the following code to the configuration from the previous example. Note that the parameter sparklyr.gateway.address is the hostname of the active Resource Manager.

    wzconfig$sparklyr.gateway.remote <- TRUE
    wzconfig$sparklyr.gateway.address <- "cdhcluster01n03.mycompany.com" 
    

    Not working for this case.

  • Change deployment mode to yarn-cluster
  • This is probably the most unrealistic one. If you connect with master = "yarn-cluster", the Spark driver will be somewhere inside the Spark cluster. For our current case, I don’t believe this is the right solution. I did not even try it.

  • Run Spark example
  • Someone recommended running spark-submit to verify that SparkPi can be run from the environment. This looks reasonable. As it happened, I figured out the issue before executing this one, but it is definitely a valid and good test to verify spark-submit.

    /opt/cloudera/parcels/SPARK2/lib/spark2/bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn /opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_2.11-2.1.0.jar 10
    
  • HA for yarn-cluster
  • There is an interesting post, Add support for `yarn-cluster` with high availability #905, discussing how the issue might relate to multiple resource managers. We use HA, so this post is an interesting one. But it might not fit our case, because with the Class Not Found message I feel we have not even reached the HA part yet.

  • Need to set JAVA_HOME
  • Verified it and we have it. So this is not the issue.

  • My Solution
  • After reviewing or trying out some of the above solutions, I decided to go back to my own way of thinking. I must say I am not an expert in R or RStudio and have very limited knowledge about how they work. But I do have an extensive background in Spark tuning and troubleshooting.

    I know the error message Gateway in port (8880) did not respond is always the first message that shows up and looks like the cause of the issue. But I thought differently. I believed the second error, NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream, looked more suspicious than the first one. Earlier this year I helped another client with a weird Spark job issue which, in the end, was caused by an incorrect path. It seemed to me that the path might not be right, causing the Spark issue, which in turn caused the first error of the port not responding.

    With this idea in mind, I focused more on path verification. Run the command Sys.getenv() to get the environment as follows.

    > Sys.getenv()
    DISPLAY                 :0
    EDITOR                  vi
    GIT_ASKPASS             rpostback-askpass
    HADOOP_CONF_DIR         /etc/hadoop/conf.cloudera.hdfs
    HADOOP_HOME             /opt/cloudera/parcels/CDH
    HOME                    /home/wzhou
    JAVA_HOME               /usr/java/jdk1.8.0_144/jre
    LANG                    en_US.UTF-8
    LD_LIBRARY_PATH         /usr/lib64/R/lib::/lib:/usr/java/jdk1.8.0_92/jre/lib/amd64/server
    LN_S                    ln -s
    LOGNAME                 wzhou
    MAKE                    make
    PAGER                   /usr/bin/less
    PATH                    /usr/local/sbin:/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin
    R_BROWSER               /usr/bin/xdg-open
    R_BZIPCMD               /usr/bin/bzip2
    R_DOC_DIR               /usr/share/doc/R-3.3.2
    R_GZIPCMD               /usr/bin/gzip
    R_HOME                  /usr/lib64/R
    R_INCLUDE_DIR           /usr/include/R
    R_LIBS_SITE             /usr/local/lib/R/site-library:/usr/local/lib/R/library:/usr/lib64/R/library:/usr/share/R/library
    R_LIBS_USER             ~/R/x86_64-redhat-linux-gnu-library/3.3
    R_PAPERSIZE             a4
    R_PDFVIEWER             /usr/bin/xdg-open
    R_PLATFORM              x86_64-redhat-linux-gnu
    R_PRINTCMD              lpr
    R_RD4PDF                times,hyper
    R_SESSION_TMPDIR        /tmp/RtmpZf9YMN
    R_SHARE_DIR             /usr/share/R
    R_SYSTEM_ABI            linux,gcc,gxx,gfortran,?
    R_TEXI2DVICMD           /usr/bin/texi2dvi
    R_UNZIPCMD              /usr/bin/unzip
    R_ZIPCMD                
    RMARKDOWN_MATHJAX_PATH
    						/usr/lib/rstudio-server/resources/mathjax-26
    RS_RPOSTBACK_PATH       /usr/lib/rstudio-server/bin/rpostback
    RSTUDIO                 1
    RSTUDIO_HTTP_REFERER    http://hadoop-edge06.mycompany.com:8787/
    RSTUDIO_PANDOC          /usr/lib/rstudio-server/bin/pandoc
    RSTUDIO_SESSION_STREAM
    						wzhou-d
    RSTUDIO_USER_IDENTITY   wzhou
    RSTUDIO_WINUTILS        bin/winutils
    SED                     /bin/sed
    SPARK_HOME              /opt/cloudera/parcels/SPARK2/lib/spark2
    SSH_ASKPASS             rpostback-askpass
    TAR                     /bin/gtar
    USER                    wzhou
    YARN_CONF_DIR           /etc/hadoop/conf.cloudera.yarn
    

    Ahhh, I noticed the environment was missing the SPARK_DIST_CLASSPATH environment variable. So I set it using the command below just before sc <- spark_connect.

    Sys.setenv(SPARK_DIST_CLASSPATH = '/etc/hadoop/con:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*')
    

    Ok, try it again. Fantastic, it works!

    Cause
    Ok, here is the real cause of the issue. It’s unnecessary to specify a Java path for sparklyr, as it does not require one. However, it does depend on spark-submit. When spark-submit is executed, it reads the Java path and then submits the jar files to Spark accordingly. If SPARK_DIST_CLASSPATH is not set, spark-submit cannot find the Hadoop classes (hence the NoClassDefFoundError), so it fails and the Spark executors cannot be launched.
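
A quick way to catch this kind of problem earlier is to check the environment for the variables you expect before connecting. The following is a minimal Python sketch of that check. The helper name missing_spark_env and the list of required variables are my own assumptions for illustration, not something sparklyr or Spark defines. The sample values are taken from the Sys.getenv() output above.

```python
import os

# Assumed checklist for this kind of setup; adjust it to your cluster.
REQUIRED_VARS = (
    "SPARK_HOME",
    "HADOOP_CONF_DIR",
    "JAVA_HOME",
    "SPARK_DIST_CLASSPATH",
)


def missing_spark_env(env, required=REQUIRED_VARS):
    """Return the required variable names that are absent or empty in env."""
    return [name for name in required if not env.get(name)]


# A cut-down copy of the environment shown above, without SPARK_DIST_CLASSPATH.
sample_env = {
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2/lib/spark2",
    "HADOOP_CONF_DIR": "/etc/hadoop/conf.cloudera.hdfs",
    "JAVA_HOME": "/usr/java/jdk1.8.0_144/jre",
}
print(missing_spark_env(sample_env))

# The same check against the environment of the current process.
print(missing_spark_env(dict(os.environ)))
```

On a node that has the hadoop command available, the output of hadoop classpath is the usual source for the SPARK_DIST_CLASSPATH value.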

    Other Note
    The following are some useful commands:

    ls()
    spark_installed_versions()
    sessionInfo()
    spark_home_dir() or spark_home
    path.expand("~")
    Sys.getenv("SPARK_HOME")
    spark_home_dir()
    character(0)
    config <- spark_config()
    spark_install_dir()
    sc
    backend
    monitor
    output_file
    spark_context
    java_context
    hive_context

    master
    method
    app_name
    config
    config$sparklyr.cores.local
    config$spark.sql.shuffle.partitions.local
    config$spark.env.SPARK_LOCAL_IP.local
    config$sparklyr.csv.embedded
    config$`sparklyr.shell.driver-class-path`

    Also there are a few useful articles about sparklyr and RStudio:
    RStudio’s R Interface to Spark on Amazon EMR
    How to Install RStudio Server on CentOS 7
    Using R with Apache Spark
    sparklyr: a test drive on YARN
    Analyzing a billion NYC taxi trips in Spark

    Create Hadoop Cluster on Google Cloud Platform

    There are many ways to create Hadoop clusters and I am going to show a few of them on Google Cloud Platform (GCP). The first approach is the standard way to build a Hadoop cluster, no matter whether you do it on the cloud or on-premises. Basically, create a group of VM instances and manually install the Hadoop cluster on them. Many people have blogs or articles about this approach and I am not going to repeat the steps here.

    In this blog, I am going to discuss the approach using Google Cloud Dataproc, with which you can actually have a Hadoop cluster up and running within 2 minutes. Google Cloud Dataproc is a fully managed cloud service for running Apache Hadoop clusters in a simple and fast way. The following shows the steps to create a Hadoop cluster and submit a Spark job to the cluster.

    Create a Hadoop Cluster
    Click Dataproc -> Clusters

    Then click Enable API

    Cloud Dataproc screen shows up. Click Create cluster

    Input the following parameters:
    Name : cluster-test1
    Region : Choose us-central1
    Zone : Choose us-central1-c

    1. Master Node
    Machine Type : The default is n1-standard-4, but I choose n1-standard-1 just for simple testing purpose.
    Cluster Mode : There are 3 modes here: Single Node (1 master, 0 workers), Standard (1 master, N workers), and High Availability (3 masters, N workers). Choose Standard.
    Primary disk size : For my testing, 10 GB is enough.

    2. Worker Nodes
    The configuration is similar to the Master node. I use 3 worker nodes and the disk size is 15 GB. You might notice that there is an option to use local SSD storage. You can attach up to 8 local SSD devices to the VM instance. Each disk is 375 GB in size and you cannot specify a 10 GB disk size here. The local SSDs are physically attached to the host server and offer higher performance and lower latency than Google’s persistent disk storage. The local SSDs are used for temporary data such as shuffle data in MapReduce. The data on the local SSD storage is not persistent. For more information, please visit https://cloud.google.com/compute/docs/disks/local-ssd.

    Another thing to mention is that Dataproc uses a Cloud Storage bucket instead of HDFS for the Hadoop cluster. Although the hadoop command still works and you won’t feel anything different, the underlying storage is different. In my opinion, it is actually better because a Google Cloud Storage bucket definitely has much better reliability and scalability than HDFS.

    Click Create when everything is done. After a few minutes, the cluster is created.

    Click cluster-test1 and it should show the cluster information.

    If you click the VM Instances tab, you can see there is one master and 3 worker instances.

    Click the Configuration tab. It shows all the configuration information.

    Submit a Spark Job
    Click Cloud Dataproc -> Jobs.

    Once the Submit Job screen shows up, input the following information, then click Submit.

    After the job completes, you should see the following:

    To verify the result, I need to ssh to the master node to find out which ports are listening for connections. Click the drop down on the right of SSH of master node, then click Open in browser window.

    Then run the netstat command.

    cluster-test1-m:~$ netstat -a |grep LISTEN |grep tcp
    tcp        0      0 *:10033                 *:*                     LISTEN     
    tcp        0      0 *:10002                 *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c.:8020 *:*                     LISTEN     
    tcp        0      0 *:33044                 *:*                     LISTEN     
    tcp        0      0 *:ssh                   *:*                     LISTEN     
    tcp        0      0 *:52888                 *:*                     LISTEN     
    tcp        0      0 *:58266                 *:*                     LISTEN     
    tcp        0      0 *:35738                 *:*                     LISTEN     
    tcp        0      0 *:9083                  *:*                     LISTEN     
    tcp        0      0 *:34238                 *:*                     LISTEN     
    tcp        0      0 *:nfs                   *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c:10020 *:*                     LISTEN     
    tcp        0      0 localhost:mysql         *:*                     LISTEN     
    tcp        0      0 *:9868                  *:*                     LISTEN     
    tcp        0      0 *:9870                  *:*                     LISTEN     
    tcp        0      0 *:sunrpc                *:*                     LISTEN     
    tcp        0      0 *:webmin                *:*                     LISTEN     
    tcp        0      0 cluster-test1-m.c:19888 *:*                     LISTEN     
    tcp6       0      0 [::]:10001              [::]:*                  LISTEN     
    tcp6       0      0 [::]:44884              [::]:*                  LISTEN     
    tcp6       0      0 [::]:50965              [::]:*                  LISTEN     
    tcp6       0      0 [::]:ssh                [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m:omniorb [::]:*                  LISTEN     
    tcp6       0      0 [::]:46745              [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8030 [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8031 [::]:*                  LISTEN     
    tcp6       0      0 [::]:18080              [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8032 [::]:*                  LISTEN     
    tcp6       0      0 cluster-test1-m.c.:8033 [::]:*                  LISTEN     
    tcp6       0      0 [::]:nfs                [::]:*                  LISTEN     
    tcp6       0      0 [::]:33615              [::]:*                  LISTEN     
    tcp6       0      0 [::]:56911              [::]:*                  LISTEN     
    tcp6       0      0 [::]:sunrpc             [::]:*                  LISTEN  
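
When the list gets this long, it can help to pull out just the port column. The following is a minimal Python sketch that parses netstat output of the shape shown above. The function name listening_ports is hypothetical, and the sample lines are copied from the output in this post. It assumes the six-column layout (protocol, Recv-Q, Send-Q, local address, foreign address, state) and returns whatever follows the last colon of the local address.

```python
def listening_ports(netstat_output):
    """Extract the local port (or service name) from each LISTEN line.

    Hypothetical helper; assumes the six-column netstat layout above.
    """
    ports = []
    for line in netstat_output.splitlines():
        fields = line.split()
        # Skip headers, blank lines, and sockets in any other state.
        if len(fields) < 6 or fields[5] != "LISTEN":
            continue
        local_address = fields[3]
        # The port is whatever follows the last colon, so IPv6
        # addresses such as [::]:18080 are handled as well.
        ports.append(local_address.rsplit(":", 1)[-1])
    return ports


sample = """\
tcp        0      0 *:9870                  *:*                     LISTEN
tcp        0      0 cluster-test1-m.c.:8020 *:*                     LISTEN
tcp6       0      0 [::]:18080              [::]:*                  LISTEN
tcp6       0      0 [::]:ssh                [::]:*                  LISTEN
"""
print(listening_ports(sample))
```

Entries such as ssh or nfs are service names that netstat resolves from the port number. Adding the -n flag to netstat prints numeric ports instead, which makes the result easier to compare against known ports.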
    

    Check out directories.

    cluster-test1-m:~$ hdfs dfs -ls /
    17/09/12 12:12:24 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.1-hadoop2
    Found 2 items
    drwxrwxrwt   - mapred hadoop          0 2017-09-12 11:56 /tmp
    drwxrwxrwt   - hdfs   hadoop          0 2017-09-12 11:55 /user
    

    There are a few UI screens available to check out the Hadoop cluster and job status.
    HDFS NameNode (port 9870)

    YARN Resource Manager (port 8088)

    Spark Job History (port 18080)

    The Dataproc approach is an easy way to create a Hadoop cluster. Although it is powerful, I miss a nice UI like Cloudera Manager. To install a Cloudera CDH cluster, I need to use a different approach, which I am going to discuss in a future blog.