Använda Apache Spark för att läsa och skriva Apache HBase-data

Apache HBase efterfrågas vanligtvis antingen med sitt lågnivå-API (genomsökningar, hämtar och placerar) eller med en SQL-syntax med Apache Phoenix. Apache tillhandahåller även Apache Spark HBase Connector. Anslutningsappen är ett praktiskt och effektivt alternativ för att fråga efter och ändra data som lagras av HBase.

Förutsättningar

Övergripande process

Den avancerade processen för att aktivera Spark-klustret för att fråga ditt HBase-kluster är följande:

  1. Förbered några exempeldata i HBase.
  2. Hämta hbase-site.xml-filen från konfigurationsmappen för HBase-klustret (/etc/hbase/conf) och placera en kopia av hbase-site.xml i spark 2-konfigurationsmappen (/etc/spark2/conf). (VALFRITT: Använd skript från HDInsight-teamet för att automatisera den här processen)
  3. Kör spark-shell referensen till Spark HBase Connector med dess Maven-koordinater i packages alternativet .
  4. Definiera en katalog som mappar schemat från Spark till HBase.
  5. Interagera med HBase-data med antingen RDD- eller DataFrame-API:er.

Förbereda exempeldata i Apache HBase

I det här steget skapar och fyller du i en tabell i Apache HBase som du sedan kan köra frågor mot med Spark.

  1. Använd kommandot ssh för att ansluta till ditt HBase-kluster. Redigera kommandot nedan genom att HBASECLUSTER ersätta med namnet på ditt HBase-kluster och ange sedan kommandot:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Använd kommandot hbase shell för att starta det interaktiva HBase-gränssnittet. Ange följande kommando i SSH-anslutningen:

    hbase shell
    
  3. Använd kommandot create för att skapa en HBase-tabell med två kolumnfamiljer. Ange följande kommando:

    create 'Contacts', 'Personal', 'Office'
    
  4. Använd kommandot put för att infoga värden i en angiven kolumn på en angiven rad i en viss tabell. Ange följande kommando:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Använd kommandot exit för att stoppa det interaktiva HBase-gränssnittet. Ange följande kommando:

    exit
    

Kör skript för att konfigurera anslutningen mellan kluster

Om du vill konfigurera kommunikationen mellan kluster följer du stegen nedan för att köra två skript i dina kluster. De här skripten automatiserar filkopieringsprocessen som beskrivs i avsnittet "Konfigurera kommunikation manuellt" nedan.

  • Skriptet som du kör från HBase-klustret laddar upp hbase-site.xml och HBase IP-mappningsinformation till standardlagringen som är kopplad till ditt Spark-kluster.
  • Skriptet som du kör från Spark-klustret uppsättningar två cron-jobb för att köra två hjälpskript regelbundet:
    1. HBase Cron-jobb – ladda ned nya hbase-site.xml filer och HBase IP-mappning från Spark-standardlagringskontot till den lokala noden
    2. Spark Cron-jobb – kontrollerar om en Spark-skalning har skett och om klustret är säkert. I så fall redigerar du för /etc/hosts att inkludera HBase IP-mappning som lagras lokalt

Obs! Kontrollera att du har lagt till Spark-klustrets lagringskonto i ditt HBase-kluster som sekundärt lagringskonto innan du fortsätter. Kontrollera att skripten är i rätt ordning enligt nedan.

  1. Använd skriptåtgärd på ditt HBase-kluster för att tillämpa ändringarna med följande överväganden:

    Egenskap Värde
    URI för Bash-skript https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    Nodtyper Region
    Parametrar -s SECONDARYS_STORAGE_URL
    Framhärdade ja
    • SECONDARYS_STORAGE_URL är URL:en för standardlagringen på Spark-sidan. Parameterexempel: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. Använd skriptåtgärd på sparkklustret för att tillämpa ändringarna med följande överväganden:

    Egenskap Värde
    URI för Bash-skript https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    Nodtyper Head, Worker, Zookeeper
    Parametrar -s "SPARK-CRON-SCHEDULE" (valfritt) -h "HBASE-CRON-SCHEDULE" (valfritt)
    Framhärdade ja
    • Du kan ange hur ofta du vill att det här klustret automatiskt ska kontrollera om det uppdateras. Standard: -s "*/1 * * *" -h 0 (i det här exemplet körs Spark Cron varje minut, medan HBase Cron inte körs)
    • Eftersom HBase Cron inte har ställts in som standard måste du köra skriptet igen när du utför skalning till ditt HBase-kluster. Om ditt HBase-kluster skalar ofta kan du välja att konfigurera HBase Cron-jobb automatiskt. Exempel: -h "*/30 * * * *" konfigurerar skriptet för att utföra kontroller var 30:e minut. Då körs HBase Cron-schemat regelbundet för att automatisera nedladdningen av ny HBase-information på det gemensamma lagringskontot till den lokala noden.

