Az Apache Spark használata Apache HBase-adatok írására és olvasására

Az Apache HBase általában alacsony szintű API-val (vizsgálatokkal, lekérdezéssel és üzembe helyezéssel) vagy az Apache Phoenixet használó SQL-szintaxissal kérdezhető le. Az Apache az Apache Spark HBase Csatlakozás ort is biztosítja. A Csatlakozás or kényelmes és hatékony alternatíva a HBase által tárolt adatok lekérdezésére és módosítására.

Előfeltételek

Teljes folyamat

A Spark-fürt HBase-fürt lekérdezésének engedélyezésének magas szintű folyamata a következő:

  1. Készítsen elő néhány mintaadatot a HBase-ben.
  2. Szerezze be a hbase-site.xml fájlt a HBase-fürt konfigurációs mappájából (/etc/hbase/conf), és helyezze el a hbase-site.xml egy példányát a Spark 2 konfigurációs mappájába (/etc/spark2/conf). (NEM KÖTELEZŐ: használja a HDInsight csapata által biztosított szkriptet a folyamat automatizálásához)
  3. Futtassa a Spark HBase Csatlakozás orra való hivatkozás futtatását spark-shell a beállításBan a packages Maven koordinátái alapján.
  4. Definiáljon egy katalógust, amely leképozza a sémát a Sparkból a HBase-be.
  5. A HBase-adatok kezelése AZ RDD vagy a DataFrame API-k használatával.

Mintaadatok előkészítése az Apache HBase-ben

Ebben a lépésben létrehoz és feltölt egy táblát az Apache HBase-ben, amelyet aztán lekérdezhet a Spark használatával.

  1. ssh A parancs használatával csatlakozzon a HBase-fürthöz. Szerkessze a parancsot a HBase-fürt nevére cserélve HBASECLUSTER , majd írja be a parancsot:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. hbase shell A parancs használatával indítsa el a HBase interaktív rendszerhéjat. Írja be a következő parancsot az SSH-kapcsolatba:

    hbase shell
    
  3. create A paranccsal kétoszlopos családokat tartalmazó HBase-táblát hozhat létre. Írja be az alábbi parancsot:

    create 'Contacts', 'Personal', 'Office'
    
  4. put A parancs használatával értékeket szúrhat be egy adott tábla adott sorában lévő adott oszlopba. Írja be az alábbi parancsot:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. exit A parancs használatával állítsa le a HBase interaktív rendszerhéját. Írja be az alábbi parancsot:

    exit
    

Parancsfájlok futtatása a fürtök közötti kapcsolat beállításához

A fürtök közötti kommunikáció beállításához kövesse az alábbi lépéseket két szkript futtatásához a fürtökön. Ezek a szkriptek automatizálják a "Kommunikáció manuális beállítása" szakaszban leírt fájlmásolási folyamatot.

  • A HBase-fürtből futtatott szkript feltölti hbase-site.xml a HBase IP-leképezési adatait a Spark-fürthöz csatolt alapértelmezett tárolóba.
  • A Spark-fürtből futtatott szkript két cronfeladatot állít be két segédszkript rendszeres futtatásához:
    1. HBase cron-feladat – új hbase-site.xml fájlok és HBase IP-leképezés letöltése a Spark alapértelmezett tárfiókjából a helyi csomópontra
    2. Spark cron-feladat – Ellenőrzi, hogy történt-e Spark-skálázás, és hogy a fürt biztonságos-e. Ha igen, szerkessze /etc/hosts a helyileg tárolt HBase IP-leképezést

MEGJEGYZÉS: A folytatás előtt győződjön meg arról, hogy másodlagos tárfiókként hozzáadta a Spark-fürt tárfiókját a HBase-fürthöz. Győződjön meg arról, hogy a szkriptek a megadott sorrendben szerepelnek.

  1. A HBase-fürt szkriptműveletével alkalmazza a módosításokat az alábbi szempontok figyelembevételével:

    Tulajdonság Érték
    Bash-szkript URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Csomóponttípus(ok) Régió
    Paraméterek -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Kitartott igen
    • SECONDARYS_STORAGE_URL A Spark-oldal alapértelmezett tárolójának URL-címe. Példa paraméter: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. A Spark-fürtön a Szkriptművelettel alkalmazza a módosításokat az alábbi szempontok figyelembevételével:

    Tulajdonság Érték
    Bash-szkript URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Csomóponttípus(ok) Vezető, feldolgozó, Zookeeper
    Paraméterek -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Kitartott igen
    • Megadhatja, hogy a fürt milyen gyakran ellenőrizze automatikusan, hogy frissül-e. Alapértelmezett: -s "*/1 * * * *" -h 0 (Ebben a példában a Spark-cron percenként fut, míg a HBase cron nem fut)
    • Mivel a HBase cron alapértelmezés szerint nincs beállítva, újra kell futtatnia ezt a szkriptet, amikor skálázást végez a HBase-fürtön. Ha a HBase-fürt gyakran skálázódik, dönthet úgy, hogy automatikusan beállítja a HBase cron-feladatot. Például: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" a szkriptet úgy konfigurálja, hogy 30 percenként végezzen ellenőrzéseket. Ez rendszeres időközönként futtatja a HBase cron ütemezését, hogy automatizálja az új HBase-adatok letöltését a közös tárfiókon a helyi csomópontra.

Feljegyzés

