كيفية استخدام كتالوج Hive مع Apache Flink® على HDInsight على AKS

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

يستخدم هذا المثال Metastore الخاص ب Hive ككتالوج مستمر مع كتالوج Apache Flink ل Hive. نستخدم هذه الوظيفة لتخزين جدول Kafka وبيانات تعريف جدول MySQL على Flink عبر الجلسات. يستخدم Flink جدول Kafka المسجل في كتالوج Hive كمصدر، وإجراء بعض نتائج البحث والمتلقي إلى قاعدة بيانات MySQL

المتطلبات الأساسية

يوفر Flink تكاملا مزدوجا مع Hive.

  • الخطوة الأولى هي استخدام Hive Metastore (HMS) ككتالوج مستمر مع HiveCatalog الخاص ب Flink لتخزين بيانات تعريف Flink المحددة عبر الجلسات.
    • على سبيل المثال، يمكن للمستخدمين تخزين جداول Kafka أو ElasticSearch في Hive Metastore باستخدام HiveCatalog وإعادة استخدامها لاحقا في استعلامات SQL.
  • والثاني هو تقديم Flink كمحرك بديل لقراءة وكتابة جداول Hive.
  • تم تصميم HiveCatalog ليكون "خارج الصندوق" متوافقا مع عمليات تثبيت Hive الموجودة. لا تحتاج إلى تعديل Hive Metastore الحالي أو تغيير موضع البيانات أو تقسيم الجداول الخاصة بك.

لمزيد من المعلومات، راجع Apache Hive

إعداد البيئة

يتيح إنشاء مجموعة Apache Flink مع HMS على مدخل Microsoft Azure، يمكنك الرجوع إلى الإرشادات التفصيلية حول إنشاء مجموعة Flink.

لقطة شاشة توضح كيفية إنشاء نظام مجموعة Flink.

بعد إنشاء نظام المجموعة، تحقق من تشغيل HMS أو عدم تشغيله على جانب AKS.

لقطة شاشة توضح كيفية التحقق من حالة HMS في نظام مجموعة Flink.

إعداد موضوع Kafka لبيانات معاملات أمر المستخدم على HDInsight

قم بتنزيل kafka client jar باستخدام الأمر التالي:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

قم بإلغاء تحديد ملف tar باستخدام

tar -xvf kafka_2.12-3.2.0.tgz

إنتاج الرسائل إلى موضوع Kafka.

لقطة شاشة توضح كيفية إنتاج رسائل إلى موضوع Kafka.

أوامر أخرى:

إشعار

مطلوب منك استبدال bootstrap-server باسم مضيف وسطاء kafka الخاصين بك أو IP

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

إعداد البيانات الرئيسية لطلب المستخدم على MySQL على Azure

اختبار DB:

لقطة شاشة توضح كيفية اختبار قاعدة البيانات في Kafka.

لقطة شاشة توضح كيفية تشغيل Cloud Shell على المدخل.

إعداد جدول الطلبات:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

استخدام تنزيل SSH مطلوب موصل Kafka وجرات قاعدة بيانات MySQL

إشعار

قم بتنزيل جرة الإصدار الصحيح وفقا لإصدار HDInsight kafka وإصدار MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

نقل جرة المخطط

انقل jar flink-table-planner_2.12-1.17.0-... jar الموجود في webssh pod's /opt to /lib وانقل jar flink-table-planner-loader1.17.0-.... jar /opt/flink-webssh/opt/ من /lib. راجع المشكلة لمزيد من التفاصيل. نفذ الخطوات التالية لنقل جرة المخطط.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

إشعار

لا يلزم نقل جرة مخطط إضافية إلا عند استخدام لهجة Hive أو نقطة نهاية HiveServer2. ومع ذلك، هذا هو الإعداد الموصى به لتكامل Hive.

التحقق من الصحة

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

إشعار

نظرا لأننا نستخدم بالفعل مجموعة Flink مع Hive Metastore، ليست هناك حاجة لإجراء أي تكوينات إضافية.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

لقطة شاشة توضح كيفية إنشاء جدول Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

لقطة شاشة توضح كيفية إنشاء جدول mysql.

لقطة شاشة تعرض إخراج الجدول.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

لقطة شاشة توضح كيفية تلقي معاملة المستخدم.

لقطة شاشة تعرض واجهة مستخدم Flink.

تحقق مما إذا تمت إضافة بيانات طلب معاملة المستخدم على Kafka في ترتيب الجدول الرئيسي في MySQL على Azure Cloud Shell

لقطة شاشة توضح كيفية التحقق من معاملة المستخدم.

إنشاء ثلاثة أوامر مستخدم أخرى على Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

لقطة شاشة توضح كيفية التحقق من بيانات جدول Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

لقطة شاشة توضح كيفية التحقق من جدول الطلبات.

تتم إضافة سجل التحقق product_id = 104 في جدول الطلب على MySQL على Azure Cloud Shell

لقطة شاشة تعرض السجلات المضافة إلى جدول الطلب.

المرجع