Utilizar o Apache Spark para ler e escrever dados do Apache HBase

O Apache HBase normalmente é consultado com sua API de baixo nível (scans, gets, e puts) ou com uma sintaxe SQL usando o Apache Phoenix. O Apache também fornece o Apache Spark HBase Connector. O conector é uma alternativa conveniente e eficiente para consultar e modificar dados armazenados pelo HBase.

Pré-requisitos

  • Dois clusters HDInsight separados implantados na mesma rede virtual. Um HBase e um Spark com pelo menos o Spark 2.1 (HDInsight 3.6) instalado. Para obter mais informações, consulte Criar clusters baseados em Linux no HDInsight usando o portal do Azure.

  • O esquema de URI para o armazenamento primário de clusters. Esse esquema seria wasb:// para o Armazenamento de Blobs do Azure, abfs:// para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento de Blob, o URI será wasbs://. Consulte também, transferência segura.

Processo geral

O processo de alto nível para permitir que o cluster do Spark consulte o cluster HBase é o seguinte:

  1. Prepare alguns dados de amostra no HBase.
  2. Adquira o arquivo hbase-site.xml da pasta de configuração do cluster HBase (/etc/hbase/conf) e coloque uma cópia do hbase-site.xml na pasta de configuração do Spark 2 (/etc/spark2/conf). (OPCIONAL: use o script fornecido pela equipe do HDInsight para automatizar esse processo)
  3. Execute spark-shell a referência ao Spark HBase Connector por suas coordenadas Maven na packages opção.
  4. Defina um catálogo que mapeie o esquema do Spark para o HBase.
  5. Interaja com os dados do HBase usando as APIs RDD ou DataFrame.

Preparar dados de exemplo no Apache HBase

Nesta etapa, você cria e preenche uma tabela no Apache HBase que pode ser consultada usando o Spark.

  1. Use o ssh comando para se conectar ao cluster HBase. Edite o comando substituindo HBASECLUSTER pelo nome do cluster HBase e digite o comando:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Use o hbase shell comando para iniciar o shell interativo do HBase. Digite o seguinte comando em sua conexão SSH:

    hbase shell
    
  3. Use o create comando para criar uma tabela HBase com famílias de duas colunas. Introduza o seguinte comando:

    create 'Contacts', 'Personal', 'Office'
    
  4. Use o put comando para inserir valores em uma coluna especificada em uma linha especificada em uma tabela específica. Introduza o seguinte comando:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Use o exit comando para parar o shell interativo do HBase. Introduza o seguinte comando:

    exit
    

Executar scripts para configurar a conexão entre clusters

Para configurar a comunicação entre clusters, siga as etapas para executar dois scripts em seus clusters. Esses scripts automatizarão o processo de cópia de arquivos descrito na seção 'Configurar a comunicação manualmente'.

  • O script executado a partir do cluster HBase carregará hbase-site.xml as informações de mapeamento IP do HBase para o armazenamento padrão anexado ao cluster do Spark.
  • O script executado a partir do cluster do Spark configura dois trabalhos cron para executar dois scripts auxiliares periodicamente:
    1. HBase cron job – faça download de novos hbase-site.xml arquivos e mapeamento de IP do HBase da conta de armazenamento padrão do Spark para o nó local
    2. Spark cron job – verifica se ocorreu um dimensionamento do Spark e se o cluster é seguro. Em caso afirmativo, edite /etc/hosts para incluir o mapeamento IP do HBase armazenado localmente

NOTA: Antes de continuar, certifique-se de que adicionou a conta de armazenamento do cluster Spark ao cluster HBase como conta de armazenamento secundário. Certifique-se de que os scripts estão em ordem, conforme indicado.

  1. Use a Ação de script no cluster do HBase para aplicar as alterações com as seguintes considerações:

    Property valor
    Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Tipo(s) de nó(s) País/Região
    Parâmetros -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Persistiu sim
    • SECONDARYS_STORAGE_URL é a url do armazenamento padrão do lado do Spark. Exemplo de parâmetro: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Use a Ação de Script no cluster do Spark para aplicar as alterações com as seguintes considerações:

    Property valor
    Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Tipo(s) de nó(s) Chefe, Trabalhador, Zookeeper
    Parâmetros -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Persistiu sim
    • Você pode especificar com que frequência deseja que esse cluster verifique automaticamente se está atualizado. Padrão: -s "*/1 * * * *" -h 0 (Neste exemplo, o cron do Spark é executado a cada minuto, enquanto o cron do HBase não é executado)
    • Como o cron do HBase não está configurado por padrão, você precisa executar esse script novamente ao executar o dimensionamento para o cluster do HBase. Se o cluster do HBase for dimensionado com frequência, você poderá optar por configurar o trabalho cron do HBase automaticamente. Por exemplo: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" configura o script para executar verificações a cada 30 minutos. Isso executará o cronograma cron do HBase periodicamente para automatizar o download de novas informações do HBase na conta de armazenamento comum para o nó local.

