Google Cloud SQL vs Cloud DataStore vs BigTable vs BigQuery vs Spanner

Many people are familiar with Amazon’s AWS cloud, but Google Cloud Platform (GCP) is another interesting cloud provider. For database storage on GCP, Google offers Cloud SQL, Cloud Datastore, Google BigTable, Google BigQuery, and Google Cloud Spanner. In this blog, I am going to discuss all five options, focusing mainly on the last three, as I am more interested in the options that handle large amounts of data.

Cloud SQL
If you want a full relational database that supports customized table views, stored procedures, plenty of indexes, and ACID compliance, Cloud SQL is probably your choice. Google Cloud SQL is a database service that supports two types of databases: MySQL and PostgreSQL. Both support High Availability (HA) and pay-per-use pricing without lock-in. It can scale up to 32 processor cores and more than 200GB of RAM. Although this option might make it easier to migrate your data to the cloud, it carries all the limitations of MySQL and PostgreSQL and does not scale well to huge data volumes. There are many blogs about the performance limitations of MySQL and PostgreSQL, so I am not going to repeat them here.

For more information about Cloud SQL, please visit https://cloud.google.com/sql/.

Cloud Datastore
Google Cloud Datastore is a cloud-based NoSQL database for web and mobile applications. It is scalable and automatically handles sharding and replication. It also supports ACID transactions, SQL-like queries, and a REST API. Unlike BigTable, Datastore is optimized for smaller sets of data. Although Cloud Datastore is a NoSQL database and you don’t need to define a schema before storing a row, it is used more for ad hoc storage of structured data. Cloud Datastore does not support SQL itself, but it has a SQL-like query language called GQL, and its client libraries provide a query API. Here is one example of a query.

// List Google companies with less than 400 employees.
var companies = query.filter('name =', 'Google').filter('size <', 400);

Someone mentioned that Cloud Datastore actually originated from Google’s internal-use database, Megastore. Megastore is widely used inside Google. I couldn’t find Google’s official statement about the link between these two products. But from Google’s publication about Megastore, it does look quite similar to Cloud Datastore.

For more information about Cloud Datastore, please visit https://cloud.google.com/datastore/.

BigTable
Google BigTable is Google’s cloud storage solution for low-latency data access. It was originally developed in 2004 and was built on the Google File System (GFS). There is a paper about it: Bigtable: A Distributed Storage System for Structured Data. It is now widely used in many of Google’s core services, like Google Search, Google Maps, and Gmail. It has a NoSQL architecture, but still uses a row-based data format. With data reads and writes under 10 milliseconds, it is good for applications that have frequent data ingestion. It can scale to hundreds of petabytes and handle millions of operations per second.

BigTable is compatible with the HBase 1.0 API via extensions, so any move from HBase will be easier. BigTable has no SQL interface: you can only use the API to Put/Get/Delete individual rows or run scan operations. BigTable can be easily integrated with other GCP tools, like Cloud Dataflow and Dataproc. BigTable is also the foundation for Cloud Datastore.
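
To make the "rows and scans only" access pattern concrete, here is a toy in-memory model in Python. It is only a sketch and not the BigTable or HBase client API: the class ToySortedTable and its method names are made up for illustration, and it assumes rows are kept sorted by row key so that a scan is a range read over neighbouring keys.

```python
import bisect


class ToySortedTable:
    """Hypothetical in-memory stand-in for a table sorted by row key."""

    def __init__(self):
        self._keys = []   # row keys, kept in sorted order
        self._rows = {}   # row key -> {column: value}

    def put(self, key, columns):
        # Insert a new row or merge columns into an existing one.
        if key not in self._rows:
            bisect.insort(self._keys, key)
            self._rows[key] = {}
        self._rows[key].update(columns)

    def get(self, key):
        # Point lookup by row key; returns None when the row is absent.
        return self._rows.get(key)

    def delete(self, key):
        if key in self._rows:
            del self._rows[key]
            self._keys.pop(bisect.bisect_left(self._keys, key))

    def scan(self, start_key, end_key):
        """Yield (key, row) for start_key <= key < end_key, in key order."""
        lo = bisect.bisect_left(self._keys, start_key)
        hi = bisect.bisect_left(self._keys, end_key)
        for key in self._keys[lo:hi]:
            yield key, self._rows[key]


table = ToySortedTable()
table.put("user#001", {"name": "Ann"})
table.put("user#002", {"name": "Bob"})
table.put("order#100", {"total": 42})
table.delete("user#002")

# A prefix scan: every remaining row whose key starts with "user#".
user_rows = list(table.scan("user#", "user#~"))
```

There is no way to ask "find all rows where name is Ann" without scanning, which is why the design of the row key matters so much in this kind of store.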

Unlike other clouds, GCP compute and storage are separate. You need to consider the following three parts when calculating the cost.
1. The type of Cloud instance, and the number of nodes in the instance.
2. The total amount of storage your tables use.
3. The amount of network bandwidth used. Please note: some part of network traffic is free.

It’s good and bad. The good part is that you don’t need to pay for the compute cost if your system is idle; you pay only the storage cost. The bad part is that it is not easy to forecast your compute usage if you have a very large dataset.

As for pricing, I looked at the cost of creating a 10-node BigTable instance in the us-east1-b zone, for the production instance type only. The first shows the SSD storage type while the second one shows the HDD storage type.

This is the pricing as of June 10, 2017; it could change in the future without notice. There are some interesting observations:

  • The compute cost is the same whether you choose the SSD or HDD storage type. It makes sense, as storage and compute are separated in GCP.
  • The writes are the same in both cases. Reads are about 20 times slower on HDD compared with SSD, but scans on HDD drop by just 20%. If you know your access pattern is mostly scans, the HDD option does not seem bad. I would not recommend HDD, though; I just find this observation interesting and puzzling.
  • The cost of HDD storage is only 15% of the cost of SSD storage.

For more information, please visit http://cloud.google.com/bigtable/.

BigQuery
BigQuery is Google’s cloud-based data warehousing solution. Unlike BigTable, it looks at data in the big picture and can query huge volumes of data in a short time. As the data is stored in a columnar format, it is much faster than BigTable at scanning large amounts of data. BigQuery lets you scale to petabytes and is a great enterprise data warehouse for analytics. BigQuery is serverless. Serverless computing means computing resources can be spun up on demand, so users can go from zero usage to full-scale usage without involving administrators or managing infrastructure. According to Google, BigQuery can scan terabytes of data in seconds and petabytes of data in minutes. For data ingestion, BigQuery allows you to load data from Google Cloud Storage or Google Cloud Datastore, or to stream it into BigQuery storage.

However, BigQuery is really meant for OLAP-type queries that scan large amounts of data; it is not designed for OLTP-type queries. A small read or write takes about 2 seconds in BigQuery, while BigTable takes about 9 milliseconds for the same amount of data, so BigTable is much better suited to OLTP-type queries. Although BigQuery supports atomic single-row operations, it lacks cross-row transaction support.

For pricing, there are some free operations. I won’t discuss the free operations further, only the standard pricing for the most important components of BigQuery. There are two major components in the cost of using BigQuery: storage cost and query cost.

For storage, the cost is $0.02 per GB per month. However, Google has long-term storage pricing, which is 50% off at $0.01 per GB per month. Long-term storage is defined as a table that has not been edited (APPEND, OVERWRITE, or STREAMING) for 90 days. Each partition of a table is considered separately, so you could have standard pricing for some recent partitions while having long-term storage pricing for some historical partitions. Even when data is in long-term storage, there is no degradation of performance, durability, or availability.

For queries, the first 1 TB of data processed in a month is free; after that it is $5 per TB. There is no charge for cached queries. As BigQuery stores data in a columnar format, the query cost is based on the columns selected. For an enterprise with a large amount of data and many applications, the bill for data storage is predictable, but the bill for queries is not. The good news is that Google offers a flat-rate monthly pricing model as an alternative to on-demand pricing. For example, you can pay $10,000 for 500 BigQuery slots, and BigQuery automatically manages the slot quota.
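
The on-demand arithmetic above can be sketched in a few lines of Python. This is only a back-of-the-envelope estimate using the 2017 list prices quoted in this blog; the function name monthly_cost and the sample volumes are made up for illustration, and free operations and flat-rate pricing are ignored.

```python
STORAGE_PER_GB = 0.02     # standard storage, USD per GB per month
LONG_TERM_PER_GB = 0.01   # tables or partitions not edited for 90 days
QUERY_PER_TB = 5.00       # on-demand queries, USD per TB processed
FREE_QUERY_TB = 1.0       # first 1 TB processed each month is free


def monthly_cost(active_gb, long_term_gb, scanned_tb):
    """Return (storage_cost, query_cost) in USD for one month."""
    storage = active_gb * STORAGE_PER_GB + long_term_gb * LONG_TERM_PER_GB
    # Only the data processed beyond the free tier is billed.
    query = max(0.0, scanned_tb - FREE_QUERY_TB) * QUERY_PER_TB
    return storage, query


# Hypothetical workload: 2 TB of recent partitions, 8 TB of historical
# partitions, and 51 TB processed by queries during the month.
storage_cost, query_cost = monthly_cost(
    active_gb=2_000, long_term_gb=8_000, scanned_tb=51
)
print(f"storage: ${storage_cost:.2f}, queries: ${query_cost:.2f}")
```

With these made-up volumes the query bill is roughly twice the storage bill, and only the scanned_tb input is hard to predict, which is exactly the concern described above.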

For more documentation about BigQuery, please visit http://cloud.google.com/bigquery/docs. A sample BigQuery screen is shown below:

Cloud Spanner
Cloud Spanner is a globally distributed database that was officially released just last month, in May 2017. It is a versioned key-value store, and from this perspective it is similar to BigTable. However, it supports general-purpose transactions and provides a SQL-based query language.

Spanner was developed in 2011 and used internally for Google’s advertising backend, which is called F1. F1 was initially based on a MySQL database. As Google’s advertising revenue grew rapidly, so did the F1 MySQL database. The uncompressed dataset is in the tens of TB, which is definitely way beyond the comfort zone for MySQL. Even with tremendous effort put into the sharding scheme, managing the database became very complex and costly. The last resharding of this MySQL database took two years of intense effort. Please note: even for a great company like Google, with so many talented people, it still took two years of effort. I can’t imagine how other companies can survive a MySQL database of this size. This is why I am usually cautious about using MySQL with a large footprint.

Two features I like most in Cloud Spanner:
1. Replication Configuration
Data replication is handled automatically and transparently, but the user application can control how the data is stored. For example, if user data is required to stay in the USA, you can specify that it be stored in US data centers only. If you want to improve read performance and availability, you can increase the number of replicas and choose their geographic placement so that the data is as close to the users as possible. If you want fast write throughput, you can decide how far the replicas are from each other.

2. Globally-distributed database allowing consistent reads and writes.
This feature is critical if I want a consistent backup or consistent reads at global scale. It is implemented using Google’s TrueTime. Instead of using only one source for the time reference, TrueTime is based on time references from both GPS and atomic clocks. Google says the reason for using two different kinds of time reference is that they have different failure modes. Atomic clocks can fail over long periods of time, for example by drifting significantly, while GPS can fail when receivers fail or when there is radio interference. You usually won’t see both time references fail at the same time.

Spanner is organized as a set of zones. Each zone has one zone master and 100 to 1000 spanservers. Each table is split into multiple tablets. A tablet’s state is stored in a set of B-tree-like files and a write-ahead log on a file system called Colossus. Colossus is a globally distributed file system and the successor to the Google File System (GFS). Spanner’s data model is not purely relational, but semi-relational. Every row must have a name, and each table is required to have an ordered set of one or more primary-key columns. Google publishes a Best Practice guide for Spanner schema design. For more information about the Spanner architecture, please check out Google’s research paper: Spanner: Google’s Globally-Distributed Database.

The following shows the option to create a Spanner instance with 10 nodes.

The storage cost is $0.30 per GB per month, and compute is $0.90 per node per hour, which comes to $9 per hour for the 10-node instance above. Each Spanner node can provide up to 10,000 QPS of reads or 2,000 QPS of writes (writing single rows at 1KB of data per row) and 2 TB of disk storage. Google also recommends provisioning enough Spanner nodes to keep CPU utilization below 75%.
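
As a rough illustration of how these per-node limits translate into an instance size, here is a small Python sketch. It is a simplification, not Google’s sizing method: the function nodes_needed is made up, and it assumes that read and write load add up linearly as fractions of one node and that the 75% CPU guideline can be applied directly to that fraction.

```python
import math

READ_QPS_PER_NODE = 10_000    # per-node read limit quoted above
WRITE_QPS_PER_NODE = 2_000    # per-node write limit (1KB single-row writes)
STORAGE_TB_PER_NODE = 2       # per-node storage limit
TARGET_UTILIZATION = 0.75     # keep CPU utilization below 75%


def nodes_needed(read_qps, write_qps, storage_tb):
    """Estimate the node count for a workload (assumption-laden sketch)."""
    # Assumption: reads and writes consume node capacity additively.
    load = read_qps / READ_QPS_PER_NODE + write_qps / WRITE_QPS_PER_NODE
    for_throughput = math.ceil(load / TARGET_UTILIZATION)
    for_storage = math.ceil(storage_tb / STORAGE_TB_PER_NODE)
    # The instance must satisfy both limits and have at least one node.
    return max(1, for_throughput, for_storage)


# Hypothetical workload: 30,000 read QPS, 4,000 write QPS, 7 TB of data.
nodes = nodes_needed(read_qps=30_000, write_qps=4_000, storage_tb=7)
```

In this example throughput, not storage, drives the node count: 7 TB would fit on 4 nodes, but the combined read and write load with the 75% headroom calls for 7.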

Among these five database storage options in GCP, I like Cloud Spanner the most, as I feel it best meets the requirement of supporting relational databases with great scalability and availability. But if you want to find out which option is right for you, Google has a nice decision tree to help you decide.

For more information about Cloud Spanner, please visit https://cloud.google.com/spanner/docs/.

Join Type: Bucket Join

In the last three blogs, I discussed Common Join, Map Join, and Skewed Join. Common Join is the default join type. Map Join is best used when one of the join tables is small and fits into memory. Skewed Join improves query performance when data is skewed on the join keys. What happens when both join tables are very large and none of the above three joins works? This is where Bucket Join fits in.

Bucket Join is also called Collocated Join. It is used when all join tables are large and the table data has been distributed by the join key. In this case, copying data is unnecessary: it is a map-side join, and the join can happen on the local node. Another condition for Bucket Join is that the number of buckets in one table must be equal to, or a multiple of, the number of buckets in the other table of the join.
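
The reason for the bucket-count condition is easier to see in code. Below is a minimal Python simulation of the idea, not Hive’s implementation: the helper names bucketize and bucket_map_join and the sample tables are made up, and buckets are assigned with hash(key) % num_buckets. When one bucket count is a multiple of the other, bucket i of the larger set can only match bucket i % small_buckets of the other table, so each pair of buckets can be joined locally.

```python
from collections import defaultdict


def bucketize(rows, key, num_buckets):
    """Distribute rows into buckets by hash of the join key."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets


def bucket_map_join(big, small, key, big_buckets, small_buckets):
    if big_buckets % small_buckets != 0:
        raise ValueError("bucket counts must be equal or a multiple")
    big_b = bucketize(big, key, big_buckets)
    small_b = bucketize(small, key, small_buckets)
    joined = []
    for i in range(big_buckets):
        # Only one bucket of the other table can hold matching keys.
        lookup = defaultdict(list)
        for row in small_b.get(i % small_buckets, []):
            lookup[row[key]].append(row)
        # Join bucket i locally against that single bucket.
        for row in big_b.get(i, []):
            for match in lookup.get(row[key], []):
                joined.append({**match, **row})
    return joined


employees = [{"employee_id": i, "name": f"emp{i}"} for i in range(8)]
badges = [{"employee_id": i, "badge": f"B{i}"} for i in (1, 3, 6, 9)]
result = bucket_map_join(
    employees, badges, "employee_id", big_buckets=4, small_buckets=2
)
matched_ids = sorted(row["employee_id"] for row in result)
```

No bucket ever needs rows from more than one bucket of the other table, which is why no data has to be copied across the cluster.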

So when creating the table, make sure the buckets are defined on the join columns, BEFORE the data is inserted into the table. Also set both parameters hive.optimize.bucketmapjoin and hive.enforce.bucketing to true before inserting data. One example of creating a bucketed table is shown below:

CREATE TABLE mytable (
  name string,
  city string,
  employee_id int )
PARTITIONED BY (year STRING, month STRING, day STRING)
CLUSTERED BY (employee_id) INTO 256 BUCKETS
;

The above join is also called Bucket Map Join. If the join tables have the same number of buckets and the data is also sorted on the join columns, Sort Merge Bucket Map Join is used.

How to Identify the Join
When using the EXPLAIN command, you will see Sorted Merge Bucket Map Join Operator below Map Operator Tree.

Example

set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

Join Type in Hive: Skewed Join

In the last blogs, I discussed Common Join and Map Join. In this blog, I am going to discuss Skewed Join. In the blog on Common Join, I mentioned that one of its major issues is that the join performs poorly when data is skewed: the query waits for the longest-running reducers on the skewed keys while the majority of reducers have already completed the join operation.

Skewed Join targets exactly this problem. At runtime, it scans the data and detects the keys with a large skew; the threshold is controlled by the parameter hive.skewjoin.key. Instead of processing those keys, it stores them temporarily in an HDFS directory, then processes them in a follow-up map-reduce job. The same key need not be skewed in all the tables, so the follow-up map-reduce job (for the skewed keys) is much faster, since it is a map join.
For example, let’s say we have a join of Table A and Table B. Both tables have the skewed value “mytest” in the joining column. Assume Table B has fewer rows with the skewed value than Table A. The first step is to scan B and save all rows with the key “mytest” in an in-memory hash table. Then a set of mappers reads Table A and does the following:

  • If a row has the skewed key “mytest”, it is joined using the hashed version of B.
  • For all other keys, the rows are sent to a reducer that performs the join. The same reducer gets rows from the mappers that scan Table B.

We can see that Table B is scanned twice during Skewed Join. The rows with skewed keys in Table A are read and processed by the mappers, which perform a map-side join; they are never sent to a reducer. The rest of the keys in Table A use the regular Common Join approach.
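
The flow described above can be simulated in plain Python. This is only a sketch of the idea and not how Hive implements it: the function skewed_join and the sample tables are made up, skew is detected by simply counting keys in Table A, and skew_threshold plays the role that hive.skewjoin.key plays in Hive.

```python
from collections import Counter, defaultdict


def skewed_join(table_a, table_b, key, skew_threshold):
    """Toy skewed join: map-join the hot keys, common-join the rest."""
    # Assumption: a key is skewed when it has more rows than the threshold.
    counts = Counter(row[key] for row in table_a)
    skewed = {k for k, n in counts.items() if n > skew_threshold}

    # First scan of B: rows with skewed keys go into an in-memory hash table.
    b_hash = defaultdict(list)
    for row in table_b:
        if row[key] in skewed:
            b_hash[row[key]].append(row)

    # Second scan of B: the remaining rows are grouped for the "reducers".
    b_by_key = defaultdict(list)
    for row in table_b:
        if row[key] not in skewed:
            b_by_key[row[key]].append(row)

    map_side, a_by_key = [], defaultdict(list)
    for row in table_a:
        if row[key] in skewed:
            # Skewed rows are joined in the mapper and never shuffled.
            map_side.extend({**row, **b} for b in b_hash.get(row[key], []))
        else:
            a_by_key[row[key]].append(row)

    # "Reduce" side: a regular common join on the non-skewed keys.
    reduce_side = [
        {**a, **b}
        for k, a_rows in a_by_key.items()
        for a in a_rows
        for b in b_by_key.get(k, [])
    ]
    return map_side, reduce_side


table_a = [{"k": "mytest", "a": i} for i in range(5)]
table_a += [{"k": "x", "a": 100}, {"k": "y", "a": 101}]
table_b = [{"k": "mytest", "b": "B1"}, {"k": "x", "b": "B2"}]
map_side, reduce_side = skewed_join(table_a, table_b, "k", skew_threshold=3)
```

Here the five “mytest” rows are joined on the map side, while only the single “x” match goes through the reducer path, so no reducer ends up with the hot key.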

To use Skewed Join, you need to understand your data and query. Set the parameter hive.optimize.skewjoin to true. The parameter hive.skewjoin.key is optional and is 100000 by default.

How to Identify the Join
When using the EXPLAIN command, you will see handleSkewJoin: true below Join Operator and Reduce Operator Tree.

Example

set hive.optimize.skewjoin = true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

Join Type in Hive: Map Join

In the last blog, I discussed the default join type in Hive: Common Join. In this blog, I am going to discuss Map Join, also called Auto Map Join, or Map Side Join, or Broadcast Join.

One major issue with Common Join, or Sort Merge Join, is that too much activity is spent on shuffling data around. To speed up Hive queries, Map Join can be used if one of the tables in the join is small and can be loaded into memory.

The first step of Map Join is to create a MapReduce local task before the original MapReduce task. This local task reads the small table’s data from HDFS and saves it into an in-memory hash table, then into a hash table file. Next, when the original join MapReduce task starts, it moves the hash table file to the Hadoop Distributed Cache, which copies the file to each mapper’s local disk. All the mappers can then load this hash table file into memory and do the join in the map stage. For example, for a join of big table A and small table B, Table B is read completely by every mapper for Table A. As the smaller table is loaded into memory and the join is performed in the map phase of the MapReduce job, no reducer is needed and the reduce phase is skipped. Map Join therefore performs faster than the regular default join.
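
The same idea can be shown as a minimal Python sketch. This is an illustration only, not Hive code: the function names build_hash_table and map_join and the sample rows are made up, and each inner list of big_a_splits stands in for the input split of one mapper.

```python
from collections import defaultdict


def build_hash_table(small_table, key):
    """Local task: load the small table into an in-memory hash table."""
    table = defaultdict(list)
    for row in small_table:
        table[row[key]].append(row)
    return table


def map_join(big_table_split, hash_table, key):
    """One mapper: stream its split of the big table and probe the hash table."""
    for row in big_table_split:
        for match in hash_table.get(row[key], []):
            yield {**row, **match}


small_b = [{"key": 1, "city": "Oslo"}, {"key": 2, "city": "Lima"}]
big_a_splits = [
    [{"key": 1, "value": "a1"}, {"key": 3, "value": "a3"}],
    [{"key": 2, "value": "a2"}, {"key": 1, "value": "a4"}],
]

# Every "mapper" gets the same full copy of the small table's hash table.
hash_b = build_hash_table(small_b, "key")
joined = [
    row for split in big_a_splits for row in map_join(split, hash_b, "key")
]
```

Each split is joined independently, with no shuffle and no reduce step, which is where the speedup comes from. The price is that the whole small table must fit in every mapper’s memory.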

Parameters

  • The most important parameter for Map Join is hive.auto.convert.join. It must be set to true.
  • During the join, what counts as a small table is controlled by the parameter hive.mapjoin.smalltable.filesize. By default, it is 25MB.
  • When three or more tables are involved in the join, Hive generates three or more map-side joins, on the assumption that all the tables are small. To speed up the join further, you can combine three or more map-side joins into a single map-side join if the size of the n-1 tables is less than 10MB, which is the default value. To achieve this, set the parameter hive.auto.convert.join.noconditionaltask to true and specify the parameter hive.auto.convert.join.noconditionaltask.size.

Restriction

  • Full outer joins are never converted to Map Join.
  • Left outer join can be converted to Map Join only if the right table is less than 25 MB in size. Right-outer join doesn’t work.

How to Identify the Join
When using the EXPLAIN command, you will see Map Join Operator just below Map Operator Tree.

Other
You can use a hint to make the query use Map Join. In the example below, the smaller table is the one put in the hint, which forces table B to be cached manually.

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

Example

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; 

Join Type in Hive: Common Join

During performance tuning of Hive queries, one area that needs attention is the join type used during execution. Just like join types in Oracle, different types can have significantly different execution times. In the next few blogs, I am going to discuss the join types in Hive. The first join type is Common Join.

Common Join is the default join type in Hive, also called Shuffle Join, Distributed Join, or Sort Merge Join. During the join, all rows from the two tables are distributed across the nodes based on the join keys. In this way, rows with the same join key end up on the same node. This join has a complete Map/Reduce cycle.

How it works
1. In the map stage, mappers read the tables and output the join-column value as the key. The key-value pairs are written into an intermediate file.
2. In the shuffle stage, these pairs are sorted and merged. All rows with the same key are sent to the same reducer instance.
3. In the reduce stage, the reducer gets the sorted data and performs the join.
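
The three stages above can be sketched as a small Python simulation. It is a toy model rather than Hive’s actual execution: the function names, the "a"/"b" tags, and the sample tables are made up, and hash(key) % num_reducers stands in for the partitioner that decides which reducer receives a key.

```python
from collections import defaultdict


def map_stage(rows, key, tag):
    """Emit (join key, tagged row) pairs, as the mappers would."""
    for row in rows:
        yield row[key], (tag, row)


def shuffle_stage(pairs, num_reducers):
    """Route every pair with the same key to the same reducer partition."""
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for k, tagged in pairs:
        partitions[hash(k) % num_reducers][k].append(tagged)
    return partitions


def reduce_stage(partition):
    """Join the rows of both tables that share a key, in key order."""
    for k in sorted(partition):
        a_rows = [row for tag, row in partition[k] if tag == "a"]
        b_rows = [row for tag, row in partition[k] if tag == "b"]
        for a_row in a_rows:
            for b_row in b_rows:
                yield {**a_row, **b_row}


def common_join(table_a, table_b, key, num_reducers=2):
    pairs = list(map_stage(table_a, key, "a"))
    pairs += list(map_stage(table_b, key, "b"))
    out = []
    for partition in shuffle_stage(pairs, num_reducers):
        out.extend(reduce_stage(partition))
    return out


table_a = [
    {"id": 1, "name": "x"},
    {"id": 2, "name": "y"},
    {"id": 2, "name": "z"},
]
table_b = [{"id": 2, "dept": "D"}, {"id": 3, "dept": "E"}]
result = common_join(table_a, table_b, "id")
```

Because every row of both tables passes through the shuffle, the cost grows with the full size of the inputs, and a key with many rows lands entirely on one reducer.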

The advantage of Common Join is that it works for any table size. But as shuffle is an expensive operation, it is quite resource intensive. If one or more join keys account for a significantly large proportion of the data, the corresponding reducers will be overloaded. The symptom is that the majority of reducers have completed the join operation while a few reducers are still running. The total run time of the query is determined by the longest-running reducer. Obviously this is a typical skewed-data issue. I am going to discuss a special join just for this kind of skew in a later blog.

How to Identify the Join
When using the EXPLAIN command, you will see Join Operator just below Reduce Operator Tree.