AKS üzerinde HDInsight üzerinde Apache Flink® ile Hive Kataloğu'nu kullanma
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Bu örnekte, Apache Flink'in Hive Kataloğu ile kalıcı katalog olarak Hive'ın Meta Veri Deposu kullanılır. Kafka tablosunu ve MySQL tablosu meta verilerini oturumlar arasında Flink'te depolamak için bu işlevi kullanırız. Flink, Kaynak olarak Hive Kataloğu'nda kayıtlı Kafka tablosunu kullanır, MySQL veritabanına bazı arama ve havuz sonuçları gerçekleştirin
Önkoşullar
- Hive Meta Veri Deposu 3.1.2 ile AKS üzerinde HDInsight üzerinde Apache Flink Kümesi
- HDInsight üzerinde Apache Kafka kümesi
- Kafka kullanma konusunda açıklandığı gibi ağ ayarlarının tamamlandığından emin olmanız gerekir; aks ve HDInsight kümelerinde HDInsight'ın aynı sanal ağda olduğundan emin olmak için
- MySQL 8.0.33
Apache Flink üzerinde Apache Hive
Flink, Hive ile iki kat tümleştirme sunar.
- İlk adım, Flink'e özgü meta verileri oturumlar arasında depolamak için Hive Meta Veri Deposu'nu (HMS) Flink'in HiveCatalog'u ile kalıcı bir katalog olarak kullanmaktır.
- Örneğin, kullanıcılar HiveCatalog kullanarak Kafka veya ElasticSearch tablolarını Hive Meta Veri Deposunda depolayabilir ve daha sonra SQL sorgularında yeniden kullanabilir.
- İkincisi, Flink'i Hive tablolarını okumak ve yazmak için alternatif bir altyapı olarak sunmaktır.
- HiveCatalog, mevcut Hive yüklemeleriyle uyumlu "kullanıma açık" olacak şekilde tasarlanmıştır. Mevcut Hive Meta Veri Deponuzu değiştirmeniz veya tablolarınızın veri yerleşimini veya bölümlemini değiştirmeniz gerekmez.
Daha fazla bilgi için bkz. Apache Hive
Ortam hazırlığı
HMS ile Apache Flink kümesi oluşturma
Azure portalında HMS ile bir Apache Flink kümesi oluşturalım. Flink kümesi oluşturma hakkında ayrıntılı yönergelere başvurabilirsiniz.
Küme oluşturulduktan sonra HMS'nin AKS tarafında çalışıp çalışmadığını denetleyin.
HDInsight'ta kullanıcı siparişi işlem verileri Kafka konusunu hazırlama
Aşağıdaki komutu kullanarak kafka istemci jar dosyasını indirin:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Tar dosyasının işaretini kaldır
tar -xvf kafka_2.12-3.2.0.tgz
kafka konusuna iletileri oluşturun.
Diğer komutlar:
Not
Bootstrap-server yerine kendi kafka aracıları ana bilgisayar adınızı veya IP'nizi kullanmanız gerekir
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Azure'da MySQL'de kullanıcı siparişi ana verilerini hazırlama
Db'nin test edilmesi:
Sipariş tablosunu hazırlayın:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
SSH indirmesi gerekli Kafka bağlayıcısını ve My SQL Veritabanı jar'larını kullanma
Not
HDInsight kafka sürümümüze ve MySQL sürümümüze göre doğru sürüm jar dosyasını indirin.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Planner jar dosyasını taşıma
Jar flink-table-planner_2.12-1.17.0-.... jar, webssh podunun /opt to /lib dosyasında bulunur ve jar flink-table-planner-loader1.17.0-.... jar /opt/flink-webssh/opt/ from /lib. Daha fazla ayrıntı için soruna bakın. Planner jar dosyasını taşımak için aşağıdaki adımları gerçekleştirin.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Not
Ek planlayıcı jar taşıma işlemi yalnızca Hive diyalekt veya HiveServer2 uç noktası kullanılırken gereklidir. Ancak bu, Hive tümleştirmesi için önerilen kurulumdur.
Doğrulama
Flink SQL'e bağlanmak için bölme/sql-client.sh kullanma
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Hive kataloğu oluşturma ve Flink SQL'de hive kataloğuna bağlanma
Not
Hive Meta Veri Deposu ile Flink kümesini zaten kullandığımız için ek yapılandırma gerçekleştirmeye gerek yoktur.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Apache Flink SQL'de Kafka Tablosu Oluşturma
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Apache Flink SQL'de MySQL Tablosu Oluşturma
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Flink SQL'de Hive kataloğunun üzerinde kayıtlı tabloları denetleme
Flink SQL üzerinde MySQL'de ana sipariş tablosuna kullanıcı işlem siparişi bilgilerini havuza alma
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Kafka'daki kullanıcı işlem sırası verilerinin Azure Cloud Shell'de MySQL'de ana tablo sırasına eklenip eklenmediğini denetleyin
Kafka'da üç kullanıcı siparişi daha oluşturma
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL'de Kafka tablo verilerini denetleme
Flink SQL> select * from kafka_user_orders;
Flink SQL'de MySQL'de orders tablosuna ekleme product_id=104
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Kaydın Azure Cloud Shell'de MySQL'deki sipariş tablosuna eklendiğini denetleyin product_id = 104
Başvuru
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin