استخدام Apache Spark لقراءة وكتابة بيانات Apache HBase

عادةً ما يتم الاستعلام عن Apache HBase إما من خلال واجهة برمجة التطبيقات ذات المستوى المنخفض (عمليات المسح، والحصول على، ووضع) أو باستخدام بناء جملة SQL باستخدام Apache Phoenix. يوفر Apache أيضًا موصل Apache Spark HBase. يعد الموصل بديلًا مناسبًا وفعالًا للاستعلام عن البيانات المخزنة بواسطة HBase وتعديلها.

المتطلبات الأساسية

إجمالي العمليات

العملية عالية المستوى لتمكين مجموعة Spark الخاصة بك من الاستعلام عن مجموعة HBase الخاصة بك على النحو التالي:

  1. تحضير بعض عينات البيانات في HBase.
  2. احصل على ملف hbase-site.xml من مجلد تكوين مجموعة HBase (/ etc / hbase / conf)، ثم ضع نسخة من hbase-site.xml في مجلد تكوين Spark 2 (/ etc / spark2 / conf). (اختياري: استخدم البرنامج النصي المقدم من فريق HDInsight لأتمتة هذه العملية)
  3. قم بتشغيل spark-shell الرجوع إلى Spark HBase Connector بواسطة إحداثيات Maven الخاصة به في الخيار packages.
  4. حدد كتالوجًا يقوم بتعيين المخطط من Spark إلى HBase.
  5. التفاعل مع بيانات HBase باستخدام واجهات برمجة تطبيقات RDD أو DataFrame.

تحضير البيانات النموذجية في Apache HBase

في هذه الخطوة، تقوم بإنشاء جدول وملؤه في Apache HBase بحيث يمكنك الاستعلام بعد ذلك باستخدام Spark.

  1. استخدم الأمر ssh للاتصال بمجموعة HBase الخاصة بك. قم بتحرير الأمر عن طريق استبدال HBASECLUSTER باسم مجموعة HBase الخاصة بك، ثم أدخل الأمر :

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. استخدم الأمر hbase shell لبدء shell التفاعلي HBase. أدخل الأمر التالي في اتصال SSH:

    hbase shell
    
  3. استخدم الأمر create لإنشاء جدول HBase مع عائلات من عمودين. أدخل الأمر التالي:

    create 'Contacts', 'Personal', 'Office'
    
  4. استخدم الأمر put لإدراج القيم في عمود محدد في صف محدد في جدول معين. أدخل الأمر التالي:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. استخدم الأمر exit لإيقاف غلاف HBase التفاعلي. أدخل الأمر التالي:

    exit
    

قم بتشغيل البرامج النصية لإعداد الاتصال بين المجموعات

لإعداد الاتصال بين المجموعات، اتبع الخطوات لتشغيل برنامجين نصيين على مجموعاتك. ستقوم هذه البرامج النصية بأتمتة عملية نسخ الملفات الموضحة في قسم "إعداد الاتصال يدويا".

  • سيتم تحميل البرنامج النصي الذي تقوم بتشغيله من مجموعة HBase hbase-site.xml ومعلومات تعيين HBase IP إلى وحدة التخزين الافتراضية المرفقة بمجموعة Spark الخاصة بك.
  • يقوم البرنامج النصي الذي تقوم بتشغيله من مجموعة Spark بإعداد مهمتين cron لتشغيل برنامجين نصيين مساعدين بشكل دوري:
    1. مهمة HBase cron - تنزيل ملفات hbase-site.xml جديدة وتعيين HBase IP من حساب التخزين الافتراضي Spark إلى العقدة المحلية
    2. وظيفة Spark cron - تتحقق مما إذا كان مقياس Spark قد حدث وما إذا كانت المجموعة آمنة. إذا كان الأمر كذلك، فقم بتحرير /etc/hosts لتضمين تعيين HBase IP المخزن محليًا

ملاحظة: قبل المتابعة، تأكد من إضافة حساب تخزين مجموعة Spark إلى مجموعة HBase كحساب تخزين ثانوي. تأكد من البرامج النصية بالترتيب كما هو موضح.

  1. استخدم Script Action في مجموعة HBase لديك لتطبيق التغييرات مع الاعتبارات التالية:

    الخاصية القيمة
    عنوان URI النصي Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    نوع العقدة (العقدات) المنطقة
    المعلمات -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    الاستمرار yes
    • SECONDARYS_STORAGE_URL هو عنوان url الخاص بوحدة التخزين الافتراضية من جانب Spark. مثال على المعلمة: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. استخدم Script Action على مجموعة Spark لديك لتطبيق التغييرات مع الاعتبارات التالية:

    الخاصية القيمة
    عنوان URI النصي Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    نوع العقدة (العقدات) رئيس، عامل، Zookeeper
    المعلمات -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    الاستمرار yes
    • يمكنك تحديد عدد المرات التي تريد أن تتحقق فيها هذه المجموعة تلقائيًا من التحديث. الافتراضي: -s “* / 1 * * * *” -h 0 (في هذا المثال، يعمل Spark cron كل دقيقة، بينما لا يعمل HBase cron)
    • نظرا لأنه لم يتم إعداد HBase cron بشكل افتراضي، تحتاج إلى إعادة تشغيل هذا البرنامج النصي عند إجراء التحجيم إلى نظام مجموعة HBase الخاص بك. إذا كانت مجموعة HBase تتوسع كثيرًا، فيمكنك اختيار إعداد وظيفة HBase cron تلقائيًا. على سبيل المثال: يقوم -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" بتهيئة البرنامج النصي لإجراء عمليات تحقق كل 30 دقيقة. سيؤدي هذا إلى تشغيل جدول HBase cron بشكل دوري لأتمتة تنزيل معلومات HBase الجديدة على حساب التخزين المشترك إلى العقدة المحلية.

