Apache Spark gebruiken om Apache HBase-gegevens te lezen en schrijven

Apache HBase wordt doorgaans opgevraagd met de API op laag niveau (scans, haalt en plaatst) of met een SQL syntaxis met behulp van Apache Phoenix. Apache biedt ook de Apache Spark HBase-connector. De connector is een handig en efficiënt alternatief voor het opvragen en wijzigen van gegevens die zijn opgeslagen door HBase.

Vereisten

  • Twee afzonderlijke HDInsight-clusters die in hetzelfde virtuele netwerk zijn geïmplementeerd. Eén HBase en één Spark met ten minste Spark 2.1 (HDInsight 3.6) geïnstalleerd. Zie Op Linux gebaseerde clusters maken in HDInsightmet behulp van de Azure Portal.

  • Het URI-schema voor de primaire opslag voor uw clusters. Dit schema is wasb:// voor Azure Blob Storage, voor Azure Data Lake Storage Gen2 of abfs:// adl:// voor Azure Data Lake Storage Gen1. Als beveiligde overdracht is ingeschakeld voor Blob Storage, is de URI wasbs:// . Zie ook beveiligde overdracht.

Algemeen proces

Het proces op hoog niveau voor het inschakelen van uw Spark-cluster om query's uit te voeren op uw HBase-cluster is als volgt:

  1. Bereid enkele voorbeeldgegevens voor in HBase.
  2. Verkrijg het hbase-site.xml-bestand uit de configuratiemap van uw HBase-cluster (/etc/hbase/conf) en plaats een kopie van hbase-site.xml in de configuratiemap van spark 2 (/etc/spark2/conf). (OPTIONEEL: gebruik het script dat is geleverd door het HDInsight-team om dit proces te automatiseren)
  3. Voer in de optie uit die verwijst naar de Spark HBase-connector op basis van spark-shell de Maven-coördinaten. packages
  4. Definieer een catalogus die het schema van Spark toeschrijft aan HBase.
  5. Interactie met de HBase-gegevens met behulp van de RDD- of DataFrame-API's.

Voorbeeldgegevens voorbereiden in Apache HBase

In deze stap maakt en vult u een tabel in Apache HBase die u vervolgens kunt opvragen met behulp van Spark.

  1. Gebruik de ssh opdracht om verbinding te maken met uw HBase-cluster. Bewerk de onderstaande opdracht door te HBASECLUSTER vervangen door de naam van uw HBase-cluster en voer vervolgens de opdracht in:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Gebruik de hbase shell opdracht om de interactieve HBase-shell te starten. Voer de volgende opdracht in uw SSH-verbinding in:

    hbase shell
    
  3. Gebruik de create opdracht om een HBase-tabel met twee kolomfamilies te maken. Voer de volgende opdracht in:

    create 'Contacts', 'Personal', 'Office'
    
  4. Gebruik de put opdracht om waarden in te voegen in een opgegeven kolom in een opgegeven rij in een bepaalde tabel. Voer de volgende opdracht in:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Gebruik de exit opdracht om de interactieve HBase-shell te stoppen. Voer de volgende opdracht in:

    exit
    

Scripts uitvoeren om verbinding tussen clusters in te stellen

Als u de communicatie tussen clusters wilt instellen, volgt u de onderstaande stappen om twee scripts op uw clusters uit te voeren. Met deze scripts wordt het proces voor het kopiëren van bestanden dat wordt beschreven in de sectie 'Communicatie handmatig instellen' hieronder automatiseren.

  • Het script dat u vanuit het HBase-cluster hebt uitgevoerd, uploadt en HBase IP-toewijzingsgegevens naar de standaardopslag hbase-site.xml die is gekoppeld aan uw Spark-cluster.
  • Het script dat u vanuit het Spark-cluster hebt uitgevoerd, stelt twee Cron-taken in om periodiek twee helperscripts uit te voeren:
    1. HBase Cron-taak: download nieuwe bestanden en HBase IP-toewijzing van hbase-site.xml standaardOpslagaccount van Spark naar lokaal knooppunt
    2. Spark Cron-taak: controleert of er een Spark-schaalbaarheid heeft plaatsgevonden en of het cluster veilig is. Zo ja, bewerk dan om /etc/hosts HBase IP-toewijzing op te nemen die lokaal is opgeslagen

