Parquet Files Cannot Be Read in Sparkling Water H2O

Over the past few months, I have written several blog posts on H2O topics:
Use Python for H2O
H2O vs Sparkling Water
Sparking Water Shell: Cloud size under 12 Exception
Access Sparkling Water via R Studio
Running H2O Cluster in Background and at Specific Port Number
Weird Ref-count mismatch Message from H2O

Sparkling Water and H2O perform very well for data science projects. But when something works beautifully in one environment and fails in another, it can be the most annoying thing in the world. This is exactly what happened at one of my clients.

They used to run Sparkling Water and H2O in an Oracle BDA environment, and it worked great. However, even a full-rack BDA (18 nodes) was not enough to run big datasets on H2O. Recently they moved to a much bigger CDH cluster (a non-BDA environment) with CDH 5.13 installed. Sparkling Water still worked, but there was one major issue: parquet files could not be read correctly. There was no issue reading the same parquet files from the Spark shell or pyspark. This was really annoying, as parquet is one of the data formats the client uses heavily. After many failed attempts and a lot of investigation, I was finally able to figure out the issue and implement a workaround. This blog discusses the parquet reading issue and the workaround in Sparkling Water.

Create Test Data Set
I did the test in my own CDH cluster (CDH 5.13). I first created a small test data set, stock.csv, and uploaded it to the /user/root directory on HDFS.

date,close,volume,open,high,low
9/23/16,24.05,56837,24.13,24.22,23.88
9/22/16,24.1,56675,23.49,24.18,23.49
9/21/16,23.38,70925,23.21,23.58,23.025
9/20/16,23.07,35429,23.17,23.264,22.98
9/19/16,23.12,34257,23.22,23.27,22.96
9/16/16,23.16,83309,22.96,23.21,22.96
9/15/16,23.01,43258,22.7,23.25,22.53
9/14/16,22.69,33891,22.81,22.88,22.66
9/13/16,22.81,59871,22.75,22.89,22.53
9/12/16,22.85,109145,22.9,22.95,22.74
9/9/16,23.03,115901,23.53,23.53,23.02
9/8/16,23.6,32717,23.8,23.83,23.55
9/7/16,23.85,143635,23.69,23.89,23.69
9/6/16,23.68,43577,23.78,23.79,23.43
9/2/16,23.84,31333,23.45,23.93,23.41
9/1/16,23.42,49547,23.45,23.48,23.26

Create a Parquet File
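Run the following in spark2-shell to create a parquet file and make sure that it can be read back. Note that I did not set the CSV header option, so the header line is loaded as an ordinary row and the columns get the default names _c0 to _c5. That is fine for this test.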

scala> val myTest=spark.read.format("csv").load("/user/root/stock.csv")
myTest: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 4 more fields]
 
scala> myTest.show(3)
+-------+-----+------+-----+-----+-----+
|    _c0|  _c1|   _c2|  _c3|  _c4|  _c5|
+-------+-----+------+-----+-----+-----+
|   date|close|volume| open| high|  low|
|9/23/16|24.05| 56837|24.13|24.22|23.88|
|9/22/16| 24.1| 56675|23.49|24.18|23.49|
+-------+-----+------+-----+-----+-----+
only showing top 3 rows
 
scala> myTest.write.format("parquet").save("/user/root/mytest.parquet")
 
scala> val readTest = spark.read.format("parquet").load("/user/root/mytest.parquet")
readTest: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 4 more fields]
 
scala> readTest.show(3)
+-------+-----+------+-----+-----+-----+
|    _c0|  _c1|   _c2|  _c3|  _c4|  _c5|
+-------+-----+------+-----+-----+-----+
|   date|close|volume| open| high|  low|
|9/23/16|24.05| 56837|24.13|24.22|23.88|
|9/22/16| 24.1| 56675|23.49|24.18|23.49|
+-------+-----+------+-----+-----+-----+
only showing top 3 rows

Start a Sparkling Water H2O Cluster
I started a Sparkling Water cluster with 2 nodes.

[root@a84-master--2df67700-f9d1-46f3-afcf-ba27a523e143 sparkling-water-2.2.7]# . /etc/spark2/conf.cloudera.spark2_on_yarn/spark-env.sh
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
[root@a84-master--2df67700-f9d1-46f3-afcf-ba27a523e143 sparkling-water-2.2.7]# bin/sparkling-shell \
> --master yarn \
> --conf spark.executor.instances=2 \
> --conf spark.executor.memory=1g \
> --conf spark.driver.memory=1g \
> --conf spark.scheduler.maxRegisteredResourcesWaitingTime=1000000 \
> --conf spark.ext.h2o.fail.on.unsupported.spark.param=false \
> --conf spark.dynamicAllocation.enabled=false \
> --conf spark.sql.autoBroadcastJoinThreshold=-1 \
> --conf spark.locality.wait=30000 \
> --conf spark.scheduler.minRegisteredResourcesRatio=1

-----
  Spark master (MASTER)     : yarn
  Spark home   (SPARK_HOME) : /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
  H2O build version         : 3.16.0.4 (wheeler)
  Spark build version       : 2.2.1
  Scala version             : 2.11
----

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.128.0.5:4040
Spark context available as 'sc' (master = yarn, app id = application_1518097883047_0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._

scala> val h2oContext = H2OContext.getOrCreate(spark) 
h2oContext: org.apache.spark.h2o.H2OContext =                                   

Sparkling Water Context:
 * H2O name: sparkling-water-root_application_1518097883047_0001
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (1,a84-worker--6e693a0a-d92c-4172-81b7-f2f07b6d5d7c.c.cdh-director-194318.internal,54321)
  (2,a84-worker--c0ecccae-cead-44f2-9f75-39aadb1d024a.c.cdh-director-194318.internal,54321)
  ------------------------

  Open H2O Flow in browser: http://10.128.0.5:54321 (CMD + click in Mac OSX)

scala> import h2oContext._
import h2oContext._
scala> 

Read Parquet File from H2O Flow UI
Open H2O Flow UI and read the same parquet file.

After clicking Parse these files, I got a corrupted file.

Obviously, the parquet file was not read correctly. At this point, there were no error messages in the H2O console. When I continued to import the file, the H2O Flow UI threw an error, and the H2O console showed the following:

scala> 18/02/08 09:23:59 WARN servlet.ServletHandler: Error for /3/Parse
java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/api/ReadSupport
	at water.parser.parquet.ParquetParser.correctTypeConversions(ParquetParser.java:104)
	at water.parser.parquet.ParquetParserProvider.createParserSetup(ParquetParserProvider.java:48)
	at water.parser.ParseSetup.getFinalSetup(ParseSetup.java:213)
	at water.parser.ParseDataset.forkParseDataset(ParseDataset.java:119)
	at water.parser.ParseDataset.parse(ParseDataset.java:43)
	at water.api.ParseHandler.parse(ParseHandler.java:36)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at water.api.Handler.handle(Handler.java:63)
	at water.api.RequestServer.serve(RequestServer.java:451)
	at water.api.RequestServer.doGeneric(RequestServer.java:296)
	at water.api.RequestServer.doPost(RequestServer.java:222)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:503)
	at ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
	at ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:429)
	at ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
	at ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
	at water.JettyHTTPD$LoginHandler.handle(JettyHTTPD.java:192)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)
	at ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
	at ai.h2o.org.eclipse.jetty.server.Server.handle(Server.java:370)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
	at ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handleRequest(BlockingHttpConnection.java:53)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:982)
	at ai.h2o.org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:1043)
	at ai.h2o.org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:865)
	at ai.h2o.org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:240)
	at ai.h2o.org.eclipse.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
	at ai.h2o.org.eclipse.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
	at ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
	at ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.api.ReadSupport
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 39 more

Solution
As BDA had no issue with the same Sparkling Water H2O deployment and BDA used CDH 5.10, I initially focused on the CDH version difference. I built three CDH clusters using three different CDH versions: 5.13, 5.12 and 5.10. All of them showed the exact same error. This let me rule out the CDH version as the cause and shift my focus to environment differences, especially the class path and jar files. I tried setting JAVA_HOME, SPARK_HOME and SPARK_DIST_CLASSPATH; unfortunately, none of them worked.

I noticed that /etc/spark2/conf.cloudera.spark2_on_yarn/classpath.txt seemed to have far fewer entries than classpath.txt under Spark 1.6. I tried adding back the missing entries. Still no luck.

I added two more parameters to get more information from the H2O logs.

--conf spark.ext.h2o.node.log.level=INFO \
--conf spark.ext.h2o.client.log.level=INFO \

This gave slightly more useful information: the log complained that the class ParquetFileWriter could not be found.

$ cat h2o_10.54.225.9_54000-5-error.log
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR: DistributedException from /192.168.10.54:54005: 'org/apache/parquet/hadoop/ParquetFileWriter', caused by java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetFileWriter
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.getResult(MRTask.java:478)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.getResult(MRTask.java:486)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.MRTask.doAll(MRTask.java:402)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.parser.ParseSetup.guessSetup(ParseSetup.java:283)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at water.api.ParseSetupHandler.guessSetup(ParseSetupHandler.java:40)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01-17 04:55:47.406 10.54.225.9:54000     18567  #6115-112 ERRR:         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
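
The log excerpt above names one missing class, and the console trace earlier named another. To list every missing class reported across a set of H2O log files at once, a small filter along the following lines may help. This is only a sketch: the function name missing_classes is made up for illustration, and the pattern assumes the "NoClassDefFoundError: path/to/Class" message form seen in the logs above.

```shell
#!/bin/sh
# missing_classes: hypothetical helper, not part of H2O or Sparkling Water.
# Reads the given log files (or stdin when no file is given) and prints each
# class named in a NoClassDefFoundError message once, in dotted form.
missing_classes() {
  grep -h -o 'NoClassDefFoundError: [A-Za-z0-9_/$.]*' "$@" |
    sed 's/^NoClassDefFoundError: //' |   # keep only the class path
    tr '/' '.' |                          # org/apache/... -> org.apache....
    LC_ALL=C sort -u                      # de-duplicate, byte-wise order
}
```

It could be called as missing_classes h2o_*-error.log from the directory that holds the H2O node logs.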

My client found a temporary solution by using h2odriver.jar, following the instructions in Using H2O on Hadoop. The command used is shown below:

cd /opt/h2o-3
hadoop jar h2odriver.jar -nodes 70 -mapperXmx 40g -output hdfs://PROD01ns/user/mytestuser/log24

Although this solution provides functionality similar to Sparkling Water, it has some critical performance issues:
1. The above command creates a 70-node H2O cluster. With Sparkling Water, the load would be distributed evenly across all available nodes, but the h2odriver.jar approach heavily uses only a few Hadoop nodes. For a big dataset, the majority of the activity happened on only 3 to 4 nodes, which pushed those nodes' CPU utilization close to 100%. For one big test dataset, parsing the file never completed; it failed after running for 20 minutes.
2. Unlike Sparkling Water, it actually reads the files during the parsing phase, not the importing phase.
3. The performance is pretty bad compared with Sparkling Water. My guess is that Sparkling Water uses the underlying Spark to distribute the load evenly.

Anyway, this hadoop jar h2odriver.jar solution is not an ideal workaround for this issue.

Then I happened to read this blog post: Incorrect results with corrupt binary statistics with the new Parquet reader. The article has nothing to do with my issue, but it mentioned parquet v1.8. I remembered seeing a note from a Sparkling Water developer discussing whether to integrate with parquet v1.8 in the future to address certain parquet issues in H2O. Unfortunately, I could not find the link to that discussion any more. But it inspired me to think that maybe Sparkling Water depends on a certain parquet library that the current environment does not have. The standard CDH distribution and Spark2 seem to use parquet v1.5. Oracle BDA has much more software installed, and maybe it happened to have the correct library installed somewhere. It seemed that the H2O jar file might contain this library, so what would happen if I included the H2O jar somewhere in Sparkling Water?
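
A guess like this can be checked before changing any launch script by looking inside the jar files for the class named in the stack trace. The following is a minimal sketch of such a check, not something I ran as part of the original investigation. The function names class_to_entry and find_class_in_jars are hypothetical, and the sketch assumes the unzip command is installed on the node.

```shell
#!/bin/sh
# Hypothetical helpers for locating a class inside jar files.

# Convert a fully qualified class name into the entry path used inside a jar,
# e.g. org.apache.Foo -> org/apache/Foo.class
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Print every jar under directory $1 whose listing contains class $2.
# Uses find -L so that symlinked jars are followed as well.
# Jars that unzip cannot read are silently skipped.
find_class_in_jars() {
  entry=$(class_to_entry "$2")
  find -L "$1" -name '*.jar' 2>/dev/null | while IFS= read -r jar; do
    # Substring match, so relocated (shaded) copies of the class also show up.
    if unzip -l "$jar" 2>/dev/null | grep -q -F -- "$entry"; then
      printf '%s\n' "$jar"
    fi
  done
}
```

Running find_class_in_jars against the directory of an unpacked H2O release and against the jars directory under SPARK_HOME, with org.apache.parquet.hadoop.api.ReadSupport as the class name, would show which side, if either, provides the class.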

With this idea in mind, I downloaded H2O from http://h2o-release.s3.amazonaws.com/h2o/rel-wheeler/4/index.html. After unzipping the file, h2o.jar is the file I needed. I then modified sparkling-shell and changed the last line as follows, adding the h2o.jar file to the --jars parameter.

#spark-shell --jars "$FAT_JAR_FILE" --driver-memory "$DRIVER_MEMORY" --conf spark.driver.extraJavaOptions="$EXTRA_DRIVER_PROPS" "$@"
H2O_JAR_FILE="/home/mytestuser/install/h2o-3.16.0.4/h2o.jar"
spark-shell --jars "$FAT_JAR_FILE,$H2O_JAR_FILE" --driver-memory "$DRIVER_MEMORY" --conf spark.driver.extraJavaOptions="$EXTRA_DRIVER_PROPS" "$@"

I restarted my H2O cluster. It worked!

Finally, after many days of work, Sparkling Water worked again in the new cluster. Reloading the big test dataset took less than 1 minute with only 24 H2O nodes, and the load was evenly distributed across the cluster. Problem solved!
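
The modified last line of sparkling-shell hard-codes one extra jar. A slightly more defensive variant of the same idea builds the comma-separated --jars value only from files that actually exist, so a wrong path produces a warning instead of going unnoticed. This is a sketch under assumptions: the helper name join_jars is hypothetical and not part of sparkling-shell, while FAT_JAR_FILE and H2O_JAR_FILE are the variables from the script lines shown earlier.

```shell
#!/bin/sh
# join_jars: hypothetical helper, not part of sparkling-shell.
# Prints the existing files among its arguments as one comma-separated
# string, the format spark-shell expects for --jars.
join_jars() {
  joined=""
  for jar in "$@"; do
    if [ -f "$jar" ]; then
      # Add a comma only when the list already has an entry.
      joined="${joined:+$joined,}$jar"
    else
      echo "warning: jar not found, skipping: $jar" >&2
    fi
  done
  printf '%s\n' "$joined"
}

# Possible use as the last lines of sparkling-shell (left commented out here):
# JARS=$(join_jars "$FAT_JAR_FILE" "$H2O_JAR_FILE")
# spark-shell --jars "$JARS" --driver-memory "$DRIVER_MEMORY" \
#   --conf spark.driver.extraJavaOptions="$EXTRA_DRIVER_PROPS" "$@"
```

Keeping the warning on stderr means the value captured in JARS stays clean even when a jar is missing.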