إشعار

تعمل هذه البرامج النصية فقط على مجموعات HDI 5.0 وHDI 5.1.

إعداد الاتصال يدويًا (اختياري، إذا فشل البرنامج النصي في الخطوة أعلاه)

NOTE: يجب تنفيذ هذه الخطوات في كل مرة تخضع فيها إحدى المجموعات لنشاط تحجيم.

  1. انسخ hbase-site.xml من التخزين المحلي إلى جذر التخزين الافتراضي لمجموعة Spark. قم بتحرير الأمر ليعكس التكوين الخاص بك. بعد ذلك، من جلسة SSH المفتوحة إلى مجموعة HBase، أدخل الأمر:

    قيمة بناء الجملة قيمة جديدة
    مخطط URI تعديل لتعكس التخزين الخاص بك. بناء الجملة هو لتخزين كائن ثنائي كبير الحجم مع تمكين النقل الآمن.
    SPARK_STORAGE_CONTAINER استبدل اسم حاوية التخزين الافتراضي المستخدم لمجموعة Spark.
    SPARK_STORAGE_ACCOUNT استبدل اسم حساب التخزين الافتراضي المستخدم لمجموعة Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. ثم قم بإنهاء اتصال ssh بمجموعة HBase الخاصة بك.

    exit
    
  3. قم بالاتصال بالعقدة الرئيسية لمجموعة Spark الخاصة بك باستخدام SSH. قم بتحرير الأمر عن طريق استبدال SPARKCLUSTER باسم مجموعة Spark، ثم أدخل الأمر :

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. أدخل الأمر للنسخ hbase-site.xml من التخزين الافتراضي لنظام مجموعة Spark إلى مجلد تكوين Spark 2 على التخزين المحلي للمجموعة:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

قم بتشغيل Spark Shell بالإشارة إلى موصل Spark HBase

بعد إكمال الخطوة السابقة، يجب أن تكون قادرًا على تشغيل Spark shell، بالإشارة إلى الإصدار المناسب من Spark HBase Connector.

كمثال، يسرد الجدول التالي نسختين والأوامر المقابلة التي يستخدمها فريق HDInsight حاليًا. يمكنك استخدام نفس الإصدارات لمجموعاتك إذا كانت إصدارات HBase وSpark متطابقة كما هو موضح في الجدول.

  1. في جلسة SSH المفتوحة إلى Spark، أدخل الأمر التالي لبدء Spark shell:

    إصدار Spark إصدار HDI HBase إصدار SHC الأمر
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. اترك مثيل Spark shell مفتوحًا وتابع تعريف الكتالوج والاستعلام . إذا لم تجد jars التي تتوافق مع إصداراتك في مستودع SHC Core، فتابع القراءة.

بالنسبة للمجموعات اللاحقة من إصدارات Spark وHBase، لم يعد يتم نشر هذه القطع الأثرية في أعلى repo. يمكنك بناء الجرار مباشرة من فرع GitHub spark-hbase-connector . على سبيل المثال، إذا كنت تستخدم Spark 2.4 وHBase 2.1، فأكمل الخطوات التالية:

  1. استنساخ repo:

    git clone https://github.com/hortonworks-spark/shc
    
  2. انتقل إلى الفرع 2.4:

    git checkout branch-2.4
    
  3. بناء من الفرع (إنشاء ملف .jar):

    mvn clean package -DskipTests
    
  4. قم بتشغيل الأمر التالي (تأكد من تغيير اسم .jar الذي يتوافق مع ملف .jar الذي أنشأته):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. احتفظ بمثيل Spark shell هذا مفتوحًا وتابع إلى القسم التالي.

تحديد الكتالوج والاستعلام

في هذه الخطوة، تقوم بتعريف عنصر كتالوج يقوم بتعيين المخطط من Apache Spark إلى Apache HBase.

  1. في Spark Shell المفتوحة، أدخل عبارات import التالية:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. أدخل الأمر أدناه لتحديد كتالوج لجدول جهات الاتصال الذي أنشأته في HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    التعليمة البرمجية:

    1. يحدد مخطط كتالوج لجدول HBase المسمى Contacts.
    2. يعرّف مفتاح الصف على أنه key، وقم بتعيين أسماء الأعمدة المستخدمة في Spark لعائلة الأعمدة واسم العمود ونوع العمود كما هو مستخدم في HBase.
    3. يعرّف مفتاح الصف بالتفصيل على أنه عمود مسمى (rowkey)، والذي يحتوي على مجموعة أعمدة معينة cf من rowkey.
  3. أدخل الأمر لتعريف أسلوب يوفر DataFrame حول Contacts الجدول في HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. قم بإنشاء مثيل من DataFrame:

    val df = withCatalog(catalog)
    
  5. استعلام عن DataFrame:

    df.show()
    

    يجب أن ترى صفين من البيانات:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. قم بتسجيل جدول مؤقت حتى تتمكن من الاستعلام عن جدول HBase باستخدام Spark SQL:

    df.createTempView("contacts")
    
  7. إصدار استعلام SQL مقابل الجدول contacts:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    يجب أن ترى نتائج مثل هذه:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

أدخل بيانات جديدة

  1. لإدراج سجل جهة اتصال جديد، حدد فئة ContactRecord:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. قم بإنشاء مثيل لـContactRecord وضعه في مصفوفة:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. احفظ مجموعة البيانات الجديدة في HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. افحص النتائج:

    df.show()
    

    يجب أن ترى الإخراج مثل هذا:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. أغلق spark shell بإدخال الأمر التالي:

    :q
    

الخطوات التالية