OPMERKING: Voordat u doorgaat, moet u ervoor zorgen dat u het opslagaccount van het Spark-cluster als secundair opslagaccount hebt toegevoegd aan uw HBase-cluster. Zorg ervoor dat de scripts in de aangegeven volgorde worden weergegeven.

  1. Gebruik scriptactie op uw HBase-cluster om de wijzigingen toe te passen met de volgende overwegingen:

    Eigenschap Waarde
    Bash-script-URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    Knooppunttype(n) Regio
    Parameters -s SECONDARYS_STORAGE_URL
    Persistent ja
    • SECONDARYS_STORAGE_URL is de URL van de standaardopslag aan de Spark-zijde. Parametervoorbeeld: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. Gebruik scriptactie op uw Spark-cluster om de wijzigingen toe te passen met de volgende overwegingen:

    Eigenschap Waarde
    Bash-script-URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    Knooppunttype(n) Head, Worker, Zookeeper
    Parameters -s "SPARK-CRON-SCHEDULE" (optioneel) -h "HBASE-CRON-SCHEDULE" (optioneel)
    Persistent ja
    • U kunt opgeven hoe vaak u wilt dat dit cluster automatisch controleert of het wordt bijgewerkt. Standaardinstelling: -s "*/1 * * * *" -h 0 (in dit voorbeeld wordt de Spark Cron elke minuut uitgevoerd, terwijl de HBase Cron niet wordt uitgevoerd)
    • Omdat HBase Cron niet standaard is ingesteld, moet u dit script opnieuw uitvoeren bij het schalen naar uw HBase-cluster. Als uw HBase-cluster vaak wordt geschaald, kunt u ervoor kiezen om de HBase Cron-taak automatisch in te stellen. Bijvoorbeeld: hiermee -h "*/30 * * * *" configureert u het script om elke 30 minuten controles uit te voeren. Hiermee wordt de HBase Cron-planning periodiek uitgevoerd om het downloaden van nieuwe HBase-gegevens op het algemene opslagaccount naar het lokale knooppunt te automatiseren.

Communicatie handmatig instellen (optioneel, als het opgegeven script in de bovenstaande stap mislukt)

OPMERKING: Deze stappen moeten worden uitvoeren telkens als een van de clusters een schaalactiviteit ondergaat.

  1. Kopieer de hbase-site.xml van lokale opslag naar de hoofdmap van de standaardopslag van uw Spark-cluster. Bewerk de onderstaande opdracht om uw configuratie weer te geven. Voer vervolgens vanuit uw open SSH-sessie naar het HBase-cluster de volgende opdracht in:

    Syntaxiswaarde Nieuwe waarde
    URI-schema Wijzig deze om uw opslag weer te geven. De onderstaande syntaxis is voor blob-opslag met beveiligde overdracht ingeschakeld.
    SPARK_STORAGE_CONTAINER Vervang door de standaardnaam van de opslagcontainer die wordt gebruikt voor het Spark-cluster.
    SPARK_STORAGE_ACCOUNT Vervang door de standaardopslagaccountnaam die wordt gebruikt voor het Spark-cluster.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Sluit vervolgens de SSH-verbinding met uw HBase-cluster af.

    exit
    
  3. Verbinding maken SSH naar het hoofdknooppunt van uw Spark-cluster. Bewerk de onderstaande opdracht door te SPARKCLUSTER vervangen door de naam van uw Spark-cluster en voer vervolgens de opdracht in:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Voer de onderstaande opdracht in om vanuit de standaardopslag van uw Spark-cluster te kopiëren naar de hbase-site.xml configuratiemap van Spark 2 in de lokale opslag van het cluster:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Spark Shell uitvoeren die verwijst naar de Spark HBase-connector

