Join Type in Hive: Bucket Join

In the last three blogs, I discussed Common Join, Map Join, and Skewed Join. Common Join is the default join type. Map Join is best used when one of the join tables is small and can fit into memory. Skewed Join improves query performance when the data in the join keys is skewed. What happens when both join tables are very large and none of the above three joins will work? This is where Bucket Join fits in.

Bucket Join is also called Collocated Join. It is used when all the join tables are large and the table data has been distributed by the join key. In this case, no data copy is necessary: it is a map-side join, and the join can happen locally on each node. Another condition for Bucket Join is that the number of buckets in one table must be equal to, or a multiple of, the number of buckets in the other table of the join.

So when creating the table, make sure the buckets are created on the join columns, and BEFORE the data is inserted into the table. Also set both parameters hive.optimize.bucketmapjoin and hive.enforce.bucketing to true before inserting data. One example of creating a bucketed table is shown below:

CREATE TABLE mytable (
  name string,
  city string,
  employee_id int
)
PARTITIONED BY (year STRING, month STRING, day STRING)
CLUSTERED BY (employee_id) INTO 256 BUCKETS;

The above join is also called Bucket Map Join. If the join tables have the same number of buckets and the data is also sorted on the join columns, Sort Merge Bucket Map Join is used.
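For reference, here is a minimal sketch (the table and column names are made up for illustration) of two tables that would qualify for Sort Merge Bucket Map Join: both are bucketed and sorted on the join column with compatible bucket counts, and the enforcement parameters are set before any data is loaded.

-- set before inserting data so rows actually land in sorted buckets
set hive.enforce.bucketing = true;
set hive.enforce.sorting = true;

CREATE TABLE orders (
  order_id    int,
  employee_id int,
  amount      double
)
CLUSTERED BY (employee_id) SORTED BY (employee_id ASC) INTO 256 BUCKETS;

CREATE TABLE employees (
  employee_id int,
  name        string,
  city        string
)
CLUSTERED BY (employee_id) SORTED BY (employee_id ASC) INTO 256 BUCKETS;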

How to Identify the Join
When using the EXPLAIN command, you will see Sorted Merge Bucket Map Join Operator below Map Operator Tree.

Example

set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
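With those settings in place, a join on the bucketing column (using the hypothetical orders and employees tables sketched earlier) looks like an ordinary join; on older Hive versions the MAPJOIN hint may still be needed for the bucket map join to kick in.

SELECT /*+ MAPJOIN(e) */ o.order_id, e.name, o.amount
FROM orders o
JOIN employees e
  ON o.employee_id = e.employee_id;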

Join Type in Hive: Skewed Join

In the last two blogs, I discussed Common Join and Map Join. In this blog, I am going to discuss Skewed Join. In the Common Join blog, I mentioned that one of the major issues with Common Join is that it performs poorly when the data is skewed: the query waits for the longest-running reducers on the skewed keys while the majority of reducers have already completed the join operation.

Skewed Join targets exactly this problem. At runtime, it scans the data and detects the keys with a large skew, controlled by the parameter hive.skewjoin.key. Instead of processing those keys right away, it stores them temporarily in an HDFS directory, and a follow-up map-reduce job then processes the skewed keys. The same key need not be skewed for all the tables, so that follow-up map-reduce job (for the skewed keys) is usually much faster, since it can be a map join.
For example, let's say we have a join between Table A and Table B. Both tables have the skewed value "mytest" in the join column, and Table B has fewer rows with that skewed value than Table A. The first step is to scan B and save all rows with the key "mytest" in an in-memory hash table. Then a set of mappers reads Table A and performs the following:

  • If a row has the skewed key "mytest", the mapper uses the hashed version of B to do the join on the map side.
  • For all other keys, the mapper sends the rows to a reducer that performs the join. The same reducer also gets rows from the mappers that scan Table B.

We can see that Table B is scanned twice during a Skewed Join. The rows with skewed keys in Table A are read and processed by the mappers, which perform a map-side join; those rows are never sent to a reducer. The rest of the keys in Table A use the regular Common Join approach.

To use Skewed Join, you need to understand your data and your query. Set the parameter hive.optimize.skewjoin to true. The parameter hive.skewjoin.key is optional; it is 100000 by default.

How to Identify the Join
When using the EXPLAIN command, you will see handleSkewJoin: true below Join Operator and Reduce Operator Tree.

Example

set hive.optimize.skewjoin = true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;
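As a sketch (the table names are hypothetical), the query itself is just an ordinary join; once hive.optimize.skewjoin is enabled, the skew handling happens automatically at runtime.

-- keys with more rows than hive.skewjoin.key are written to HDFS
-- and joined in a follow-up map join
SELECT a.id, a.val, b.val
FROM table_a a
JOIN table_b b
  ON a.id = b.id;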

Join Type in Hive: Map Join

In the last blog, I discussed the default join type in Hive: Common Join. In this blog, I am going to discuss Map Join, also called Auto Map Join, Map Side Join, or Broadcast Join.

One major issue with the Common Join, or Sort Merge Join, is the amount of time spent shuffling data around. Map Join can speed up such Hive queries: if one of the tables in the join is small and can be loaded into memory, the join can be done on the map side.

The first step of a Map Join is to create a MapReduce local task before the original MapReduce task. This local task reads the data of the small table from HDFS, saves it into an in-memory hash table, and then dumps it into a hash table file. Next, when the original join MapReduce task starts, it moves the hash table file to the Hadoop Distributed Cache, which populates the file to each mapper's local disk. All the mappers can then load the hash table file into memory and do the join in the map stage. For example, in a join between big table A and small table B, every mapper for Table A reads Table B completely. Because the smaller table is loaded into memory and the join is performed in the map phase of the MapReduce job, no reducer is needed and the reduce phase is skipped. A Map Join therefore performs faster than the regular default join.
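A minimal sketch of the typical candidate for this conversion, assuming a large fact table and a small dimension table (the names are made up for illustration):

set hive.auto.convert.join = true;

-- dim_city is assumed to be smaller than hive.mapjoin.smalltable.filesize,
-- so Hive should convert this join to a map join automatically
SELECT f.order_id, d.city_name, f.amount
FROM fact_orders f
JOIN dim_city d
  ON f.city_id = d.city_id;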

Parameters

  • The most important parameter for Map Join is hive.auto.convert.join. It must be set to true.
  • During the join, the determination of the small table is controlled by the parameter hive.mapjoin.smalltable.filesize. By default, it is 25MB.
  • When three or more tables are involved in the join, Hive generates three or more map-side joins, on the assumption that all of the small tables fit in memory. To speed up the join further, you can combine them into one single map-side join if the combined size of the n-1 small tables is less than 10MB, which is the default value. To achieve this, set the hive.auto.convert.join.noconditionaltask parameter to true and specify hive.auto.convert.join.noconditionaltask.size, as sketched below.
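A sketch of the combined case with three tables (the names are hypothetical); if the two dimension tables together are smaller than hive.auto.convert.join.noconditionaltask.size, Hive can merge the two map joins into a single map-only stage.

set hive.auto.convert.join = true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000000;

SELECT f.order_id, c.city_name, p.product_name
FROM fact_orders f
JOIN dim_city c ON f.city_id = c.city_id
JOIN dim_product p ON f.product_id = p.product_id;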

Restrictions

  • Full outer joins are never converted to Map Join.
  • A left outer join can be converted to Map Join only if the right table is less than 25 MB in size. A right outer join cannot be converted.

How to Identify the Join
When using the EXPLAIN command, you will see Map Join Operator just below Map Operator Tree.

Other
You can use a hint to force a query to use Map Join. In the example below, the smaller table (b) is the one named in the hint, which forces Hive to cache table b manually.

SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key;

Example

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; 

Join Type in Hive: Common Join

When tuning the performance of a Hive query, one area that needs attention is the join type used during execution. Just like join types in Oracle, different join types can have significantly different execution times. In the next few blogs, I am going to discuss the join types in Hive. The first join type is Common Join.

Common Join is the default join type in Hive, also called Shuffle Join, Distributed Join, or Sort Merge Join. During the join, all rows from the two tables are distributed to the nodes based on the join keys, so the values with the same join key end up on the same node. This join goes through a complete Map/Reduce cycle.

How it works
1. In the map stage, the mappers read the tables and output the join-column value as the key. The key-value pairs are written into an intermediate file.
2. In the shuffle stage, these pairs are sorted and merged, and all rows with the same key are sent to the same reducer instance.
3. In the reduce stage, the reducer gets the sorted data and performs the join (see the sketch below this list).
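As a sketch (the table names are hypothetical), you can see this path in action by disabling automatic map-join conversion and checking the plan with EXPLAIN.

set hive.auto.convert.join = false;

EXPLAIN
SELECT a.id, b.val
FROM table_a a
JOIN table_b b
  ON a.id = b.id;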

The advantage of Common Join is that it works for any table size. But because the shuffle is an expensive operation, it is quite resource intensive. If one or more join keys account for a significantly large proportion of the data, the corresponding reducers will be overloaded. The symptom of this issue is that the majority of reducers have completed the join operation while a few reducers are still running, and the total run time of the query is determined by the longest-running reducer. Obviously this is a typical skewed-data issue. I am going to discuss a special join just for this kind of skew in a later blog.

How to Identify the Join
When using the EXPLAIN command, you will see Join Operator just below Reduce Operator Tree.

Data Query between BDA and Exadata (Part 4): Query Oracle Dump File on BDA Using Copy2BDA

In the last three blogs, listed below, I mainly discussed how to query data between BDA and Exadata and one way to load data from BDA to Exadata using OLH.
Data Query between BDA and Exadata (Part 1): Query Hive Table from Oracle on Exadata
Data Query between BDA and Exadata (Part 2): Query Oracle Table from Hive on BDA
Data Query between BDA and Exadata (Part 3): Load data from BDA to Oracle Table on Exadata Using OLH

For moving data from an Oracle database to Hadoop, I discussed the approach using Sqoop in this blog. There is actually another, better approach on BDA: Copy to BDA. Copy to BDA allows you to copy tables from an Oracle database into Hadoop as regular Oracle Data Pump format (dmp) files. After generating the Data Pump files from the tables and copying them to HDFS, you can use Hive to query the data locally without accessing the remote Oracle database on Exadata.

Ok, here is the overview of my plan for the work. I am going to use the same Oracle table, ORA_TARGET_STOCK_PRICE1, used in my last blog, to generate an Oracle Data Pump dmp file on Exadata, then copy the Data Pump file to the BDA and load it into HDFS. After the dmp file is available on HDFS, I link the dmp file to a Hive external table. Then I can query the Oracle Data Pump file directly by using Hive on BDA.

Here are the detailed steps:
On Exadata:
Step 1: Create the database directory for the Oracle Data Pump file

mkdir -p /home/oracle/wzhou/mytest/expdir
sqlplus / as sysdba
CREATE DIRECTORY exp_2hdfs_dir AS '/home/oracle/wzhou/mytest/expdir';
GRANT READ, WRITE ON DIRECTORY exp_2hdfs_dir TO WZHOU;

Step 2: Create data pump format files for the data

sqlplus wzhou/wzhou
WZHOU:SQL> SELECT COUNT(*) FROM ORA_TARGET_STOCK_PRICE1;

   COUNT(*)
-----------
         65
         
WZHOU:SQL> CREATE TABLE EXT_DMP_ORA_TEST2
ORGANIZATION EXTERNAL
(                    
   TYPE oracle_datapump
   DEFAULT DIRECTORY exp_2hdfs_dir
   LOCATION('exa_ora_dmp_test2.dmp')
)
AS SELECT * FROM ORA_TARGET_STOCK_PRICE1;
Table created.

Verify the result

WZHOU:SQL> desc EXT_DMP_ORA_TEST2;
 Name               Null?    Type
 ------------------ -------- -----------------
 STOCK_DATE                  VARCHAR2(20)
 CLOSE_PRICE                 NUMBER(8,3)
 VOLUME                      NUMBER(8)
 OPEN_PRICE                  NUMBER(8,3)
 HIGH_PRICE                  NUMBER(8,3)
 LOW_PRICE                   NUMBER(8,3)

WZHOU:SQL> select count(*) from EXT_DMP_ORA_TEST2;
   COUNT(*)
-----------
         65

WZHOU:SQL> !ls -l /home/oracle/wzhou/mytest/expdir
total 44
-rw-r----- 1 oracle dba 16384 Nov  5 19:12 exa_ora_dmp_test2.dmp
-rw-r--r-- 1 oracle dba   258 Nov  5 19:15 EXT_DMP_ORA_TEST2_180093.log

Step 3: Copy the dump file to Hadoop HDFS

. $ORACLE_HOME/bigdatasql/hadoop_enkbda.env
cd /home/oracle/wzhou/mytest/expdir
ls *.dmp
hadoop fs -mkdir /user/oracle/mytest/dmp
hadoop fs -put *.dmp /user/oracle/mytest/dmp
hadoop fs -ls /user/oracle/mytest/dmp

Ok, I am done with the work on Exadata and can now move my work to the BDA side.
On BDA
Step 4: Create the mapping external table in Hive
[root@enkbda1node01 ~]# su - oracle
[oracle@enkbda1node01 ~]$ hive
hive> use default;
OK
Time taken: 0.486 seconds
hive>
CREATE EXTERNAL TABLE ext_ora_dump_test2
ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe'
STORED AS
INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/user/oracle/mytest/dmp';

OK
Time taken: 0.48 seconds

Step 5: Querying the Data in Hive

hive> DESCRIBE ext_ora_dump_test2;
OK
stock_date          	varchar(20)         	from deserializer   
close_price         	decimal(8,3)        	from deserializer   
volume              	int                 	from deserializer   
open_price          	decimal(8,3)        	from deserializer   
high_price          	decimal(8,3)        	from deserializer   
low_price           	decimal(8,3)        	from deserializer   
Time taken: 0.188 seconds, Fetched: 6 row(s)

hive> show create table  ext_ora_dump_test2;
OK
CREATE EXTERNAL TABLE `ext_ora_dump_test2`(
  `stock_date` varchar(20) COMMENT 'from deserializer', 
  `close_price` decimal(8,3) COMMENT 'from deserializer', 
  `volume` int COMMENT 'from deserializer', 
  `open_price` decimal(8,3) COMMENT 'from deserializer', 
  `high_price` decimal(8,3) COMMENT 'from deserializer', 
  `low_price` decimal(8,3) COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'oracle.hadoop.hive.datapump.DPSerDe' 
STORED AS INPUTFORMAT 
  'oracle.hadoop.hive.datapump.DPInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://enkbda-ns/user/oracle/mytest/dmp'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false', 
  'numFiles'='0', 
  'numRows'='-1', 
  'rawDataSize'='-1', 
  'totalSize'='0', 
  'transient_lastDdlTime'='1478391818')
Time taken: 0.092 seconds, Fetched: 22 row(s)

hive> select * from ext_ora_dump_test2 limit 3;
OK
9/23/16	24.05	56837	24.13	24.22	23.88
9/22/16	24.1	56675	23.49	24.18	23.49
9/21/16	23.38	70925	23.21	23.58	23.025
Time taken: 0.237 seconds, Fetched: 3 row(s)

Ok, I can get the result from the Oracle dump file in Hive. Nice feature.

Additional Note:

  • In my test, I use only one dmp file. You can actually generate multiple dmp files to speed up the export as well as to get faster data access on Hadoop. The syntax for multiple dmp files is LOCATION('file1.dmp', 'file2.dmp'); see the sketch after this list.
  • This feature is quite useful when you want to reuse the same Oracle dmp file on both sides. For example, you can use the dmp file on HDFS to query data directly on BDA without connecting to the Oracle database on Exadata. At the same time, if you want to reuse the same data on Exadata, you can easily reimport the same dmp file into the Oracle database on Exadata.
  • When the source Oracle table changes, you can regenerate the dmp file on Exadata and then refresh the copy in Hadoop. Copy to BDA is primarily useful for Oracle tables that are relatively static and do not require frequent refreshes.
  • Copy to BDA is licensed under Oracle Big Data SQL, and you must have an Oracle Big Data SQL license before using the Copy to BDA feature.
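Here is a sketch of a multi-file export on the Exadata side (the table and file names are hypothetical, modeled on the single-file example in Step 2); the PARALLEL clause lets Data Pump write the files concurrently.

CREATE TABLE EXT_DMP_ORA_MULTI
ORGANIZATION EXTERNAL
(
   TYPE oracle_datapump
   DEFAULT DIRECTORY exp_2hdfs_dir
   LOCATION ('exa_ora_dmp_part1.dmp', 'exa_ora_dmp_part2.dmp')
)
PARALLEL 2
AS SELECT * FROM ORA_TARGET_STOCK_PRICE1;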