Použití Apache Sparku ke čtení a zápisu dat Apache HBase
Apache hbas se obvykle dotazuje buď pomocí rozhraní API na nízké úrovni (kontroluje, načítá a umísťuje), nebo pomocí SQL syntaxe pomocí Apache Phoenix. Apache taky poskytuje konektor Apache Spark HBA. Konektor je praktická a efektivní alternativa k dotazování a úpravám dat uložených pomocí adaptérů HBA.
Požadavky
Ve stejné virtuální sítijsou nasazené dva samostatné clustery HDInsight. Je nainstalovaná jedna z adaptérů HBA a jedna Spark s aspoň Spark 2,1 (HDInsight 3,6). Další informace najdete v tématu Vytvoření clusterů se systémem Linux v HDInsight pomocí Azure Portal.
Schéma identifikátoru URI pro primární úložiště clusterů. toto schéma by bylo wasb://pro Azure Blob Storage pro
abfs://Azure Data Lake Storage Gen2 nebo adl://pro Azure Data Lake Storage Gen1. pokud je pro Blob Storage povolený zabezpečený přenos, identifikátor URI by bylwasbs://. Viz také zabezpečený přenos.
Celkový proces
Proces vysoké úrovně, který umožňuje vašemu clusteru Spark dotazovat se na váš cluster HBA, je následující:
- Připravte si některá ukázková data v adaptérech HBA.
- Získejte soubor hbase-site.xml z vaší konfigurační složky clusteru HBA (/etc/HBase/conf) a umístěte kopii hbase-site.xml do konfigurační složky Spark 2 (/etc/spark2/conf). (Volitelné: pro automatizaci tohoto procesu použijte skript poskytovaný týmem HDInsight.)
- Spusťte
spark-shellodkazování konektoru Spark HBA podle jeho souřadnic Maven vpackagesMožnosti. - Definujte katalog, který mapuje schéma ze Sparku na HBA.
- Můžete pracovat s daty HBA pomocí rozhraní API RDD nebo dataframe.
Příprava ukázkových dat v Apache HBA
V tomto kroku vytvoříte a naplníte tabulku v Apache Hbach, které pak můžete dotazovat pomocí Sparku.
Pomocí
sshpříkazu se připojte ke clusteru HBA. Níže uvedený příkaz upravte nahrazenímHBASECLUSTERnázvem vašeho clusteru HBA a zadáním příkazu:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.netPomocí
hbase shellpříkazu spusťte interaktivní prostředí pro adaptéry HBA. Do připojení SSH zadejte následující příkaz:hbase shellPomocí
createpříkazu vytvořte tabulku HBA se dvěma skupinami sloupců. Zadejte následující příkaz:create 'Contacts', 'Personal', 'Office'Pomocí
putpříkazu můžete vkládat hodnoty do zadaného sloupce v zadaném řádku v konkrétní tabulce. Zadejte následující příkaz:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'Pomocí
exitpříkazu zastavte prostředí HBA interaktivní prostředí. Zadejte následující příkaz:exit
Spuštění skriptů pro nastavení připojení mezi clustery
Pokud chcete nastavit komunikaci mezi clustery, postupujte podle následujících kroků a spusťte ve svých clusterech dva skripty. Tyto skripty automatizují proces kopírování souborů, který je popsaný v části Nastavení komunikace ručně.
- Skript, který spustíte z clusteru HBA, nahraje
hbase-site.xmla přidá informace mapování IP na výchozí úložiště připojené ke clusteru Spark. - Skript, který spustíte z clusteru Spark, nastaví dvě úlohy cron, aby se pravidelně spouštěly dva pomocné skripty:
- HBA cron úlohu – umožňuje stáhnout nové
hbase-site.xmlsoubory a HBA mapování IP adres z výchozího účtu úložiště Spark do místního uzlu. - Úloha Spark cron – kontroluje, jestli došlo k škálování Sparku a pokud je cluster zabezpečený. Pokud ano, upravte,
/etc/hostsaby zahrnovaly místně uložené mapování IP adaptérů.
- HBA cron úlohu – umožňuje stáhnout nové
Poznámka: než budete pokračovat, ujistěte se, že jste přidali účet úložiště clusteru Spark do clusteru HBA jako sekundární účet úložiště. Ujistěte se, že jsou skripty v uvedeném pořadí, jak je uvedeno níže.
Použijte akci skriptu na svém clusteru HBA, aby se změny projevily s následujícími požadavky:
Vlastnost Hodnota Identifikátor URI skriptu bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.shTyp (typy) uzlů Oblast Parametry -s SECONDARYS_STORAGE_URLTrvalé ano SECONDARYS_STORAGE_URLje adresa URL výchozího úložiště na straně Spark. Příklad parametru:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
Použijte akci skriptu v clusteru Spark pro použití změn s následujícími požadavky:
Vlastnost Hodnota Identifikátor URI skriptu bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.shTyp (typy) uzlů Vedoucí pracovník, Zookeeper Parametry -s "SPARK-CRON-SCHEDULE"(volitelné)-h "HBASE-CRON-SCHEDULE"volitelnéTrvalé ano - Můžete určit, jak často chcete, aby tento cluster automaticky kontroloval aktualizace. Výchozí:-s "*/1 * * * *"-h 0 (v tomto příkladu se Spark cron spouští každou minutu, zatímco adaptéry HBA cron neběží)
- Vzhledem k tomu, že adaptéry HBA cron nejsou nastavené ve výchozím nastavení, musíte tento skript spustit znovu při provádění škálování clusteru HBA. Pokud se vaše adaptéry HBA často škálují, můžete se rozhodnout nastavit úlohu HBA cron automaticky. Například:
-h "*/30 * * * *"nakonfiguruje skript, aby prováděl kontroly každých 30 minut. Tím se v pravidelných intervalech cron plán HBA pro automatizaci stahování nových informací o adaptérech HBA na společném účtu úložiště do místního uzlu.
Ruční nastavení komunikace (nepovinný, pokud zadaný skript ve výše uvedeném kroku selže)
Poznámka: Tyto kroky je potřeba provést pokaždé, když jeden z clusterů přechází do aktivity škálování.
Zkopírujte hbase-site.xml z místního úložiště do kořenového adresáře výchozího úložiště clusteru Spark. Upravte následující příkaz tak, aby odrážel vaši konfiguraci. Pak z otevřené relace SSH do clusteru HBA zadejte příkaz:
Hodnota syntaxe Nová hodnota Schéma identifikátoru URI Upravte, aby odrážela vaše úložiště. Níže uvedená syntaxe je pro úložiště objektů BLOB s povoleným zabezpečeným přenosem. SPARK_STORAGE_CONTAINERNahraďte výchozím názvem kontejneru úložiště použitým pro cluster Spark. SPARK_STORAGE_ACCOUNTNahraďte názvem výchozího účtu úložiště použitým pro cluster Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/Pak ukončete připojení SSH k vašemu clusteru HBA.
exitpomocí SSH Připojení k hlavnímu uzlu clusteru Spark. Níže uvedený příkaz upravte nahrazením
SPARKCLUSTERnázvem vašeho clusteru Spark a zadáním příkazu:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.netZadejte následující příkaz pro zkopírování
hbase-site.xmlz výchozího úložiště clusteru Spark do složky Konfigurace Spark 2 v místním úložišti clusteru:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Spustit prostředí Spark odkazující na konektor Spark HBA
Po dokončení předchozího kroku byste měli být schopni spustit prostředí Spark, které odkazuje na příslušnou verzi konektoru Spark HBA. Nejnovější verzi jádra konektoru Spark HBA pro váš scénář najdete v tématu úložiště SHC Core.
Příklad: v následující tabulce jsou uvedeny dvě verze a odpovídající příkazy, které tým HDInsight aktuálně používá. Pokud jsou verze adaptérů HBA a Spark stejné jako v tabulce, můžete pro své clustery použít stejné verze.
V otevřené relaci SSH ke clusteru Spark zadejte následující příkaz, který spustí prostředí Spark:
Verze Sparku Verze HDI HBA Verze SHC Příkaz 2.1 HDI 3,6 (HBA 1,1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/Nechte tuto instanci prostředí Sparku otevřenou a pokračujte definováním katalogu a dotazu. Pokud nenajdete jar, který odpovídá vašim verzím v úložišti SHC Core, pokračujte ve čtení.
Pro následné kombinace verzí Spark a HBA již nejsou tyto artefakty publikovány ve výše uvedeném úložišti. jar můžete vytvořit přímo z větve spark-hba-connector GitHub. Pokud například používáte se systémem Spark 2,4 a HBA 2,1, proveďte tyto kroky:
Naklonujte úložiště:
git clone https://github.com/hortonworks-spark/shcPřejít na větev-2,4:
git checkout branch-2.4Sestavení z větve (vytvoří soubor. jar):
mvn clean package -DskipTestsSpusťte následující příkaz (nezapomeňte změnit název. jar, který odpovídá sestavenému souboru. jar):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*Nechte tuto instanci prostředí Sparku otevřenou a pokračujte k další části.
Definování katalogu a dotazu
V tomto kroku definujete objekt katalogu, který mapuje schéma z Apache Spark na Apache HBA.
V otevřeném prostředí Spark zadejte následující
importpříkazy:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._Zadejte následující příkaz pro definování katalogu pro tabulku kontaktů, kterou jste vytvořili v části HBA:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMarginKód:
- Definuje schéma katalogu pro tabulku HBA s názvem
Contacts. - Identifikuje rowkey jako
keya mapuje názvy sloupců používané ve Sparku na rodinu sloupců, název sloupce a typ sloupce, jak se používá v adaptérech HBA. - Definuje rowkey podrobně jako pojmenovaný sloupec (
rowkey), který má konkrétní rodinu sloupcůcfrowkey.
- Definuje schéma katalogu pro tabulku HBA s názvem
Níže uvedeným příkazem Definujte metodu, která poskytuje objekt dataframe kolem
Contactstabulky v hbach:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }Vytvořte instanci datového rámce:
val df = withCatalog(catalog)Dotaz na datový rámec:
df.show()Měli byste vidět dva řádky dat:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+Zaregistrujte dočasnou tabulku, abyste mohli zadat dotaz na tabulku HBA pomocí Spark SQL:
df.createTempView("contacts")vydejte SQL dotaz na
contactstabulku:spark.sqlContext.sql("select personalName, officeAddress from contacts").showMěli byste vidět podobné výsledky:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Vložit nová data
Chcete-li vložit nový záznam kontaktu, definujte
ContactRecordtřídu:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )Vytvořte instanci
ContactRecorda vložte ji do pole:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContactUložte pole nových dat do HBA:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()Projděte si výsledky:
df.show()Měl by se zobrazit výstup podobný tomuto:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+Zavřete prostředí Spark zadáním následujícího příkazu:
:q