Use Apache Spark to read and write Apache HBase data

Apache HBase is typically queried either with its low-level API (scans, gets, and puts) or with a SQL syntax using Apache Phoenix. Apache also provides the Apache Spark HBase Connector, a convenient and performant alternative for querying and modifying data stored in HBase.

Prerequisites

Overall process

The high-level process for enabling your Spark cluster to query your HBase cluster is as follows:

  1. Prepare some sample data in HBase.
  2. Acquire the hbase-site.xml file from your HBase cluster configuration folder (/etc/hbase/conf).
  3. Place a copy of hbase-site.xml in your Spark 2 configuration folder (/etc/spark2/conf).
  4. Run spark-shell, referencing the Spark HBase Connector by its Maven coordinates in the --packages option.
  5. Define a catalog that maps the schema from Spark to HBase.
  6. Interact with the HBase data using either the RDD or DataFrame APIs.

Prepare sample data in Apache HBase

In this step, you create and populate a table in Apache HBase that you can then query using Spark.

  1. Use the ssh command to connect to your HBase cluster. Edit the command below by replacing HBASECLUSTER with the name of your HBase cluster, and then enter the command:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Use the hbase shell command to start the HBase interactive shell. Enter the following command in your SSH connection:

    hbase shell
    
  3. Use the create command to create an HBase table with two column families. Enter the following command:

    create 'Contacts', 'Personal', 'Office'
    
  4. Use the put command to insert values at a specified column in a specified row of a particular table. Enter the following commands:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Use the exit command to stop the HBase interactive shell. Enter the following command:

    exit
    

Copy hbase-site.xml to Spark cluster

Copy hbase-site.xml from local storage to the root of your Spark cluster's default storage. Edit the command below to reflect your configuration. Then, from your open SSH session to the HBase cluster, enter the command:

| Syntax value | New value |
|---|---|
| URI scheme | Modify to reflect your storage. The syntax below is for blob storage with secure transfer enabled. |
| SPARK_STORAGE_CONTAINER | Replace with the default storage container name used for the Spark cluster. |
| SPARK_STORAGE_ACCOUNT | Replace with the default storage account name used for the Spark cluster. |

    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/

Then exit your ssh connection to your HBase cluster.

Put hbase-site.xml on your Spark cluster

  1. Connect to the head node of your Spark cluster using SSH.

  2. Enter the command below to copy hbase-site.xml from your Spark cluster's default storage to the Spark 2 configuration folder on the cluster's local storage:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Run Spark Shell referencing the Spark HBase Connector

  1. From your open SSH session to the Spark cluster, enter the command below to start a Spark shell:

    spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
    
  2. Keep this Spark shell instance open and continue to the next step.

Define a catalog and query

In this step, you define a catalog object that maps the schema from Apache Spark to Apache HBase.

  1. In your open Spark shell, enter the following import statements:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Enter the command below to define a catalog for the Contacts table you created in HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    The code does the following:

    a. Defines a catalog schema for the HBase table named Contacts.
    b. Identifies the rowkey as key, and maps the column names used in Spark to the column family, column name, and column type used in HBase.
    c. Defines the rowkey in detail as a named column (rowkey) whose column family (cf) is rowkey.

  3. Enter the command below to define a method that provides a DataFrame around your Contacts table in HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Create an instance of the DataFrame:

    val df = withCatalog(catalog)
    
  5. Query the DataFrame:

    df.show()
    
  6. You should see two rows of data:

     +------+--------------------+--------------+-------------+--------------+
     |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
     +------+--------------------+--------------+-------------+--------------+
     |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
     |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
     +------+--------------------+--------------+-------------+--------------+
    
  7. Register a temporary view so you can query the HBase table using Spark SQL:

    df.createTempView("contacts")
    
  8. Issue a SQL query against the contacts view:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    
  9. You should see results like these:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
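
The catalog defined in step 2 is plain JSON, so for tables with many columns it can be generated rather than typed by hand. The following is a minimal sketch in plain Scala, assuming every column is stored as a string as in the Contacts example. `ColumnMapping` and `buildCatalog` are hypothetical names introduced here for illustration; they are not part of the Spark HBase Connector.

```scala
// Hypothetical helper type for illustration; not part of the connector API.
final case class ColumnMapping(sparkName: String, family: String, qualifier: String)

// Builds a catalog JSON string in the same shape as the hand-written one.
// Assumption: every column, including the row key, uses type "string".
def buildCatalog(table: String,
                 columns: Seq[ColumnMapping],
                 namespace: String = "default"): String = {
  // One "columns" entry: Spark column name -> HBase family, qualifier, type.
  def entry(name: String, cf: String, col: String): String =
    s""""$name":{"cf":"$cf", "col":"$col", "type":"string"}"""

  // The row key is declared as a column in the special "rowkey" family.
  val entries = entry("rowkey", "rowkey", "key") +:
    columns.map(c => entry(c.sparkName, c.family, c.qualifier))

  Seq(
    "{",
    s""""table":{"namespace":"$namespace", "name":"$table"},""",
    """"rowkey":"key",""",
    """"columns":{""",
    entries.mkString(",\n"),
    "}",
    "}"
  ).mkString("\n")
}

val contactsCatalog = buildCatalog(
  "Contacts",
  Seq(
    ColumnMapping("officeAddress", "Office", "Address"),
    ColumnMapping("officePhone", "Office", "Phone"),
    ColumnMapping("personalName", "Personal", "Name"),
    ColumnMapping("personalPhone", "Personal", "Phone")
  )
)

println(contactsCatalog)
```

The generated string has the same keys as the catalog from step 2, so it should be usable anywhere that catalog is passed, for example as the argument to withCatalog.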
    

Insert new data

  1. To insert a new Contact record, define a ContactRecord class:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Create an instance of ContactRecord and put it in an array:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    val newData = Array(newContact)
    
  3. Save the array of new data to HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Examine the results:

    df.show()
    
  5. You should see output like this:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  6. Close the Spark shell by entering the following command:

    :q
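
In the output from step 5, the new row 16891 appears between 1000 and 8396 rather than last. The most likely reason is that HBase keeps rows sorted by row key in byte-wise lexicographic order, and the catalog maps the row key as a string, so keys compare character by character rather than numerically. The plain Scala sketch below (no Spark or HBase required) reproduces that ordering with ordinary string sorting, and shows zero-padding as one common way to make string order agree with numeric order. `padKey` is a hypothetical helper, and the width of 8 is an arbitrary assumption.

```scala
// Row keys from the Contacts table, in the order they were inserted.
val rowKeys = Seq("1000", "8396", "16891")

// String sorting compares character by character, which matches byte-wise
// row key ordering for these ASCII keys.
val lexicographic = rowKeys.sorted

// Hypothetical helper: left-pad a numeric key with zeros to a fixed width
// (8 is an arbitrary choice) so that string order matches numeric order.
def padKey(key: String, width: Int = 8): String =
  ("0" * (width - key.length).max(0)) + key

val padded = rowKeys.map(padKey(_)).sorted

println(lexicographic.mkString(", ")) // 1000, 16891, 8396
println(padded.mkString(", "))        // 00001000, 00008396, 00016891
```

Padding changes the stored key, so it only helps if it is applied consistently when rows are written and when they are looked up.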
    

Next steps