
Use Apache Spark to read and write Apache HBase data

Apache HBase is typically queried either with its low-level API (scans, gets, and puts) or with a SQL syntax using Apache Phoenix. Apache also provides the Apache Spark HBase Connector, which is a convenient and efficient alternative for querying and modifying data stored by HBase.

Prerequisites

  • Two separate HDInsight clusters deployed in the same virtual network: one HBase cluster, and one Spark cluster with at least Spark 2.1 (HDInsight 3.6) installed. For more information, see Create Linux-based clusters in HDInsight using the Azure portal.

  • The URI scheme for your clusters' primary storage. This scheme is wasb:// for Azure Blob Storage, abfs:// for Azure Data Lake Storage Gen2, or adl:// for Azure Data Lake Storage Gen1. If secure transfer is enabled for Blob Storage, the URI is wasbs://. See also, secure transfer. (Example URI roots are sketched below.)
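
    For reference, a minimal sketch of these URI root forms (written as Scala string literals you could reuse later in the Spark shell) is shown below; CONTAINER, FILESYSTEM, and ACCOUNT are placeholders, not values from this article.

    // Hypothetical primary-storage URI roots; replace the placeholders with your own names.
    val blobRoot     = "wasbs://CONTAINER@ACCOUNT.blob.core.windows.net/"  // Blob Storage with secure transfer enabled
    val adlsGen2Root = "abfs://FILESYSTEM@ACCOUNT.dfs.core.windows.net/"   // Azure Data Lake Storage Gen2
    val adlsGen1Root = "adl://ACCOUNT.azuredatalakestore.net/"             // Azure Data Lake Storage Gen1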

Overall process

The high-level process for enabling your Spark cluster to query your HBase cluster is as follows:

  1. Prepare some sample data in HBase.
  2. Acquire the hbase-site.xml file from your HBase cluster configuration folder (/etc/hbase/conf), and place a copy of it in your Spark 2 configuration folder (/etc/spark2/conf). (Optional: use the script provided by the HDInsight team to automate this process.)
  3. Run spark-shell referencing the Spark HBase Connector by its Maven coordinates in the packages option.
  4. Define a catalog that maps the schema from Spark to HBase.
  5. Interact with the HBase data using either the RDD or DataFrame APIs.

Prepare sample data in Apache HBase

In this step, you create and populate a table in Apache HBase that you can then query using Spark.

  1. Use the ssh command to connect to your HBase cluster. Edit the command below by replacing HBASECLUSTER with the name of your HBase cluster, and then enter the command:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Use the hbase shell command to start the HBase interactive shell. Enter the following command in your SSH connection:

    hbase shell
    
  3. Use the create command to create an HBase table with two column families. Enter the following command:

    create 'Contacts', 'Personal', 'Office'
    
  4. Use the put command to insert values at a specified column in a specified row in a particular table. Enter the following command:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Use the exit command to stop the HBase interactive shell. Enter the following command:

    exit
    

Run scripts to set up the connection between clusters

To set up communication between the clusters, follow the steps below to run two scripts on your clusters. These scripts automate the file-copying process described in the 'Set up communication manually' section below.

  • The script you run from the HBase cluster uploads hbase-site.xml and the HBase IP-mapping information to the default storage attached to your Spark cluster.
  • The script you run from the Spark cluster sets up two cron jobs that run two helper scripts periodically:
    1. HBase cron job – downloads new hbase-site.xml files and the HBase IP mapping from the Spark default storage account to the local node.
    2. Spark cron job – checks whether a Spark scaling occurred and whether the cluster is secure. If so, edits /etc/hosts to include the HBase IP mapping stored locally.

NOTE: Before proceeding, make sure you have added the Spark cluster's storage account to your HBase cluster as a secondary storage account. Also make sure you run the scripts in the order indicated below.

  1. Use Script Action on your HBase cluster to apply the changes with the following considerations:

    Property         Value
    Bash script URI  https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    Node type(s)     Region
    Parameters       -s SECONDARYS_STORAGE_URL
    Persisted        yes

    • SECONDARYS_STORAGE_URL is the URL of the Spark-side default storage. Parameter example: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. Use Script Action on your Spark cluster to apply the changes with the following considerations:

    Property         Value
    Bash script URI  https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    Node type(s)     Head, Worker, Zookeeper
    Parameters       -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional)
    Persisted        yes

    • You can specify how often you want this cluster to automatically check for updates. Default: -s "*/1 * * * *" -h 0 (in this example, the Spark cron runs every minute, while the HBase cron doesn't run).
    • Because the HBase cron isn't set up by default, you need to rerun this script when you scale your HBase cluster. If your HBase cluster scales often, you may choose to set up the HBase cron job automatically. For example: -h "*/30 * * * *" configures the script to perform checks every 30 minutes. This runs the HBase cron schedule periodically to automate downloading of new HBase information from the common storage account to the local node.

Set up communication manually (optional, if the script provided in the step above fails)

NOTE: These steps need to be performed every time one of the clusters undergoes a scaling activity.

  1. Copy hbase-site.xml from local storage to the root of your Spark cluster's default storage. Edit the command below to reflect your configuration. Then, from your open SSH session to the HBase cluster, enter the command:

    Syntax value             New value
    URI scheme               Modify to reflect your storage. The syntax below is for Blob Storage with secure transfer enabled.
    SPARK_STORAGE_CONTAINER  Replace with the default storage container name used for the Spark cluster.
    SPARK_STORAGE_ACCOUNT    Replace with the default storage account name used for the Spark cluster.

    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Then exit your SSH connection to your HBase cluster.

    exit
    
  3. Connect to the head node of your Spark cluster using SSH. Edit the command below by replacing SPARKCLUSTER with the name of your Spark cluster, and then enter the command:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Enter the command below to copy hbase-site.xml from your Spark cluster's default storage to the Spark 2 configuration folder on the cluster's local storage:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Run Spark Shell referencing the Spark HBase Connector

After you complete the preceding step, you should be able to run the Spark shell, referencing the appropriate version of the Spark HBase Connector. To find the most recent appropriate Spark HBase Connector core version for your cluster scenario, see the SHC Core Repository.

As an example, the following table lists two versions and the corresponding commands that the HDInsight team currently uses. You can use the same versions for your clusters if the versions of HBase and Spark are the same as indicated in the table.

  1. In your open SSH session to the Spark cluster, enter the following command to start a Spark shell:

    Spark version  HDI HBase version    SHC version       Command
    2.1            HDI 3.6 (HBase 1.1)  1.1.1-2.1-s_2.11  spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
    2.4            HDI 4.0 (HBase 2.0)  1.1.0.3.1.2.2-1   spark-shell --packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1 --repositories http://repo.hortonworks.com/content/groups/public/
  2. Keep this Spark shell instance open and continue to Define a catalog and query. If you don't find the jars that correspond to your versions in the SHC Core repository, continue reading. (A quick way to check that the connector loaded is sketched below.)
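
    Optionally, as a minimal sanity check that isn't part of the original steps, you can confirm from the shell you just started that the connector classes resolved from the Maven coordinates; if the import below fails, build the jar as described next.

    // Quick check that the Spark HBase Connector is on the classpath.
    // A failure here means the --packages coordinates didn't resolve.
    import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog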

You can build the jars directly from the spark-hbase-connector GitHub branch. For example, if you're running Spark 2.3 and HBase 1.1, complete these steps:

  1. Clone the repo:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Go to branch-2.3:

    git checkout branch-2.3
    
  3. Build from the branch (creates a .jar file):

    mvn clean package -DskipTests
    
  4. Run the following command (be sure to change the .jar name to correspond to the .jar file you built):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar
    
  5. Keep this Spark shell instance open and continue to the next section.

Define a catalog and query

In this step, you define a catalog object that maps the schema from Apache Spark to Apache HBase.

  1. In your open Spark Shell, enter the following import statements:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Enter the command below to define a catalog for the Contacts table you created in HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    The code:

    1. Defines a catalog schema for the HBase table named Contacts.
    2. Identifies the rowkey as key, and maps the column names used in Spark to the column family, column name, and column type as used in HBase.
    3. Defines the rowkey in detail as a named column (rowkey), which has a specific column family cf of rowkey.
  3. Enter the command below to define a method that provides a DataFrame around your Contacts table in HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Create an instance of the DataFrame:

    val df = withCatalog(catalog)
    
  5. Query the DataFrame:

    df.show()
    

    You should see two rows of data:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Register a temporary table so you can query the HBase table using Spark SQL:

    df.createTempView("contacts")
    
  7. Issue a SQL query against the contacts table (a filtered DataFrame query is sketched after this list):

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    You should see results like these:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Insert new data

  1. To insert a new Contact record, define a ContactRecord class:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Create an instance of ContactRecord and put it in an array:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Save the array of new data to HBase (an update sketch that reuses this write path follows this list):

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Examine the results:

    df.show()
    

    You should see output like this:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Close the Spark shell by entering the following command:

    :q
    

Next steps