Чтение и запись данных Apache HBase с помощью Apache Spark

Обычно для запроса Apache HBase применяется низкоуровневый API (сканирует, получает и помещает) или синтаксис SQL, использующий Apache Phoenix. Apache также предоставляет соединитель Apache Spark HBase. Он является удобной и эффективной альтернативой запросу и изменению данных, хранящихся в HBase.

Необходимые компоненты

  • Два отдельных кластера HDInsight развернуты в одной виртуальной сети: HBase и Spark (версия не ниже 2.1) (HDInsight 3.6). Дополнительные сведения см. в статье Создание кластеров под управлением Linux в HDInsight с помощью портала Azure.

  • Схема универсального кода ресурса (URI) для основного хранилища кластеров. Для службы хранилища BLOB-объектов этой схемой будет wasb://, для Azure Data Lake Storage 2-го поколения — abfs://, для Azure Data Lake Storage 1-го поколения — adl://. Если для службы хранилища BLOB-объектов включено безопасное перемещение, URI будет таким: wasbs://. См. также сведения о безопасной передаче.

Общий процесс

Ниже приведен общий процесс, чтобы позволить кластеру Spark запрашивать кластер HBase.

  1. Подготовка демонстрационных данных в HBase.
  2. Извлеките файл hbase-site.xml из папки конфигурации кластера HBase (/etc/hbase/conf) и создайте копию файла hbase-site.xml в папке конфигурации Spark 2 (/etc/spark2/conf). (Необязательно: используйте для автоматизации этого процесса скрипт от команды HDInsight.)
  3. Запуск spark-shell со ссылкой на соединитель Spark HBase по его координатам Maven в параметре packages.
  4. Определение каталога, который сопоставляет схему из Spark с HBase.
  5. Взаимодействие с данными HBase с помощью API RDD или таблицы данных.

Подготовка демонстрационных данных в Apache HBase

На этом этапе вы создаете и заполняете таблицу в Apache HBase, которую затем можно запросить с помощью Spark.

  1. С помощью команды ssh подключитесь к кластеру HBase. Измените команду, заменив HBASECLUSTER имя кластера HBase, а затем введите команду:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. С помощью команды hbase shell запустите интерактивную оболочку HBase. В строку SSH-подключения введите следующую команду:

    hbase shell
    
  3. С помощью команды create создайте таблицу HBase с двумя семействами столбцов. Введите следующую команду:

    create 'Contacts', 'Personal', 'Office'
    
  4. С помощью команды put вставьте значения в указанный столбец строки в определенной таблице. Введите следующую команду:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. С помощью команды exit остановите работу интерактивной оболочки HBase. Введите следующую команду:

    exit
    

Запуск сценариев для настройки подключения между кластерами

Чтобы настроить связь между кластерами, выполните действия, чтобы запустить два скрипта в кластерах. Эти скрипты автоматизируют процесс копирования файлов, описанных в разделе "Настройка обмена данными вручную".

  • Скрипт, запускаемый из кластера HBase, отправит файл hbase-site.xml и сведения о сопоставлении IP-адресов HBase в хранилище по умолчанию, подключенное к кластеру Spark.
  • Скрипт, выполняемый из кластера Spark, настраивает два задания cron для периодического выполнения двух вспомогательных скриптов:
    1. задание cron HBase — скачивание новых файлов hbase-site.xml и сопоставления IP-адресов HBase из учетной записи хранения Spark по умолчанию на локальный узел;
    2. задание cron Spark — проверка того, имело ли место масштабирование Spark и является ли кластер безопасным. Если это так, отредактируйте файл /etc/hosts, включив сохраненное локально IP-сопоставление HBase.

ПРИМЕЧАНИЕ. Прежде чем продолжить, убедитесь, что вы добавили учетную запись хранения кластера Spark в кластер HBase в качестве дополнительной учетной записи хранения. Убедитесь, что скрипты указаны в порядке.

  1. С помощью действия скрипта на кластере HBase примените изменения, приняв во внимание указанные ниже соображения.

    Свойство Значение
    URI bash-скрипта https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Типы узлов Область/регион
    Параметры -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Сохранено yes
    • SECONDARYS_STORAGE_URL — URL-адрес хранилища по умолчанию на стороне Spark. Пример параметра: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. С помощью действия скрипта на кластере Spark примените изменения, приняв во внимание указанные ниже соображения.

    Свойство Значение
    URI bash-скрипта https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Типы узлов Головной, рабочий или ZooKeeper
    Параметры -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Сохранено yes
    • Можно указать, как часто этот кластер должен автоматически проверять наличие обновлений. Вариант по умолчанию: -s “*/1 * * * *” -h 0 (в этом примере задание cron Spark выполняется каждую минуту, а задание cron HBase не выполняется).
    • Так как cron HBase не настроен по умолчанию, необходимо повторно запустить этот скрипт при выполнении масштабирования в кластере HBase. Если кластер HBase часто масштабируется, вы можете настроить автоматический запуск задания cron HBase. Например: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" настраивает скрипт для выполнения проверок каждые 30 минут. Это приведет к периодическому запуску задания cron HBase по расписанию для автоматической загрузки новых сведений HBase в учетную запись хранения на локальный узел.