Konfigurera kommunikation manuellt (valfritt, om det angivna skriptet i ovanstående steg misslyckas)

OBS! De här stegen måste utföras varje gång ett av klustren genomgår en skalningsaktivitet.

  1. Kopiera hbase-site.xml från lokal lagring till roten för Spark-klustrets standardlagring. Redigera kommandot nedan för att återspegla konfigurationen. Ange sedan följande kommando från den öppna SSH-sessionen till HBase-klustret:

    Syntaxvärde Nytt värde
    URI-schema Ändra för att återspegla din lagring. Syntaxen nedan är för bloblagring med säker överföring aktiverad.
    SPARK_STORAGE_CONTAINER Ersätt med standardnamnet för lagringscontainern som används för Spark-klustret.
    SPARK_STORAGE_ACCOUNT Ersätt med standardnamnet för lagringskontot som används för Spark-klustret.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Avsluta sedan SSH-anslutningen till ditt HBase-kluster.

    exit
    
  3. Anslut till spark-klustrets huvudnod med hjälp av SSH. Redigera kommandot nedan genom att ersätta SPARKCLUSTER med namnet på ditt Spark-kluster och ange sedan kommandot:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Ange kommandot nedan för att hbase-site.xml kopiera från Spark-klustrets standardlagring till Spark 2-konfigurationsmappen på klustrets lokala lagring:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Kör Spark Shell som refererar till Spark HBase-anslutningsappen

När du har slutfört föregående steg bör du kunna köra Spark-gränssnittet och referera till rätt version av Spark HBase Connector. Den senaste lämpliga spark HBase Connector-kärnversionen för ditt klusterscenario finns i SHC Core Repository.

I följande tabell visas till exempel två versioner och motsvarande kommandon som HDInsight-teamet använder för närvarande. Du kan använda samma versioner för dina kluster om versionerna av HBase och Spark är desamma som anges i tabellen.

  1. I din öppna SSH-session till Spark-klustret anger du följande kommando för att starta ett Spark-gränssnitt:

    Spark-version HDI HBase-version SHC-version Kommando
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Håll den här Spark Shell-instansen öppen och fortsätt till Definiera en katalog och fråga. Om du inte hittar jar-filarna som motsvarar dina versioner i SHC Core-lagringsplatsen fortsätter du att läsa.

För efterföljande kombinationer av Spark- och HBase-versioner publiceras dessa artefakter inte längre på lagringsplatsen ovan. Du kan skapa jar-koderna direkt från spark-hbase-connector-GitHub grenen. Om du till exempel kör med Spark 2.4 och HBase 2.1 utför du följande steg:

  1. Klona lagringsplatsen:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Gå till branch-2.4:

    git checkout branch-2.4
    
  3. Skapa från -grenen (skapar en .jar-fil):

    mvn clean package -DskipTests
    
  4. Kör följande kommando (se till att ändra .jar-namnet som motsvarar den .jar-fil som du skapade):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Håll den här Spark Shell-instansen öppen och fortsätt till nästa avsnitt.

Definiera en katalog och fråga

I det här steget definierar du ett katalogobjekt som mappar schemat från Apache Spark till Apache HBase.

  1. Ange följande instruktioner i ditt öppna import Spark-gränssnitt:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Ange kommandot nedan för att definiera en katalog för tabellen Kontakter som du skapade i HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Koden:

    1. Definierar ett katalogschema för HBase-tabellen med namnet Contacts .
    2. Identifierar radnyckeln som och mappar kolumnnamnen som används i Spark till kolumnfamiljen, kolumnnamnet och kolumntypen som key används i HBase.
    3. Definierar radnyckeln i detalj som en namngiven kolumn ( rowkey ), som har en specifik kolumnfamilj med cf rowkey .
  3. Ange kommandot nedan för att definiera en metod som tillhandahåller en DataFrame runt Contacts tabellen i HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Skapa en instans av DataFrame:

    val df = withCatalog(catalog)
    
  5. Fråga dataramen:

    df.show()
    

    Du bör se två rader med data:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registrera en tillfällig tabell så att du kan köra frågor mot HBase-tabellen med spark-SQL:

    df.createTempView("contacts")
    
  7. Utfärda en SQL-fråga mot contacts tabellen:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Du bör se resultat som dessa:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Infoga nya data

  1. Om du vill infoga en ny kontaktpost definierar du en ContactRecord klass:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Skapa en instans av ContactRecord och placera den i en matris:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Spara matrisen med nya data i HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Granska resultaten:

    df.show()
    

    Du bör se utdata som ser ut så här:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Stäng Spark-gränssnittet genom att ange följande kommando:

    :q
    

Nästa steg