Nadat u de vorige stap hebt voltooid, moet u spark-shell kunnen uitvoeren en verwijzen naar de juiste versie van Spark HBase Connector. Zie SHC Core Repository (SHC Core-opslagplaats) voor de meest recente geschikte coreversie van Spark HBase Connector voor uw clusterscenario.

De volgende tabel bevat bijvoorbeeld twee versies en de bijbehorende opdrachten die het HDInsight-team momenteel gebruikt. U kunt dezelfde versies gebruiken voor uw clusters als de versies van HBase en Spark hetzelfde zijn als aangegeven in de tabel.

  1. Voer in uw open SSH-sessie met het Spark-cluster de volgende opdracht in om een Spark-shell te starten:

    Spark-versie HDI HBase-versie SHC-versie Opdracht
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Houd dit Spark-shell-exemplaar geopend en ga door met Een catalogus definiëren en query's uitvoeren. Als u de JAR's die overeenkomen met uw versies niet vindt in de SHC Core-opslagplaats, gaat u verder met lezen.

Voor volgende combinaties van Spark- en HBase-versies worden deze artefacten niet meer gepubliceerd in de bovenstaande repo. U kunt de JAR's rechtstreeks vanuit de spark-hbase-connector GitHub maken. Als u bijvoorbeeld met Spark 2.4 en HBase 2.1 wordt uitgevoerd, moet u deze stappen uitvoeren:

  1. Kloon de opslagplaats:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Ga naar branch-2.4:

    git checkout branch-2.4
    
  3. Bouwen vanuit de -vertakking (hiermee maakt u een JAR-bestand):

    mvn clean package -DskipTests
    
  4. Voer de volgende opdracht uit (zorg ervoor dat u de .jar-naam wijzigt die overeenkomt met het JAR-bestand dat u hebt gemaakt):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Houd dit Spark-shell-exemplaar geopend en ga door naar de volgende sectie.

Een catalogus en query definiëren

In deze stap definieert u een catalogusobject dat het schema Apache Spark aan Apache HBase.

  1. Voer in de geopende Spark-shell de volgende instructies import in:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Voer de onderstaande opdracht in om een catalogus te definiëren voor de tabel Contactpersonen die u in HBase hebt gemaakt:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    De code:

    1. Definieert een catalogusschema voor de HBase-tabel met de naam Contacts .
    2. Identificeert de rowkey als en wijst de kolomnamen die in Spark worden gebruikt toe aan de kolomfamilie, kolomnaam en kolomtype, zoals gebruikt key in HBase.
    3. Definieert de rijsleutel in detail als een benoemde kolom ( rowkey ), die een specifieke kolomfamilie van cf rowkey heeft.
  3. Voer de onderstaande opdracht in om een methode te definiëren die een DataFrame rond uw Contacts tabel in HBase biedt:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Maak een exemplaar van het DataFrame:

    val df = withCatalog(catalog)
    
  5. Query's uitvoeren op het DataFrame:

    df.show()
    

    Als het goed is, ziet u twee rijen met gegevens:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registreer een tijdelijke tabel zodat u een query kunt uitvoeren op de HBase-tabel met behulp van Spark SQL:

    df.createTempView("contacts")
    
  7. Een query SQL de contacts tabel:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    De resultaten moeten er als de volgende zijn:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Nieuwe gegevens invoegen

  1. Als u een nieuwe Contactpersoon-record wilt invoegen, definieert u een ContactRecord klasse:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Maak een exemplaar van ContactRecord en plaats deze in een matrix:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Sla de matrix met nieuwe gegevens op in HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Bekijk de resultaten:

    df.show()
    

    De uitvoer ziet er als volgt uit:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Sluit de Spark-shell door de volgende opdracht in te voeren:

    :q
    

Volgende stappen