Nota

Esses scripts funcionam apenas em clusters HDI 5.0 e HDI 5.1.

Configurar a comunicação manualmente (Opcional, se o script fornecido na etapa acima falhar)

NOTA: Estas etapas precisam ser executadas sempre que um dos clusters passa por uma atividade de dimensionamento.

  1. Copie o hbase-site.xml do armazenamento local para a raiz do armazenamento padrão do cluster Spark. Edite o comando para refletir sua configuração. Em seguida, da sessão SSH aberta para o cluster HBase, digite o comando:

    Valor da sintaxe Novo valor
    Esquema de URI Modifique para refletir seu armazenamento. A sintaxe é para armazenamento de blob com transferência segura habilitada.
    SPARK_STORAGE_CONTAINER Substitua pelo nome do contêiner de armazenamento padrão usado para o cluster do Spark.
    SPARK_STORAGE_ACCOUNT Substitua pelo nome da conta de armazenamento padrão usado para o cluster do Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Em seguida, saia da conexão ssh para o cluster HBase.

    exit
    
  3. Conecte-se ao nó principal do cluster do Spark usando SSH. Edite o comando substituindo SPARKCLUSTER pelo nome do cluster do Spark e digite o comando:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Insira o comando para copiar hbase-site.xml do armazenamento padrão do cluster Spark para a pasta de configuração do Spark 2 no armazenamento local do cluster:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Execute o Spark Shell fazendo referência ao Spark HBase Connector

Depois de concluir a etapa anterior, você poderá executar o shell do Spark, fazendo referência à versão apropriada do Spark HBase Connector.

Como exemplo, a tabela a seguir lista duas versões e os comandos correspondentes que a equipe do HDInsight usa atualmente. Você pode usar as mesmas versões para seus clusters se as versões do HBase e do Spark forem as mesmas indicadas na tabela.

  1. Na sessão SSH aberta para o cluster do Spark, digite o seguinte comando para iniciar um shell do Spark:

    Versão do Spark HDI HBase versão Versão SHC Comando
    2.1 IDH 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Mantenha essa instância do shell do Spark aberta e continue para Definir um catálogo e uma consulta. Se você não encontrar os jars que correspondem às suas versões no repositório SHC Core, continue lendo.

Para combinações subsequentes das versões Spark e HBase, esses artefatos não são mais publicados no repositório acima. Você pode construir os frascos diretamente da ramificação GitHub spark-hbase-connector . Por exemplo, se você estiver executando com o Spark 2.4 e o HBase 2.1, conclua estas etapas:

  1. Clone o repo:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Ir para ramo-2.4:

    git checkout branch-2.4
    
  3. Compilar a partir da ramificação (cria um arquivo .jar):

    mvn clean package -DskipTests
    
  4. Execute o seguinte comando (certifique-se de alterar o nome .jar que corresponde ao arquivo .jar que você criou):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Mantenha esta instância do shell do Spark aberta e continue para a próxima seção.

Definir um catálogo e uma consulta

Nesta etapa, você define um objeto de catálogo que mapeia o esquema do Apache Spark para o Apache HBase.

  1. No Spark Shell aberto, insira as seguintes import instruções:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Digite o comando abaixo para definir um catálogo para a tabela Contatos que você criou no HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    O código:

    1. Define um esquema de catálogo para a tabela do HBase chamada Contacts.
    2. Identifica a chave de linha como key, e mapeia os nomes de coluna usados no Spark para a família de colunas, o nome da coluna e o tipo de coluna usados no HBase.
    3. Define a chave de linha em detalhes como uma coluna nomeada (rowkey), que tem uma família cf de colunas específica de rowkey.
  3. Insira o comando para definir um método que forneça um DataFrame ao redor de sua Contacts tabela no HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Crie uma instância do DataFrame:

    val df = withCatalog(catalog)
    
  5. Consulte o DataFrame:

    df.show()
    

    Você verá duas linhas de dados:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registre uma tabela temporária para que você possa consultar a tabela do HBase usando o Spark SQL:

    df.createTempView("contacts")
    
  7. Emita uma consulta SQL na contacts tabela:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Você deve ver resultados como estes:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Inserir novos dados

  1. Para inserir um novo registro de contato, defina uma ContactRecord classe:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Crie uma instância de ContactRecord e coloque-a em uma matriz:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Salve a matriz de novos dados no HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Examine os resultados:

    df.show()
    

    Deverá ver um resultado como este:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Feche o shell de faísca digitando o seguinte comando:

    :q
    

Próximos passos