Apache Spark gebruiken om Apache HBase-gegevens te lezen en schrijven
Apache HBase wordt doorgaans opgevraagd met de API op laag niveau (scans, haalt en plaatst) of met een SQL syntaxis met behulp van Apache Phoenix. Apache biedt ook de Apache Spark HBase-connector. De connector is een handig en efficiënt alternatief voor het opvragen en wijzigen van gegevens die zijn opgeslagen door HBase.
Vereisten
Twee afzonderlijke HDInsight-clusters die in hetzelfde virtuele netwerk zijn geïmplementeerd. Eén HBase en één Spark met ten minste Spark 2.1 (HDInsight 3.6) geïnstalleerd. Zie Op Linux gebaseerde clusters maken in HDInsightmet behulp van de Azure Portal.
Het URI-schema voor de primaire opslag voor uw clusters. Dit schema is wasb:// voor Azure Blob Storage, voor Azure Data Lake Storage Gen2 of
abfs://adl:// voor Azure Data Lake Storage Gen1. Als beveiligde overdracht is ingeschakeld voor Blob Storage, is de URIwasbs://. Zie ook beveiligde overdracht.
Algemeen proces
Het proces op hoog niveau voor het inschakelen van uw Spark-cluster om query's uit te voeren op uw HBase-cluster is als volgt:
- Bereid enkele voorbeeldgegevens voor in HBase.
- Verkrijg het hbase-site.xml-bestand uit de configuratiemap van uw HBase-cluster (/etc/hbase/conf) en plaats een kopie van hbase-site.xml in de configuratiemap van spark 2 (/etc/spark2/conf). (OPTIONEEL: gebruik het script dat is geleverd door het HDInsight-team om dit proces te automatiseren)
- Voer in de optie uit die verwijst naar de Spark HBase-connector op basis van
spark-shellde Maven-coördinaten.packages - Definieer een catalogus die het schema van Spark toeschrijft aan HBase.
- Interactie met de HBase-gegevens met behulp van de RDD- of DataFrame-API's.
Voorbeeldgegevens voorbereiden in Apache HBase
In deze stap maakt en vult u een tabel in Apache HBase die u vervolgens kunt opvragen met behulp van Spark.
Gebruik de
sshopdracht om verbinding te maken met uw HBase-cluster. Bewerk de onderstaande opdracht door teHBASECLUSTERvervangen door de naam van uw HBase-cluster en voer vervolgens de opdracht in:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.netGebruik de
hbase shellopdracht om de interactieve HBase-shell te starten. Voer de volgende opdracht in uw SSH-verbinding in:hbase shellGebruik de
createopdracht om een HBase-tabel met twee kolomfamilies te maken. Voer de volgende opdracht in:create 'Contacts', 'Personal', 'Office'Gebruik de
putopdracht om waarden in te voegen in een opgegeven kolom in een opgegeven rij in een bepaalde tabel. Voer de volgende opdracht in:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'Gebruik de
exitopdracht om de interactieve HBase-shell te stoppen. Voer de volgende opdracht in:exit
Scripts uitvoeren om verbinding tussen clusters in te stellen
Als u de communicatie tussen clusters wilt instellen, volgt u de onderstaande stappen om twee scripts op uw clusters uit te voeren. Met deze scripts wordt het proces voor het kopiëren van bestanden dat wordt beschreven in de sectie 'Communicatie handmatig instellen' hieronder automatiseren.
- Het script dat u vanuit het HBase-cluster hebt uitgevoerd, uploadt en HBase IP-toewijzingsgegevens naar de standaardopslag
hbase-site.xmldie is gekoppeld aan uw Spark-cluster. - Het script dat u vanuit het Spark-cluster hebt uitgevoerd, stelt twee Cron-taken in om periodiek twee helperscripts uit te voeren:
- HBase Cron-taak: download nieuwe bestanden en HBase IP-toewijzing van
hbase-site.xmlstandaardOpslagaccount van Spark naar lokaal knooppunt - Spark Cron-taak: controleert of er een Spark-schaalbaarheid heeft plaatsgevonden en of het cluster veilig is. Zo ja, bewerk dan om
/etc/hostsHBase IP-toewijzing op te nemen die lokaal is opgeslagen
- HBase Cron-taak: download nieuwe bestanden en HBase IP-toewijzing van
OPMERKING: Voordat u doorgaat, moet u ervoor zorgen dat u het opslagaccount van het Spark-cluster als secundair opslagaccount hebt toegevoegd aan uw HBase-cluster. Zorg ervoor dat de scripts in de aangegeven volgorde worden weergegeven.
Gebruik scriptactie op uw HBase-cluster om de wijzigingen toe te passen met de volgende overwegingen:
Eigenschap Waarde Bash-script-URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.shKnooppunttype(n) Regio Parameters -s SECONDARYS_STORAGE_URLPersistent ja SECONDARYS_STORAGE_URLis de URL van de standaardopslag aan de Spark-zijde. Parametervoorbeeld:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
Gebruik scriptactie op uw Spark-cluster om de wijzigingen toe te passen met de volgende overwegingen:
Eigenschap Waarde Bash-script-URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.shKnooppunttype(n) Head, Worker, Zookeeper Parameters -s "SPARK-CRON-SCHEDULE"(optioneel)-h "HBASE-CRON-SCHEDULE"(optioneel)Persistent ja - U kunt opgeven hoe vaak u wilt dat dit cluster automatisch controleert of het wordt bijgewerkt. Standaardinstelling: -s "*/1 * * * *" -h 0 (in dit voorbeeld wordt de Spark Cron elke minuut uitgevoerd, terwijl de HBase Cron niet wordt uitgevoerd)
- Omdat HBase Cron niet standaard is ingesteld, moet u dit script opnieuw uitvoeren bij het schalen naar uw HBase-cluster. Als uw HBase-cluster vaak wordt geschaald, kunt u ervoor kiezen om de HBase Cron-taak automatisch in te stellen. Bijvoorbeeld: hiermee
-h "*/30 * * * *"configureert u het script om elke 30 minuten controles uit te voeren. Hiermee wordt de HBase Cron-planning periodiek uitgevoerd om het downloaden van nieuwe HBase-gegevens op het algemene opslagaccount naar het lokale knooppunt te automatiseren.
Communicatie handmatig instellen (optioneel, als het opgegeven script in de bovenstaande stap mislukt)
OPMERKING: Deze stappen moeten worden uitvoeren telkens als een van de clusters een schaalactiviteit ondergaat.
Kopieer de hbase-site.xml van lokale opslag naar de hoofdmap van de standaardopslag van uw Spark-cluster. Bewerk de onderstaande opdracht om uw configuratie weer te geven. Voer vervolgens vanuit uw open SSH-sessie naar het HBase-cluster de volgende opdracht in:
Syntaxiswaarde Nieuwe waarde URI-schema Wijzig deze om uw opslag weer te geven. De onderstaande syntaxis is voor blob-opslag met beveiligde overdracht ingeschakeld. SPARK_STORAGE_CONTAINERVervang door de standaardnaam van de opslagcontainer die wordt gebruikt voor het Spark-cluster. SPARK_STORAGE_ACCOUNTVervang door de standaardopslagaccountnaam die wordt gebruikt voor het Spark-cluster. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/Sluit vervolgens de SSH-verbinding met uw HBase-cluster af.
exitVerbinding maken SSH naar het hoofdknooppunt van uw Spark-cluster. Bewerk de onderstaande opdracht door te
SPARKCLUSTERvervangen door de naam van uw Spark-cluster en voer vervolgens de opdracht in:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.netVoer de onderstaande opdracht in om vanuit de standaardopslag van uw Spark-cluster te kopiëren naar de
hbase-site.xmlconfiguratiemap van Spark 2 in de lokale opslag van het cluster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Spark Shell uitvoeren die verwijst naar de Spark HBase-connector
Nadat u de vorige stap hebt voltooid, moet u spark-shell kunnen uitvoeren en verwijzen naar de juiste versie van Spark HBase Connector. Zie SHC Core Repository (SHC Core-opslagplaats) voor de meest recente geschikte coreversie van Spark HBase Connector voor uw clusterscenario.
De volgende tabel bevat bijvoorbeeld twee versies en de bijbehorende opdrachten die het HDInsight-team momenteel gebruikt. U kunt dezelfde versies gebruiken voor uw clusters als de versies van HBase en Spark hetzelfde zijn als aangegeven in de tabel.
Voer in uw open SSH-sessie met het Spark-cluster de volgende opdracht in om een Spark-shell te starten:
Spark-versie HDI HBase-versie SHC-versie Opdracht 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/Houd dit Spark-shell-exemplaar geopend en ga door met Een catalogus definiëren en query's uitvoeren. Als u de JAR's die overeenkomen met uw versies niet vindt in de SHC Core-opslagplaats, gaat u verder met lezen.
Voor volgende combinaties van Spark- en HBase-versies worden deze artefacten niet meer gepubliceerd in de bovenstaande repo. U kunt de JAR's rechtstreeks vanuit de spark-hbase-connector GitHub maken. Als u bijvoorbeeld met Spark 2.4 en HBase 2.1 wordt uitgevoerd, moet u deze stappen uitvoeren:
Kloon de opslagplaats:
git clone https://github.com/hortonworks-spark/shcGa naar branch-2.4:
git checkout branch-2.4Bouwen vanuit de -vertakking (hiermee maakt u een JAR-bestand):
mvn clean package -DskipTestsVoer de volgende opdracht uit (zorg ervoor dat u de .jar-naam wijzigt die overeenkomt met het JAR-bestand dat u hebt gemaakt):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*Houd dit Spark-shell-exemplaar geopend en ga door naar de volgende sectie.
Een catalogus en query definiëren
In deze stap definieert u een catalogusobject dat het schema Apache Spark aan Apache HBase.
Voer in de geopende Spark-shell de volgende instructies
importin:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._Voer de onderstaande opdracht in om een catalogus te definiëren voor de tabel Contactpersonen die u in HBase hebt gemaakt:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMarginDe code:
- Definieert een catalogusschema voor de HBase-tabel met de naam
Contacts. - Identificeert de rowkey als en wijst de kolomnamen die in Spark worden gebruikt toe aan de kolomfamilie, kolomnaam en kolomtype, zoals gebruikt
keyin HBase. - Definieert de rijsleutel in detail als een benoemde kolom (
rowkey), die een specifieke kolomfamilie vancfrowkeyheeft.
- Definieert een catalogusschema voor de HBase-tabel met de naam
Voer de onderstaande opdracht in om een methode te definiëren die een DataFrame rond uw
Contactstabel in HBase biedt:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }Maak een exemplaar van het DataFrame:
val df = withCatalog(catalog)Query's uitvoeren op het DataFrame:
df.show()Als het goed is, ziet u twee rijen met gegevens:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+Registreer een tijdelijke tabel zodat u een query kunt uitvoeren op de HBase-tabel met behulp van Spark SQL:
df.createTempView("contacts")Een query SQL de
contactstabel:spark.sqlContext.sql("select personalName, officeAddress from contacts").showDe resultaten moeten er als de volgende zijn:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Nieuwe gegevens invoegen
Als u een nieuwe Contactpersoon-record wilt invoegen, definieert u een
ContactRecordklasse:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )Maak een exemplaar van
ContactRecorden plaats deze in een matrix:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContactSla de matrix met nieuwe gegevens op in HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()Bekijk de resultaten:
df.show()De uitvoer ziet er als volgt uit:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+Sluit de Spark-shell door de volgende opdracht in te voeren:
:q