HDInsight – How to use the Spark-HBase connector?

Apache Spark is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. Azure HDInsight offers a fully managed Spark service with many benefits.

Apache HBase is an open-source NoSQL database built on Hadoop: a distributed, scalable big data store that provides real-time read/write access to large datasets. HDInsight HBase is offered as a managed cluster that is integrated into the Azure environment, with the features HBase provides as a big data store.

Spark-HBase Connector

The Spark-HBase Connector provides an easy way to store and access data in HBase clusters from Spark jobs. HBase is well suited to workloads at very large data scale, so existing Spark customers should consider it as a storage option. Similarly, customers who already have HDInsight HBase clusters and want to access their data from Spark jobs do not need to move the data to another storage medium. In both cases, the connector is useful.

Steps to use the connector

Currently, the connector must be installed manually on the Spark cluster. We are planning to release it soon with HDInsight clusters. The connector can be installed in four steps:

  • Step 1: Create a VNET.
  • Step 2: Create the Spark and HBase clusters in the same or different subnets of the same VNET.
  • Step 3: Copy hbase-site.xml from HBase cluster to your Spark cluster.
  • Step 4: Install the connector.

The detailed steps follow.

  • Create an Azure Virtual Network. The VNET can be created from the Azure portal.

  • Set up the Spark and HBase clusters. Instructions are available for both Linux and Windows clusters. The connector is tested with Spark 2.1 (HDI 3.6) clusters.

  • On the Spark cluster, install or upgrade Maven (if needed) to compile the package.

     sudo apt-get install maven
    
  • Copy the package code from the Spark-HBase Connector repository. Also copy the HBase configuration XML file to the Spark configuration folder.

     sudo cp hbase-site.xml /etc/spark2/conf/
    
  • Compile

     mvn package -DskipTests
    
  • Run spark-submit with your application JAR.

     $SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --packages com.hortonworks:shc-core:1.1.0-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /To/your/application/jar
    
  • For example, you can run the examples included in the package with the following command.

     $SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-cluster --packages com.hortonworks:shc-core:1.1.0-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /home/sshuser/shc/examples/target/shc-examples-1.1.1-2.1-s_2.11-SNAPSHOT.jar
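Before submitting a job, it can help to confirm that the copied hbase-site.xml names the ZooKeeper quorum of the HBase cluster, since HBase clients normally locate the cluster through that setting. The following is a minimal sketch using only the JDK XML parser; `readProperty` is a hypothetical helper and not part of the connector, and the file path is the one used in the spark-submit commands above.

```scala
import java.io.File
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper: reads one property from a Hadoop-style configuration file
// (<configuration><property><name>..</name><value>..</value></property>...).
def readProperty(file: File, key: String): Option[String] = {
  val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(file)
  val props = doc.getElementsByTagName("property")

  // Text of the first child element with the given tag, if present.
  def child(p: Element, tag: String): Option[String] =
    Option(p.getElementsByTagName(tag).item(0)).map(_.getTextContent.trim)

  (0 until props.getLength)
    .map(i => props.item(i).asInstanceOf[Element])
    .find(p => child(p, "name").contains(key))
    .flatMap(p => child(p, "value"))
}

// Path taken from the spark-submit commands above; adjust if your layout differs.
val conf = new File("/etc/spark2/conf/hbase-site.xml")
if (conf.exists)
  println(readProperty(conf, "hbase.zookeeper.quorum").getOrElse("hbase.zookeeper.quorum is not set"))
else
  println(s"$conf not found")
```

If the property is missing or the file is not found, the Spark job is unlikely to reach the HBase cluster, so it is worth repeating the copy step before going further.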

Sample Program

  • Start the Spark shell

     $SPARK_HOME/bin/spark-shell --packages com.hortonworks:shc-core:1.1.0-2.1-s_2.11
    
  • The important commands are as follows.

  • Import packages

    
    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    val sparkConf = new SparkConf().setAppName("HBaseTestApp")
    import spark.sqlContext.implicits._
    
  • Define Class and Object

    case class HBaseRecordAirline(col0: String, Year: Int, Quarter: Int, Month: Int, DayofMonth: Int, DayOfWeek: Int, FlightDate: Int, UniqueCarrier: String, AirlineID: String)
    defined class HBaseRecordAirline
    object HBaseRecordAirlineTest {
      def apply(i: Int): HBaseRecordAirline = {
        val s = s"""row${"%03d".format(i)}"""
        HBaseRecordAirline(s, i, i, i, i, i, i, s, s)
      }
    }
    defined module HBaseRecordAirlineTest
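The test object above zero-pads the row number with "%03d". HBase keeps rows sorted by row key in lexicographic byte order, so padding is what keeps string keys in numeric order. The sketch below illustrates this in plain Scala, with no Spark or HBase required; `rowKey` is a hypothetical helper that only mirrors the key format used above.

```scala
// Hypothetical helper mirroring the key format used by HBaseRecordAirlineTest.
def rowKey(i: Int): String = s"""row${"%03d".format(i)}"""

val ids = Seq(2, 10, 100, 1)

// Zero-padded keys: lexicographic order matches numeric order.
val padded = ids.map(rowKey).sorted

// Unpadded keys for comparison: "row10" sorts before "row2".
val unpadded = ids.map(i => s"row$i").sorted

println(padded.mkString(", "))   // row001, row002, row010, row100
println(unpadded.mkString(", ")) // row1, row10, row100, row2
```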
    
  • Define the catalog: the catalog holds the mapping between the Spark DataFrame columns and the HBase table.

    
    val cat = s"""{
                 |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
                 |"rowkey":"key",
                 |"columns":{
                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                   |"Year":{"cf":"Year", "col":"Year", "type":"int"},
                   |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
                   |"Month":{"cf":"Month", "col":"Month", "type":"int"},
                   |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
                   |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
                   |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
                   |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
                   |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
                 |}
               |}""".stripMargin
    cat: String = 
    {
    "table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
    "rowkey":"key",
    "columns":{
    "col0":{"cf":"rowkey", "col":"key", "type":"string"},
    "Year":{"cf":"Year", "col":"Year", "type":"int"},
    "Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
    "Month":{"cf":"Month", "col":"Month", "type":"int"},
    "DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
    "DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
    "FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
    "UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
    "AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
    }
    }
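Because every data column in the catalog follows the same pattern, the string can also be generated rather than typed by hand. The following is a rough sketch in plain Scala; `buildCatalog` is a hypothetical helper, not a connector API, and it assumes the layout used in this sample, where each column lives in a column family of the same name and `col0` is the row key.

```scala
// Hypothetical helper: builds a catalog string in the layout of the sample above
// from (column name, type) pairs. Assumes one column family per column, named
// after the column, and a fixed "col0" row-key mapping.
def buildCatalog(table: String, columns: Seq[(String, String)]): String = {
  val keyColumn = """"col0":{"cf":"rowkey", "col":"key", "type":"string"}"""
  val dataColumns = columns.map { case (name, tpe) =>
    s""""$name":{"cf":"$name", "col":"$name", "type":"$tpe"}"""
  }
  Seq(
    "{",
    s""""table":{"namespace":"default", "name":"$table"},""",
    """"rowkey":"key",""",
    """"columns":{""",
    (keyColumn +: dataColumns).mkString(",\n"),
    "}",
    "}"
  ).mkString("\n")
}

// Shortened column list for illustration only.
val generated = buildCatalog(
  "airdelaydata_scv_Test1",
  Seq("Year" -> "int", "UniqueCarrier" -> "string")
)
println(generated)
```

Passing the full list of columns from the case class should produce the same string as the hand-written `cat` value, which makes it easier to keep the catalog and the case class in step.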
    
  • Write data: given a DataFrame with the specified schema, the following creates an HBase table with 5 regions and saves the DataFrame in it. Note that if HBaseTableCatalog.newTable is not specified, the table must be created beforehand.

    
    val data = (0 to 8).map { i => HBaseRecordAirlineTest(i) }
    sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  • SQL Support

    
    def withCatalog(cat: String): DataFrame = {spark.sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog->cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()}
    
    val df = withCatalog(cat)
    
    df.createOrReplaceTempView("table1")
    
    val c = spark.sqlContext.sql("select AirlineID from table1")
    
    c.show()
    +---------+
    |AirlineID|
    +---------+
    |   row000|
    |   row001|
    |   row002|
    |   row003|
    |   row004|
    |   row005|
    |   row006|
    |   row007|
    |   row008|
    +---------+
    

Please refer to the Spark-HBase Connector project for more information on the connector.