كيفية استخدام كتالوج Hive مع Apache Flink® على HDInsight على AKS
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
يستخدم هذا المثال Metastore الخاص ب Hive ككتالوج مستمر مع كتالوج Apache Flink ل Hive. نستخدم هذه الوظيفة لتخزين جدول Kafka وبيانات تعريف جدول MySQL على Flink عبر الجلسات. يستخدم Flink جدول Kafka المسجل في كتالوج Hive كمصدر، وإجراء بعض نتائج البحث والمتلقي إلى قاعدة بيانات MySQL
المتطلبات الأساسية
- Apache Flink Cluster على HDInsight على AKS مع Hive Metastore 3.1.2
- مجموعة Apache Kafka على HDInsight
- مطلوب منك التأكد من اكتمال إعدادات الشبكة كما هو موضح في استخدام Kafka؛ هذا للتأكد من أن HDInsight على مجموعات AKS وHDInsight موجودة في نفس الشبكة الظاهرية
- MySQL 8.0.33
Apache Hive على Apache Flink
يوفر Flink تكاملا مزدوجا مع Hive.
- الخطوة الأولى هي استخدام Hive Metastore (HMS) ككتالوج مستمر مع HiveCatalog الخاص ب Flink لتخزين بيانات تعريف Flink المحددة عبر الجلسات.
- على سبيل المثال، يمكن للمستخدمين تخزين جداول Kafka أو ElasticSearch في Hive Metastore باستخدام HiveCatalog وإعادة استخدامها لاحقا في استعلامات SQL.
- والثاني هو تقديم Flink كمحرك بديل لقراءة وكتابة جداول Hive.
- تم تصميم HiveCatalog ليكون "خارج الصندوق" متوافقا مع عمليات تثبيت Hive الموجودة. لا تحتاج إلى تعديل Hive Metastore الحالي أو تغيير موضع البيانات أو تقسيم الجداول الخاصة بك.
لمزيد من المعلومات، راجع Apache Hive
إعداد البيئة
إنشاء مجموعة Apache Flink باستخدام HMS
يتيح إنشاء مجموعة Apache Flink مع HMS على مدخل Microsoft Azure، يمكنك الرجوع إلى الإرشادات التفصيلية حول إنشاء مجموعة Flink.
بعد إنشاء نظام المجموعة، تحقق من تشغيل HMS أو عدم تشغيله على جانب AKS.
إعداد موضوع Kafka لبيانات معاملات أمر المستخدم على HDInsight
قم بتنزيل kafka client jar باستخدام الأمر التالي:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
قم بإلغاء تحديد ملف tar باستخدام
tar -xvf kafka_2.12-3.2.0.tgz
إنتاج الرسائل إلى موضوع Kafka.
أوامر أخرى:
إشعار
مطلوب منك استبدال bootstrap-server باسم مضيف وسطاء kafka الخاصين بك أو IP
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
إعداد البيانات الرئيسية لطلب المستخدم على MySQL على Azure
اختبار DB:
إعداد جدول الطلبات:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
استخدام تنزيل SSH مطلوب موصل Kafka وجرات قاعدة بيانات MySQL
إشعار
قم بتنزيل جرة الإصدار الصحيح وفقا لإصدار HDInsight kafka وإصدار MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
نقل جرة المخطط
انقل jar flink-table-planner_2.12-1.17.0-... jar الموجود في webssh pod's /opt to /lib وانقل jar flink-table-planner-loader1.17.0-.... jar /opt/flink-webssh/opt/ من /lib. راجع المشكلة لمزيد من التفاصيل. نفذ الخطوات التالية لنقل جرة المخطط.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
إشعار
لا يلزم نقل جرة مخطط إضافية إلا عند استخدام لهجة Hive أو نقطة نهاية HiveServer2. ومع ذلك، هذا هو الإعداد الموصى به لتكامل Hive.
التحقق من الصحة
استخدام bin/sql-client.sh للاتصال ب Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
إنشاء كتالوج Apache Hive والاتصال بكتالوج الخلية على Flink SQL
إشعار
نظرا لأننا نستخدم بالفعل مجموعة Flink مع Hive Metastore، ليست هناك حاجة لإجراء أي تكوينات إضافية.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
إنشاء جدول Kafka على Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
إنشاء جدول MySQL على Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
التحقق من الجداول المسجلة في كتالوج Hive أعلاه على Flink SQL
مصدر معلومات أمر معاملات المستخدم في جدول الطلب الرئيسي في MySQL على Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
تحقق مما إذا تمت إضافة بيانات طلب معاملة المستخدم على Kafka في ترتيب الجدول الرئيسي في MySQL على Azure Cloud Shell
إنشاء ثلاثة أوامر مستخدم أخرى على Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
تحقق من بيانات جدول Kafka على Flink SQL
Flink SQL> select * from kafka_user_orders;
إدراج product_id=104
في جدول الطلبات على MySQL على Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
تتم إضافة سجل التحقق product_id = 104
في جدول الطلب على MySQL على Azure Cloud Shell
المرجع
- Apache Hive
- Apache وApache Hive وHive وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المرتبطة هي علامات تجارية ل Apache Software Foundation (ASF).