Table API وSQL في مجموعات Apache Flink® على HDInsight على AKS

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

يتميز Apache Flink بواجهة برمجة تطبيقات ارتباطية - واجهة برمجة تطبيقات الجدول وSQL - للتدفق الموحد ومعالجة الدفعات. واجهة برمجة تطبيقات الجدول هي واجهة برمجة تطبيقات استعلام متكاملة اللغة تسمح بتكوين الاستعلامات من عوامل التشغيل الارتباطية مثل التحديد والتصفية والانضمام بشكل بديهي. يعتمد دعم SQL الخاص ب Flink على Apache Calcite، الذي ينفذ معيار SQL.

تتكامل واجهات Table API وSQL بسلاسة مع بعضها البعض وواجهة برمجة تطبيقات DataStream الخاصة ب Flink. يمكنك التبديل بسهولة بين جميع واجهات برمجة التطبيقات والمكتبات، والتي تعتمد عليها.

مثل محركات SQL الأخرى، تعمل استعلامات Flink أعلى الجداول. يختلف عن قاعدة البيانات التقليدية لأن Flink لا يدير البيانات الثابتة محليا؛ بدلا من ذلك، تعمل الاستعلامات الخاصة به بشكل مستمر عبر الجداول الخارجية.

تبدأ مسارات معالجة البيانات Flink بجداول المصدر وتنتهي بجداول المتلقي. تنتج جداول المصدر صفوفا يتم تشغيلها أثناء تنفيذ الاستعلام؛ إنها الجداول المشار إليها في عبارة FROM للاستعلام. يمكن أن تكون الاتصال ors من نوع HDInsight Kafka أو HDInsight HBase أو Azure Event Hubs أو قواعد البيانات أو أنظمة الملفات أو أي نظام آخر يقع موصله في مسار الفئة.

يمكنك الرجوع إلى هذه المقالة حول كيفية استخدام CLI من Secure Shell على مدخل Microsoft Azure. فيما يلي بعض العينات السريعة لكيفية البدء.

  • لبدء تشغيل عميل SQL

    ./bin/sql-client.sh
    
  • لتمرير ملف sql للتهيئة للتشغيل جنبا إلى جنب مع sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • لتعيين تكوين في sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

يدعم Flink SQL عبارات CREATE التالية

  • إنشاء جدول
  • إنشاء قاعدة بيانات
  • إنشاء كتالوج

فيما يلي مثال لبناء الجملة لتعريف جدول مصدر باستخدام موصل jdbc للاتصال ب MSSQL، مع المعرف، واسم كأعمدة في عبارة CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

إنشاء قاعدة بيانات :

CREATE DATABASE students;

إنشاء كتالوج:

CREATE CATALOG myhive WITH ('type'='hive');

يمكنك تشغيل الاستعلامات المستمرة أعلى هذه الجداول

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

اكتب إلى Sink Table من Source Table:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

إضافة تبعيات

يتم استخدام عبارات JAR لإضافة jars للمستخدم في classpath أو إزالة jars المستخدم من classpath أو إظهار jars المضافة في classpath في وقت التشغيل.

يدعم Flink SQL عبارات JAR التالية:

  • إضافة JAR
  • إظهار JARS
  • إزالة JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

توفر الكتالوجات بيانات تعريف، مثل قواعد البيانات والجداول والأقسام وطرق العرض والوظائف والمعلومات اللازمة للوصول إلى البيانات المخزنة في قاعدة بيانات أو أنظمة خارجية أخرى.

في HDInsight على AKS، ندعم Flink خيارين للكتالوج:

GenericInMemoryCatalog

GenericInMemoryCatalog هو تنفيذ في الذاكرة للكتالوج. تتوفر جميع الكائنات فقط طوال مدة جلسة sql.

HiveCatalog

يخدم HiveCatalog غرضين؛ كمخزن مستمر لبيانات تعريف Flink النقية، وكواجهة لقراءة وكتابة بيانات تعريف Hive الموجودة.

إشعار

يأتي HDInsight على مجموعات AKS مع خيار متكامل من Hive Metastore ل Apache Flink. يمكنك اختيار Hive Metastore أثناء إنشاء نظام المجموعة

يمكنك الرجوع إلى هذه المقالة حول كيفية استخدام CLI والبدء في Flink SQL Client من Secure Shell على مدخل Microsoft Azure.

  • بدء sql-client.sh جلسة العمل

    لقطة شاشة تعرض كتالوج الخلية الافتراضي.

    Default_catalog هو كتالوج الذاكرة الافتراضي

  • دعونا الآن نتحقق من قاعدة البيانات الافتراضية للكتالوج في الذاكرة لقطة شاشة تعرض كتالوجات الذاكرة الافتراضية.

  • دعونا ننشئ كتالوج Hive للإصدار 3.1.2 ونستخدمه

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    إشعار

    HDInsight على AKS يدعم Hive 3.1.2 و Hadoop 3.3.2. hive-conf-dir تم تعيين إلى الموقع/opt/hive-conf

  • دعونا ننشئ قاعدة بيانات في كتالوج الخلية ونجعلها افتراضية لجلسة العمل (ما لم يتم تغييرها). لقطة شاشة تعرض إنشاء قاعدة بيانات في كتالوج الخلية وجعلها كتالوج افتراضي لجلسة العمل.

كيفية إنشاء جداول Hive وتسجيلها في كتالوج Hive

  • اتبع الإرشادات حول كيفية إنشاء قواعد بيانات Flink وتسجيلها في الكتالوج

  • دعونا ننشئ Flink Table من نوع الموصل Hive بدون قسم

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • إدراج بيانات في hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • قراءة البيانات من hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    إشعار

    يقع دليل Hive Warehouse في حاوية حساب التخزين المعينة التي تم اختيارها أثناء إنشاء مجموعة Apache Flink، ويمكن العثور عليها في الدليل hive/warehouse/

  • يتيح إنشاء جدول Flink لنوع الموصل الخلية مع القسم

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

هام

هناك قيود معروفة في Apache Flink. يتم اختيار الأعمدة 'n' الأخيرة للأقسام، بغض النظر عن عمود القسم المحدد من قبل المستخدم. FLINK-32596 سيكون مفتاح القسم خاطئا عند استخدام لهجة Flink لإنشاء جدول Hive.

المرجع