Примечание.

Эти скрипты работают только в кластерах HDI 5.0 и HDI 5.1.

Настройка связи вручную (необязательно — на тот случай, если скрипт на предыдущем шаге завершается ошибкой)

ПРИМЕЧАНИЕ. Эти действия следует выполнять каждый раз, когда на одном из кластеров происходит масштабирование.

  1. Скопируйте файл hbase-site.xml из локального хранилища в корень хранилища по умолчанию кластера Spark. Измените команду, чтобы отразить конфигурацию. Затем в открытом подключении SSH к кластеру HBase введите следующую команду:

    Значение синтаксиса Новое значение
    Схема URI Измените в соответствии со своим хранилищем. Синтаксис предназначен для хранилища BLOB-объектов с включенной безопасной передачей.
    SPARK_STORAGE_CONTAINER Замените соответствующее значение именем контейнера хранилища по умолчанию, используемым для вашего кластера Spark.
    SPARK_STORAGE_ACCOUNT Замените соответствующее значение именем учетной записи хранения по умолчанию, используемой для вашего кластера Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Затем закройте подключение SSH к кластеру HBase.

    exit
    
  3. Подключитесь к головному узлу кластера Spark с помощью SSH. Измените команду, заменив SPARKCLUSTER имя кластера Spark, а затем введите команду:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Введите команду, чтобы скопировать hbase-site.xml из хранилища кластера Spark по умолчанию в папку конфигурации Spark 2 в локальном хранилище кластера:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Запуск оболочки Shell со ссылкой на соединитель Spark HBase

После выполнения предыдущего шага вы сможете запустить оболочку Spark, указав соответствующую версию соединителя Spark HBase.

Например, в следующей таблице указаны две версии и соответствующие команды, которые используют специалисты HDInsight. Вы можете использовать для своих кластеров те же версии, если ваши версии HBase и Spark совпадают с указанными в таблице.

  1. В подключении SSH к кластеру Spark введите следующую команду, чтобы запустить оболочку Spark:

    Версия Spark Версия HDI HBase Версия SHC Команда
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Не закрывайте этот экземпляр оболочки Spark и перейдите в раздел Определение каталога и отправка запроса. Дальше в разделе описано, что делать, если вы не нашли JAR-файлы для своих версий в репозитории SHC Core.

Эти артефакты больше не публикуются в указанном выше репозитории для последующих сочетаний версий Spark и HBase. Вы можете создать необходимые вам JAR-файлы непосредственно из ветви GitHub spark-hbase-connector. Например, если вы работаете с Spark 2.4 и HBase 2.1, выполните следующие действия:

  1. Клонируйте репозиторий:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Перейдите к ветви branch-2.4.

    git checkout branch-2.4
    
  3. Выполните сборку из ветви (будет создан JAR-файл):

    mvn clean package -DskipTests
    
  4. Выполните следующую команду (обязательно измените имя JAR-файла на имя созданного вами файла):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Не закрывайте этот экземпляр оболочки Spark и переходите к следующему разделу.

Определение каталога и отправка запроса

На этом этапе вы определяете объект каталога, который сопоставляет схему из Spark с Apache HBase.

  1. В открытой оболочке Spark выполните следующие инструкции import:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Введите следующую команду, чтобы задать каталог для таблицы Contacts, созданной в HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Код.

    1. определяет схему каталога для таблицы HBase под названием Contacts;
    2. определяет rowkey как key и сопоставляет имена столбцов, используемые в Spark, с семейством столбцов, именем столбца и типом столбца, используемыми в HBase;
    3. подробно определяет rowkey как именованный столбец (rowkey), который содержит определенное семейство столбцов cf из rowkey.
  3. Введите команду, чтобы определить метод, который предоставляет кадр данных вокруг Contacts таблицы в HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Создайте экземпляр таблицы данных:

    val df = withCatalog(catalog)
    
  5. Выполните запрос таблицы данных:

    df.show()
    

    Вы должны увидеть две строки данных:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Зарегистрируйте временную таблицу, чтобы запрашивать таблицу HBase с помощью Spark SQL:

    df.createTempView("contacts")
    
  7. Выполните SQL-запрос к таблице contacts:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Вы должны увидеть примерно такой результат:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Вставка новых данных

  1. Чтобы вставить новую запись о контакте, определите класс ContactRecord:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Создайте экземпляр ContactRecord и поместите его в массив:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Сохраните массив новых данных в HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Изучите результаты:

    df.show()
    

    Вы должны увидеть примерно такой результат:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Закройте оболочку Spark с помощью следующей команды:

    :q
    

Следующие шаги