Ezek a szkriptek csak HDI 5.0- és HDI 5.1-fürtökön működnek.

A kommunikáció manuális beállítása (Nem kötelező, ha a fenti lépésben megadott szkript sikertelen)

MEGJEGYZÉS: Ezeket a lépéseket minden alkalommal végre kell hajtani, amikor az egyik fürt skálázási tevékenységen megy keresztül.

  1. Másolja a hbase-site.xml a helyi tárolóból a Spark-fürt alapértelmezett tárolójának gyökerére. Szerkessze a parancsot a konfigurációnak megfelelően. Ezután a nyitott SSH-munkamenetből a HBase-fürtbe írja be a következő parancsot:

    Szintaxis értéke Új érték
    URI-séma Módosítsa úgy, hogy tükrözze a tárterületet. A szintaxis a biztonságos átvitelt engedélyező blobtárolókra használható.
    SPARK_STORAGE_CONTAINER Cserélje le a Spark-fürthöz használt alapértelmezett tárolónévre.
    SPARK_STORAGE_ACCOUNT Cserélje le a Spark-fürthöz használt alapértelmezett tárfióknévre.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Ezután lépjen ki az ssh-kapcsolatból a HBase-fürthöz.

    exit
    
  3. Csatlakozás a Spark-fürt fő csomópontjára az SSH használatával. Szerkessze a parancsot a Spark-fürt nevére cserélve SPARKCLUSTER , majd írja be a parancsot:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Adja meg a Spark-fürt alapértelmezett tárolójából a fürt helyi tárolóján lévő Spark 2 konfigurációs mappába másolandó hbase-site.xml parancsot:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

A Spark HBase Csatlakozás orra hivatkozó Spark Shell futtatása

Az előző lépés elvégzése után képesnek kell lennie a Spark-rendszerhéj futtatására, hivatkozva a Spark HBase Csatlakozás or megfelelő verziójára.

Az alábbi táblázat például két verziót és a HDInsight csapat által jelenleg használt parancsokat sorolja fel. A fürtökhöz ugyanazokat a verziókat használhatja, ha a HBase és a Spark verziói megegyeznek a táblázatban feltüntetettekkel.

  1. A Spark-fürtnek megnyitott SSH-munkamenetben adja meg a következő parancsot a Spark-rendszerhéj elindításához:

    Szikra változat HDI HBase-verzió SHC-verzió Parancs
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Tartsa nyitva ezt a Spark-rendszerhéj-példányt, és folytassa a katalógus és lekérdezés definiálásához. Ha nem találja az SHC Core-adattár verzióinak megfelelő jarokat, folytassa az olvasást.

A Spark- és HBase-verziók későbbi kombinációi esetében ezek az összetevők már nem jelennek meg a fenti adattárban. Az üvegeket közvetlenül a spark-hbase-connector GitHub-ágból hozhatja létre. Ha például a Spark 2.4-et és a HBase 2.1-et használja, hajtsa végre az alábbi lépéseket:

  1. Klónozza az adattárat:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Nyissa meg a branch-2.4-et:

    git checkout branch-2.4
    
  3. Buildelés az ágból (létrehoz egy .jar fájlt):

    mvn clean package -DskipTests
    
  4. Futtassa a következő parancsot (ügyeljen arra, hogy módosítsa a létrehozott .jar fájlnak megfelelő .jar nevet):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Tartsa nyitva ezt a Spark-rendszerhéj-példányt, és folytassa a következő szakaszban.

Katalógus és lekérdezés definiálása

Ebben a lépésben definiál egy katalógusobjektumot, amely leképezi a sémát az Apache Sparkból az Apache HBase-be.

  1. A megnyitott Spark Shellben adja meg a következő import utasításokat:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Adja meg az alábbi parancsot a HBase-ben létrehozott Névjegyek tábla katalógusának meghatározásához:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    A kód:

    1. Katalógusséma definiálása a HBase nevű Contactstáblához.
    2. Azonosítja a sorkulcsot key, és megfelelteti a Sparkban használt oszlopneveket a HBase-ben használt oszlopcsaládra, oszlopnévre és oszloptípusra.
    3. A sorkulcsot részletesen elnevezett oszlopként (rowkey) határozza meg, amelynek egy adott oszlopcsaládja cfrowkeyvan.
  3. Adja meg a parancsot egy olyan metódus meghatározásához, amely dataFrame-et biztosít a táblázat körül a Contacts HBase-ben:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Hozza létre a DataFrame egy példányát:

    val df = withCatalog(catalog)
    
  5. A DataFrame lekérdezése:

    df.show()
    

    Két adatsornak kell megjelennie:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Regisztráljon egy ideiglenes táblát, hogy lekérdezhesse a HBase-táblát a Spark SQL használatával:

    df.createTempView("contacts")
    
  7. SQL-lekérdezés kiadása a contacts táblán:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    A következő eredményeknek kell megjelenniük:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Új adatok beszúrása

  1. Új partnerrekord beszúrásához adjon meg egy osztályt ContactRecord :

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Hozzon létre egy példányt ContactRecord , és helyezze el egy tömbbe:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Mentse az új adatok tömbét a HBase-be:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Az eredmények vizsgálata:

    df.show()
    

    A következőhöz hasonló kimenetnek kell megjelennie:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Zárja be a Spark-rendszerhéjat a következő parancs beírásával:

    :q
    

Következő lépések