Bagikan melalui


Menggunakan Apache Spark untuk membaca dan menulis data Apache HBase

Apache HBase biasanya dikueri dengan API tingkat rendah (memindai, mengambil, dan meletakkan) atau dengan sintaks SQL menggunakan Apache Phoenix. Apache juga menyediakan Apache Spark HBase Connector. Konektor adalah alternatif yang nyaman dan efisien untuk mengkueri dan memodifikasi data yang disimpan oleh HBase.

Prasyarat

  • Dua kluster HDInsight terpisah yang disebarkan dalam jaringan virtual yang sama. Satu HBase, dan satu Spark dengan setidaknya Spark 2.1 (HDInsight 3.6) terinstal. Untuk informasi selengkapnya, lihat Membuat kluster berbasis Linux di HDInsight menggunakan portal Microsoft Azure.

  • Skema URI untuk penyimpanan utama kluster Anda. Skemanya akan menjadi wasb:// untuk Azure Blob Storage, abfs:// untuk Azure Data Lake Storage Gen2, atau adl:// untuk Azure Data Lake Storage Gen1. Jika transfer aman diaktifkan untuk Blob Storage, URI akan menjadi wasbs://. Lihat juga, transfer aman.

Memproses keseluruhan

Memproses tingkat tinggi untuk pengaktifan kluster Spark Anda untuk mengkueri kluster HBase Anda adalah sebagai berikut:

  1. Siapkan beberapa data sampel di HBase.
  2. Dapatkan file hbase-site.xml dari folder konfigurasi kluster HBase Anda (/etc/hbase/conf), dan letakkan salinan hbase-site.xml di folder konfigurasi Spark 2 Anda (/etc/spark2/conf). (OPSIONAL: gunakan skrip yang disediakan oleh tim HDInsight untuk mengotomatiskan proses ini)
  3. Jalankan spark-shell yang mereferensikan Spark HBase Connector dengan koordinat Maven-nya dalam opsi packages.
  4. Tentukan katalog yang memetakan skema dari Spark ke HBase.
  5. Lakukan interaksi dengan data HBase menggunakan API RDD atau DataFrame.

Menyiapkan data sampel di Apache HBase

Dalam langkah ini, Anda membuat dan mengisi tabel di Apache HBase yang kemudian dapat Anda kueri menggunakan Spark.

  1. Gunakan perintah ssh untuk menghubungkan ke kluster HBase Anda. Edit perintah dengan mengganti HBASECLUSTER dengan nama kluster HBase Anda, lalu masukkan perintah:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Gunakan perintah hbase shell untuk memulai shell interaktif HBase. Masukkan perintah berikut di koneksi SSH Anda:

    hbase shell
    
  3. Gunakan perintah create untuk membuat tabel HBase dengan famili dua kolom. Masukkan perintah berikut:

    create 'Contacts', 'Personal', 'Office'
    
  4. Gunakan perintah put untuk menyisipkan nilai pada kolom tertentu dalam baris tertentu dalam tabel tertentu. Masukkan perintah berikut:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Gunakan perintah exit untuk menghentikan shell interaktif HBase. Masukkan perintah berikut:

    exit
    

Menjalankan skrip untuk menyiapkan koneksi antar kluster

Untuk menyiapkan komunikasi antar kluster, ikuti langkah-langkah untuk menjalankan dua skrip pada kluster Anda. Skrip ini akan mengotomatiskan proses penyalinan file yang dijelaskan di bagian 'Siapkan komunikasi secara manual'.

  • Skrip yang Anda jalankan dari kluster HBase akan mengunggahhbase-site.xml dan informasi pemetaan IP HBase ke penyimpanan default yang terpasang pada kluster Spark Anda.
  • Skrip yang Anda jalankan dari kluster Spark menyiapkan dua pekerjaan cron untuk menjalankan dua skrip pembantu secara berkala:
    1. Pekerjaan cron HBase – unduh file hbase-site.xml baru dan pemetaan IP HBase dari akun penyimpanan default Spark ke simpul lokal
    2. Pekerjaan cron Spark - memeriksa apakah penyekalaan Spark terjadi dan apakah kluster aman. Jika memang demikian, edit /etc/hosts untuk menyertakan pemetaan IP HBase yang disimpan secara lokal

CATATAN: Sebelum melanjutkan, pastikan Anda telah menambahkan akun penyimpanan kluster Spark ke kluster HBase Anda sebagai akun penyimpanan sekunder. Pastikan Anda skrip secara berurutan seperti yang ditunjukkan.

  1. Gunakan Tindakan Skrip pada kluster HBase Anda untuk menerapkan perubahan dengan pertimbangan berikut:

    Properti Nilai
    URI skrip bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Jenis node Wilayah
    Parameter -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Dipertahankan yes
    • SECONDARYS_STORAGE_URL adalah url penyimpanan default sisi Spark. Contoh Parameter: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Menggunakan Tindakan Skrip pada kluster Spark Anda untuk menerapkan perubahan dengan pertimbangan berikut:

    Properti Nilai
    URI skrip bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Jenis node Head, Pekerja, Zookeeper
    Parameter -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Dipertahankan yes
    • Anda dapat menentukan seberapa sering Anda menginginkan kluster ini secara otomatis memeriksa apakah ada pembaruan. Default: -s "*/1 * * * *" -h 0 (Dalam contoh ini, cron Spark berjalan setiap menit, sementara cron HBase tidak berjalan)
    • Karena HBase cron tidak disiapkan secara default, Anda perlu menjalankan ulang skrip ini saat melakukan penskalakan ke kluster HBase Anda. Jika kluster HBase sering diskalakan, Anda dapat memilih untuk mengatur pekerjaan cron HBase secara otomatis. Contohnya: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" mengonfigurasi skrip untuk melakukan pemeriksaan setiap 30 menit. Hal ini akan menjalankan jadwal cron HBase secara berkala untuk mengotomatiskan pengunduhan informasi HBase baru pada akun penyimpanan umum ke simpul lokal.

