HDInsight - How to perform bulk load with Phoenix?

Apache HBase is an open-source, NoSQL, distributed, scalable big data store built on Hadoop. It provides real-time read/write access to large datasets. HDInsight HBase is offered as a managed cluster that is integrated into the Azure environment. HBase provides many features as a big data store, but before customers can use it, they first have to load their data into HBase.

There are multiple ways to get data into HBase, such as using the client APIs, running a MapReduce job with TableOutputFormat, or entering the data manually through the HBase shell. Many customers are interested in using Apache Phoenix, a SQL layer over HBase, for its ease of use. This post describes how to use Phoenix bulk load with HDInsight clusters.

Phoenix provides two methods for loading CSV data into Phoenix tables – a single-threaded client loading tool via the psql command, and a MapReduce-based bulk load tool.

Single-threaded method

Note that this method is suitable when your bulk load data is in the tens of megabytes, so it may not be a suitable option for most production scenarios. Following are the steps to use this method.

    • Create Table:

Put the CREATE TABLE command in a file (let's say CreateTable.sql) based on the schema of your table. Example:

 CREATE TABLE inputTable (
        Field1 varchar NOT NULL PRIMARY KEY,
        Field2 varchar,
        Field3 decimal,
        Field4 INTEGER,
        Field5 varchar);
    • Input data: Put the input data for the bulk load in a file (let's say input.csv).
    • Query to execute on the data: Put any SQL query that you would like to run on the data in a file (let's say Query.sql). A sample query:
 SELECT Field5, COUNT(Field2), MAX(Field3) FROM inputTable GROUP BY Field5;
    • Zookeeper Quorum:

Find the ZooKeeper quorum string. It is present in the file /etc/hbase/conf/hbase-site.xml, under the property named hbase.zookeeper.quorum (one way to extract it is sketched after the bulk load command below).

    • Bulk Load command: Go to /usr/hdp/current/phoenix-client/bin and run the following command.
 python psql.py ZookeeperQuorum CreateTable.sql input.csv Query.sql
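
For concreteness, here is a minimal end-to-end sketch of the single-threaded path. The quorum host names and CSV rows are hypothetical; -t names the target table explicitly, since by default psql.py derives the table name from the CSV file name and unquoted identifiers such as inputTable are stored upper-case.

 # Extract the ZooKeeper quorum from hbase-site.xml
 # (assumes the value sits on the line after the property name, as in Ambari-managed configs).
 grep -A1 'hbase.zookeeper.quorum' /etc/hbase/conf/hbase-site.xml | \
     grep '<value>' | sed 's/<[^>]*>//g; s/^[[:space:]]*//'
 # Example (hypothetical) output:
 #   zk0-myhbase.internal.cloudapp.net,zk1-myhbase.internal.cloudapp.net,zk2-myhbase.internal.cloudapp.net

 # input.csv holds one row per line matching the five columns of CreateTable.sql, e.g.:
 #   row1,alpha,10.5,100,extra1
 #   row2,beta,20.25,200,extra2

 # Run the single-threaded loader from the directory that holds CreateTable.sql, input.csv and
 # Query.sql: it creates the table, loads the CSV, then executes Query.sql.
 python /usr/hdp/current/phoenix-client/bin/psql.py -t INPUTTABLE \
     zk0-myhbase,zk1-myhbase,zk2-myhbase CreateTable.sql input.csv Query.sql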

MapReduce-based method

This method is a better option for large load volumes, which makes it the suitable method for production scenarios. Following are the steps to use this method.

    • Zookeeper Quorum: Find the ZooKeeper quorum string. It is present in the file /etc/hbase/conf/hbase-site.xml, under the property named hbase.zookeeper.quorum.
    • Create Table: Put the CREATE TABLE command in a file (let's say CreateTable.sql) based on the schema of your table.
    • Verify the table schema:

Go to /usr/hdp/current/phoenix-client/bin and run the following command:

 python sqlline.py ZookeeperQuorum

Now, to check the schema of your table, run !describe inputTable at the sqlline prompt.

    • Input data: Figure out the path of your input data. The input files may be in your WASB/ADLS storage account. Let’s say the input files are present in inputFolderBulkLoad under the parent directory of your storage account.
    • Bulk Load command: Go to /usr/hdp/current/phoenix-client and run the following command.
 $ HADOOP_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/etc/hbase/conf hadoop jar /usr/hdp/2.4.2.0-258/phoenix/phoenix-4.4.0.2.4.2.0-258-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table inputTable --input /inputFolderBulkLoad/*.csv --zookeeper ZookeeperQuorum:2181:/hbase-unsecure
    • Bulk load with ADLS Storage:

If you have a cluster with ADLS storage, the following changes apply to the steps above.

      • ADLS root directory

Find the root directory for ADLS by looking for the hbase.rootdir entry in hbase-site.xml (a scripted way to read it is sketched at the end of this section).

      • Bulk Load command

To run the bulk load command, go to /usr/hdp/current/phoenix-client and pass the ADLS input and output folders as parameters. Example:

 $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar /usr/hdp/2.4.2.0-258/phoenix/phoenix-4.4.0.2.4.2.0-258-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table inputTable --input adl://hdinsightconf1.azuredatalakestore.net:443/hbase1/data/hbase/temp/input/*.csv --zookeeper ZookeeperQuorum:2181:/hbase-unsecure --output adl://hdinsightconf1.azuredatalakestore.net:443/hbase1/data/hbase/output1

In the above command, adl://hdinsightconf1.azuredatalakestore.net:443/hbase1 is the ADLS root directory.
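
The ADLS steps can also be scripted. The following is a hedged sketch, not a definitive procedure: the root directory value, folder layout, and ZookeeperQuorum placeholder are hypothetical, and the HDP/Phoenix version paths are simply copied from the command above.

 # Read the ADLS root directory (hbase.rootdir) from hbase-site.xml
 # (assumes the value sits on the line after the property name, as in Ambari-managed configs).
 ROOT=$(grep -A1 'hbase.rootdir' /etc/hbase/conf/hbase-site.xml | \
        grep '<value>' | sed 's/<[^>]*>//g; s/^[[:space:]]*//')
 echo "$ROOT"   # e.g. adl://hdinsightconf1.azuredatalakestore.net:443/hbase1

 # Build the input and output locations under that root and run the MapReduce bulk load tool.
 HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar \
     /usr/hdp/2.4.2.0-258/phoenix/phoenix-4.4.0.2.4.2.0-258-client.jar \
     org.apache.phoenix.mapreduce.CsvBulkLoadTool \
     --table inputTable \
     --input "$ROOT/data/hbase/temp/input/*.csv" \
     --zookeeper ZookeeperQuorum:2181:/hbase-unsecure \
     --output "$ROOT/data/hbase/output1"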

Following are some recommendations.

    • Storage medium:

We recommend using the same storage medium for both the input and output folders; that is, both folders should be either in WASB or in ADLS.
If you want to transfer data from WASB to ADLS, you can use the distcp command. Example:

 hadoop distcp wasb://<containername>@<accountname>.blob.core.windows.net/example/data/gutenberg adl://<accountname>.azuredatalakestore.net:443/myfolder
    • Cluster configuration:

The map processes of the MapReduce-based method produce large amounts of temporary output, which can fill up the available non-DFS space. Pick larger worker node VMs for large bulk loads. The number of worker nodes directly affects the processing speed.

    • Bulk data input:

Because bulk load is a storage-intensive operation, it is recommended to split your input into multiple chunks (~10 GB each) and then perform the bulk load on each chunk, as sketched below.
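
As an illustration of that chunking, here is a minimal sketch assuming one large local CSV file (bigInput.csv, a hypothetical name), the WASB folder from the MapReduce section, and the same ZookeeperQuorum placeholder.

 # Split the large CSV into ~10 GB pieces; -C keeps whole lines together so rows are not cut in half.
 split -d -C 10G --additional-suffix=.csv bigInput.csv chunk_

 # Upload and bulk load each chunk separately, cleaning up between runs.
 hdfs dfs -mkdir -p /inputFolderBulkLoad
 for f in chunk_*.csv; do
     hdfs dfs -put "$f" /inputFolderBulkLoad/
     HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar \
         /usr/hdp/2.4.2.0-258/phoenix/phoenix-4.4.0.2.4.2.0-258-client.jar \
         org.apache.phoenix.mapreduce.CsvBulkLoadTool \
         --table inputTable \
         --input "/inputFolderBulkLoad/$f" \
         --zookeeper ZookeeperQuorum:2181:/hbase-unsecure
     hdfs dfs -rm "/inputFolderBulkLoad/$f"
 done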

    • Avoiding Region Server hotspotting:

HBase sequential writes may suffer from region server hotspotting if the row key is monotonically increasing. Salting the row key provides a way to mitigate this problem: Phoenix can transparently salt the row key with a salting byte for a particular table. More details are available in the Apache Phoenix documentation on salted tables.
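
For illustration, here is a hedged sketch of creating a salted table with the single-threaded client before bulk loading into it. The table name, columns, and bucket count are hypothetical; SALT_BUCKETS is the Phoenix table option that adds the salting byte and pre-splits the table.

 # Write a CREATE TABLE statement that pre-splits the table into 16 salted buckets.
 printf '%s\n' \
     'CREATE TABLE saltedInputTable (' \
     '    Field1 varchar NOT NULL PRIMARY KEY,' \
     '    Field2 varchar,' \
     '    Field3 decimal)' \
     '    SALT_BUCKETS = 16;' > /tmp/CreateSaltedTable.sql

 # Create the table; psql.py or CsvBulkLoadTool runs can then load into saltedInputTable as usual.
 python /usr/hdp/current/phoenix-client/bin/psql.py ZookeeperQuorum /tmp/CreateSaltedTable.sql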

Anunay Tiwari (@antiwari) and Gaurav Kanade (@gkanade)