Apache HBase verilerini okuyup yazmak için Apache Spark kullanma
Apache hbase genellikle alt düzey apı (taramalar, alır ve koyar) ile veya Apache Phoenix kullanarak SQL söz dizimiyle sorgulanır. Apache ayrıca Apache Spark HBase bağlayıcısını de sağlar. Bağlayıcı, HBase tarafından depolanan verileri sorgulamak ve değiştirmek için kullanışlı ve verimli bir alternatiftir.
Önkoşullar
Aynı Sanal ağdadağıtılan Iki ayrı HDInsight kümesi. Bir HBase ve en az Spark 2,1 (HDInsight 3,6) yüklü bir Spark. Daha fazla bilgi için bkz. HDInsight 'ta Azure Portal kullanarak Linux tabanlı kümeler oluşturma.
Kümelerinizin birincil depolama alanı için URI şeması. bu düzen,
abfs://Azure Data Lake Storage 1. için Azure Data Lake Storage 2. veya adl://için Azure Blob Depolama için wasb://olacaktır. Blob Depolama için güvenli aktarım etkinse urı olurwasbs://. Ayrıca bkz. Güvenli aktarım.
Genel işlem
Spark kümenizi HBase kümenizi sorgulamak üzere etkinleştirmeye yönelik üst düzey işlem aşağıdaki gibidir:
- Bazı örnek verileri HBase 'de hazırlayın.
- hbase-site.xml dosyasını HBase küme yapılandırma klasöründen (/etc/HBase/conf) alın ve hbase-site.xml bir kopyasını Spark 2 yapılandırma klasörünüze yerleştirin (/etc/mini \ sunucu). (Isteğe bağlı: Bu işlemi otomatikleştirmek için HDInsight ekibi tarafından sunulan betiği kullanın)
- ,
spark-shellSeçeneğinde Maven koordinatlarıyla Spark HBase bağlayıcısından başvurma komutunu çalıştırınpackages. - Spark ile HBase arasındaki şemayı eşleyen bir katalog tanımlayın.
- RDD veya DataFrame API 'Lerini kullanarak HBase verileriyle etkileşim kurun.
Apache HBase 'de örnek verileri hazırlama
Bu adımda, Apache HBase 'de, daha sonra Spark kullanarak sorgulayabilmeniz için bir tablo oluşturup doldurursunuz.
sshHBase kümenize bağlanmak için komutunu kullanın. Aşağıdaki komutu,HBASECLUSTERHBase kümenizin adıyla değiştirerek düzenleyin ve ardından şu komutu girin:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.nethbase shellHBase etkileşimli kabuğu 'nu başlatmak için komutunu kullanın. SSH bağlantınıza aşağıdaki komutu girin:hbase shellcreateİki sütunlu ailelerle bir HBase tablosu oluşturmak için komutunu kullanın. Aşağıdaki komutu girin:create 'Contacts', 'Personal', 'Office'Belirli bir
puttabloda belirtilen bir sütunda belirtilen bir sütuna değer eklemek için komutunu kullanın. Aşağıdaki komutu girin:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'exitHBase etkileşimli kabuğunu durdurmak için komutunu kullanın. Aşağıdaki komutu girin:exit
Kümeler arasında bağlantı kurmak için betikleri çalıştırma
Kümeler arasındaki iletişimi ayarlamak için, kümelerinizde iki komut dosyası çalıştırmak için aşağıdaki adımları izleyin. Bu betikler, aşağıdaki ' iletişim el ile ayarlama ' bölümünde açıklanan dosya kopyalama işlemini otomatikleştirebilir.
- HBase kümesinden çalıştırdığınız betik,
hbase-site.xmlIP eşleme bilgilerini Spark kümenize bağlı olan varsayılan depolama alanına yükler ve HBase 'e yükler. - Spark kümesinden çalıştırdığınız betik, iki yardımcı betiği düzenli aralıklarla çalıştırmak için iki cron işi ayarlar:
- HBase cron işi –
hbase-site.xmlSpark varsayılan depolama hesabından yerel düğüme yeni dosyaları ve HBase IP eşlemesini indirin - Spark cron işi – bir Spark ölçeklendirmesinin oluşup olmadığını ve kümenin güvenli olup olmadığını denetler. Öyleyse,
/etc/hostsyerel olarak depolanan HBase IP eşlemesini dahil etmek için Düzenle
- HBase cron işi –
Note: devam etmeden önce, Spark kümesinin depolama hesabını HBase kümenize ikincil depolama hesabı olarak eklediğinizden emin olun. Aşağıda gösterildiği gibi betiklerin sırada olduğundan emin olun.
Değişiklikleri aşağıdaki noktalara göre uygulamak için HBase kümenizdeki betik eylemini kullanın:
Özellik Değer Bash betiği URI 'SI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.shDüğüm türleri Bölge Parametreler -s SECONDARYS_STORAGE_URLKalıcı evet SECONDARYS_STORAGE_URLSpark tarafı varsayılan depolamanın URL 'sidir. Parametre örneği:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
Aşağıdaki noktalara göre değişiklikleri uygulamak için Spark kümenizde betik eylemi kullanın:
Özellik Değer Bash betiği URI 'SI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.shDüğüm türleri Baş, çalışan, Zookeeper Parametreler -s "SPARK-CRON-SCHEDULE"(isteğe bağlı)-h "HBASE-CRON-SCHEDULE"seçimKalıcı evet - Bu kümenin güncelleştirme olup olmadığını ne sıklıkta otomatik olarak denetlemesini istediğinizi belirtebilirsiniz. Varsayılan:-s "*/1 * * * *"-h 0 (Bu örnekte, Spark cron her dakikada çalışır, ancak HBase cron çalıştırılmıyor)
- HBase cron varsayılan olarak ayarlanmamış olduğundan, HBase kümenize ölçeklendirme gerçekleştirirken bu betiği yeniden çalıştırmanız gerekir. HBase kümeniz sık ölçeklenirken, HBase cron işini otomatik olarak ayarlamayı seçebilirsiniz. Örneğin:
-h "*/30 * * * *"her 30 dakikada bir denetim gerçekleştirmek için betiği yapılandırır. Bu, genel depolama hesabındaki yeni HBase bilgilerinin yerel düğüme indirilmesini otomatik hale getirmek için HBase cron zamanlamasını düzenli aralıklarla çalıştırır.
İletişimi el ile ayarlayın (Yukarıdaki adımda belirtilen komut dosyası başarısız olursa Isteğe bağlı)
Note: Bu adımların, kümelerden birinin bir ölçeklendirme etkinliğine sahip olduğu her seferinde gerçekleştirmesi gerekir.
Yerel depolamadan hbase-site.xml, Spark kümenizin varsayılan depolama alanının köküne kopyalayın. Yapılandırmanızı yansıtmak için aşağıdaki komutu düzenleyin. Ardından, açık SSH oturumunından HBase kümesine şu komutu girin:
Söz dizimi değeri Yeni değer URI düzeni Depolama alanınızı yansıtacak şekilde değiştirin. Aşağıdaki sözdizimi, güvenli aktarım özellikli blob depolamaya yöneliktir. SPARK_STORAGE_CONTAINERSpark kümesi için kullanılan varsayılan depolama kapsayıcısı adıyla değiştirin. SPARK_STORAGE_ACCOUNTSpark kümesi için kullanılan varsayılan depolama hesabı adıyla değiştirin. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/Ardından, SSH bağlantınızı HBase kümenize çıkın.
exitSSH kullanarak Spark kümenizin baş düğümüne Bağlan. Aşağıdaki komutu
SPARKCLUSTERSpark kümenizin adıyla değiştirerek düzenleyin ve ardından şu komutu girin:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.nethbase-site.xmlSpark kümenizin varsayılan depolama alanından kümenin yerel depolama alanındaki Spark 2 yapılandırma klasörüne kopyalamak için aşağıdaki komutu girin:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Spark HBase bağlayıcısına başvuran Spark kabuğunu Çalıştır
Önceki adımı tamamladıktan sonra Spark HBase bağlayıcısının uygun sürümüne başvurarak Spark kabuğunu çalıştırabilmelisiniz. Küme senaryonuz için en son uygun Spark HBase Bağlayıcısı temel sürümünü bulmak için bkz. SHC Core Repository.
Örnek olarak, aşağıdaki tabloda, HDInsight ekibinin Şu anda kullandığı iki sürüm ve ilgili komutlar listelenmektedir. HBase ve Spark sürümleri tabloda belirtilen şekilde aynıysa, kümeleriniz için de aynı sürümleri kullanabilirsiniz.
Spark kümesine yönelik açık SSH oturumunda, bir Spark kabuğu başlatmak için aşağıdaki komutu girin:
Spark sürümü HDI HBase sürümü SHC sürümü Komut 2.1 HDI 3,6 (HBase 1,1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/Bu Spark kabuğu örneğini açık tutun ve bir katalog ve sorgu tanımlamayadevam edin. SHC çekirdek deposundaki sürümleriniz için karşılık gelen jar dosyaları dışındaki ' ı bulamazsanız okumaya devam edin.
Spark ve HBase sürümlerinin sonraki birleşimleri için, bu yapılar artık yukarıdaki depoda yayımlanmaz. jar dosyaları dışındaki 'ı doğrudan spark-hbase-connector GitHub dalından oluşturabilirsiniz. Örneğin, Spark 2,4 ve HBase 2,1 ile çalıştırıyorsanız, şu adımları izleyin:
Depoyu kopyalama:
git clone https://github.com/hortonworks-spark/shcDala git-2,4:
git checkout branch-2.4Daldan oluştur (. jar dosyası oluşturur):
mvn clean package -DskipTestsAşağıdaki komutu çalıştırın (oluşturduğunuz. jar dosyasına karşılık gelen. jar adını değiştirdiğinizden emin olun):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*Bu Spark kabuğu örneğini açık tutun ve sonraki bölüme devam edin.
Katalog ve sorgu tanımlama
Bu adımda, Apache Spark şemayı Apache HBase ile eşleyen bir katalog nesnesi tanımlarsınız.
Açık Spark kabuğunuzun aşağıdaki
importdeyimlerini girin:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._HBase 'de oluşturduğunuz kişiler tablosu için bir katalog tanımlamak üzere aşağıdaki komutu girin:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMarginKod:
- Adlı HBase tablosu için bir katalog şeması tanımlar
Contacts. - Rowkey 'i olarak tanımlar
keyve Spark içinde kullanılan sütun adlarını, HBase 'de kullanılan sütun ailesi, sütun adı ve sütun türüne eşleyin. - Rowkey '
rowkeyi belirli bir sütun ailesine sahip olan adlandırılmış bir sütun () olarak ayrıntılı şekilde tanımlarcfrowkey.
- Adlı HBase tablosu için bir katalog şeması tanımlar
HBase 'de tablonuzun etrafında bir veri çerçevesi sağlayan bir yöntemi tanımlamak için aşağıdaki komutu girin
Contacts:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }Veri çerçevesinin bir örneğini oluşturun:
val df = withCatalog(catalog)Veri çerçevesini sorgulama:
df.show()İki satırlık veri görmeniz gerekir:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+Spark SQL kullanarak HBase tablosunu sorgulayabilmeniz için geçici bir tablo kaydedin:
df.createTempView("contacts")tabloda SQL bir sorgu verme
contacts:spark.sqlContext.sql("select personalName, officeAddress from contacts").showŞöyle bir sonuç görmeniz gerekir:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Yeni veri Ekle
Yeni bir kişi kaydı eklemek için bir sınıf tanımlayın
ContactRecord:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )Bir örneği oluşturun
ContactRecordve bir diziye koyun:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContactYeni veri dizisini HBase 'e Kaydet:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()Sonuçları inceleyin:
df.show()Şunun gibi bir çıktı görmeniz gerekir:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+Aşağıdaki komutu girerek Spark kabuğunu kapatın:
:q