Catatan

Skrip ini hanya berfungsi pada kluster HDI 5.0 dan HDI 5.1.

Menyiapkan komunikasi secara manual (Opsional, jika langkah skrip yang disediakan di atas gagal)

CATATAN: Langkah-langkah ini perlu dilakukan setiap kali salah satu kluster menjalani aktivitas penyekalaan.

  1. Salin hbase-site.xml dari penyimpanan lokal ke root penyimpanan default kluster Spark Anda. Edit perintah untuk mencerminkan konfigurasi Anda. Kemudian, dari sesi SSH terbuka Anda ke kluster HBase, masukkan perintah:

    Nilai sintaks Nilai baru
    Skema URI Ubah untuk mencerminkan penyimpanan Anda. Sintaksnya adalah untuk penyimpanan blob dengan transfer aman diaktifkan.
    SPARK_STORAGE_CONTAINER Ganti dengan nama kontainer penyimpanan default yang digunakan untuk kluster Spark.
    SPARK_STORAGE_ACCOUNT Ganti dengan nama akun penyimpanan default yang digunakan untuk kluster Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Kemudian keluarlah dari koneksi ssh Anda ke kluster HBase Anda.

    exit
    
  3. Sambungkan ke simpul kepala kluster Spark Anda menggunakan SSH. Edit perintah dengan mengganti SPARKCLUSTER dengan nama kluster Spark Anda, lalu masukkan perintah:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Masukkan perintah untuk menyalin hbase-site.xml dari penyimpanan default kluster Spark Anda ke folder konfigurasi Spark 2 pada penyimpanan lokal kluster:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Jalankan Spark Shell yang mereferensikan Spark HBase Connector

Setelah menyelesaikan langkah sebelumnya, Anda akan dapat menjalankan shell Spark, yang mereferensikan versi Spark HBase Connector yang sesuai.

Sebagai contoh, tabel berikut ini mencantumkan dua versi dan perintah terkait yang saat ini digunakan tim HDInsight. Anda dapat menggunakan versi yang sama untuk kluster Anda jika versi HBase dan Spark sama seperti yang ditunjukkan dalam tabel.

  1. Dalam sesi SSH terbuka Anda ke kluster Spark, masukkan perintah berikut untuk memulai shell Spark:

    Versi Spark Versi HDI HBase Versi SHC Perintah
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Biarkan instans shell Spark ini tetap terbuka dan lanjutkan untuk Menentukan katalog dan kueri. Jika Anda tidak menemukan jar yang sesuai dengan versi Anda di repositori SHC Core, lanjutkan pembacaan.

Untuk kombinasi berikutnya dari versi Spark dan HBase, artefak ini tidak lagi dipublikasikan di repositori di atas. Anda dapat membangun jar langsung dari cabang GitHub spark-hbase-connector. Misalnya, jika Anda menjalankan dengan Spark 2.4 dan HBase 2.1, selesaikan langkah-langkah berikut:

  1. Mengklonakan repositori:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Buka cabang-2.4:

    git checkout branch-2.4
    
  3. Membangun dari cabang (membuat file .jar):

    mvn clean package -DskipTests
    
  4. Jalankan perintah berikut (pastikan untuk mengubah nama .jar yang terkait dengan file .jar yang Anda bangun):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Biarkan instans shell Spark ini tetap terbuka dan lanjutkan ke bagian berikutnya.

Menentukan katalog dan kueri

Dalam langkah ini, Anda mendefinisikan objek katalog yang memetakan skema dari Apache Spark ke Apache HBase.

  1. Di Spark Shell Anda yang terbuka, masukkan pernyataan import berikut:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Masukkan perintah di bawah ini untuk menentukan katalog tabel Kontak yang Anda buat di HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Kode:

    1. Mendefinisikan skema katalog untuk tabel HBase bernama Contacts.
    2. Mengidentifikasi rowkey sebagai key, dan petakan nama kolom yang digunakan dalam Spark ke famili kolom, nama kolom, dan jenis kolom seperti yang digunakan di HBase.
    3. Menentukan rowkey secara terperinci sebagai kolom bernama (rowkey), yang memiliki famili kolom spesifik cf dari rowkey.
  3. Masukkan perintah untuk menentukan metode yang menyediakan DataFrame di sekitar tabel Anda Contacts di HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Membuat instans DataFrame:

    val df = withCatalog(catalog)
    
  5. Kuerikan DataFrame:

    df.show()
    

    Anda akan melihat dua baris data:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Daftarkan tabel sementara agar Anda bisa mengkueri tabel HBase menggunakan Spark SQL:

    df.createTempView("contacts")
    
  7. Menerbitkan kueri SQL terhadap tabel contacts:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Anda akan melihat hasil seperti ini:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Menyisipkan data baru

  1. Untuk menyisipkan rekaman Kontak baru, tentukan kelas ContactRecord:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Buat instans ContactRecord dan masukkan ke dalam array:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Simpan array data baru ke HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Periksa hasilnya:

    df.show()
    

    Anda akan melihat output seperti ini:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Tutup shell Spark dengan memasukkan perintah berikut:

    :q
    

Langkah berikutnya