Aracılığıyla paylaş


AKS üzerinde HDInsight üzerinde Apache Flink® ile Hive Kataloğu'nu kullanma

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Bu örnekte, Apache Flink'in Hive Kataloğu ile kalıcı katalog olarak Hive'ın Meta Veri Deposu kullanılır. Kafka tablosunu ve MySQL tablosu meta verilerini oturumlar arasında Flink'te depolamak için bu işlevi kullanırız. Flink, Kaynak olarak Hive Kataloğu'nda kayıtlı Kafka tablosunu kullanır, MySQL veritabanına bazı arama ve havuz sonuçları gerçekleştirin

Önkoşullar

Flink, Hive ile iki kat tümleştirme sunar.

  • İlk adım, Flink'e özgü meta verileri oturumlar arasında depolamak için Hive Meta Veri Deposu'nu (HMS) Flink'in HiveCatalog'u ile kalıcı bir katalog olarak kullanmaktır.
    • Örneğin, kullanıcılar HiveCatalog kullanarak Kafka veya ElasticSearch tablolarını Hive Meta Veri Deposunda depolayabilir ve daha sonra SQL sorgularında yeniden kullanabilir.
  • İkincisi, Flink'i Hive tablolarını okumak ve yazmak için alternatif bir altyapı olarak sunmaktır.
  • HiveCatalog, mevcut Hive yüklemeleriyle uyumlu "kullanıma açık" olacak şekilde tasarlanmıştır. Mevcut Hive Meta Veri Deponuzu değiştirmeniz veya tablolarınızın veri yerleşimini veya bölümlemini değiştirmeniz gerekmez.

Daha fazla bilgi için bkz. Apache Hive

Ortam hazırlığı

Azure portalında HMS ile bir Apache Flink kümesi oluşturalım. Flink kümesi oluşturma hakkında ayrıntılı yönergelere başvurabilirsiniz.

Flink kümesinin nasıl oluşturulacağını gösteren ekran görüntüsü.

Küme oluşturulduktan sonra HMS'nin AKS tarafında çalışıp çalışmadığını denetleyin.

Flink kümesinde HMS durumunu denetlemeyi gösteren ekran görüntüsü.

HDInsight'ta kullanıcı siparişi işlem verileri Kafka konusunu hazırlama

Aşağıdaki komutu kullanarak kafka istemci jar dosyasını indirin:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Tar dosyasının işaretini kaldır

tar -xvf kafka_2.12-3.2.0.tgz

kafka konusuna iletileri oluşturun.

Kafka konusuna ileti oluşturma adımlarını gösteren ekran görüntüsü.

Diğer komutlar:

Not

Bootstrap-server yerine kendi kafka aracıları ana bilgisayar adınızı veya IP'nizi kullanmanız gerekir

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Azure'da MySQL'de kullanıcı siparişi ana verilerini hazırlama

Db'nin test edilmesi:

Kafka'da veritabanının nasıl test yapılacağını gösteren ekran görüntüsü.

Portalda Cloud Shell'i çalıştırmayı gösteren ekran görüntüsü.

Sipariş tablosunu hazırlayın:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

SSH indirmesi gerekli Kafka bağlayıcısını ve My SQL Veritabanı jar'larını kullanma

Not

HDInsight kafka sürümümüze ve MySQL sürümümüze göre doğru sürüm jar dosyasını indirin.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Planner jar dosyasını taşıma

Jar flink-table-planner_2.12-1.17.0-.... jar, webssh podunun /opt to /lib dosyasında bulunur ve jar flink-table-planner-loader1.17.0-.... jar /opt/flink-webssh/opt/ from /lib. Daha fazla ayrıntı için soruna bakın. Planner jar dosyasını taşımak için aşağıdaki adımları gerçekleştirin.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Not

Ek planlayıcı jar taşıma işlemi yalnızca Hive diyalekt veya HiveServer2 uç noktası kullanılırken gereklidir. Ancak bu, Hive tümleştirmesi için önerilen kurulumdur.

Doğrulama

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Not

Hive Meta Veri Deposu ile Flink kümesini zaten kullandığımız için ek yapılandırma gerçekleştirmeye gerek yoktur.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Kafka tablosunun nasıl oluşturulacağını gösteren ekran görüntüsü.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

mysql tablosunun nasıl oluşturulacağını gösteren ekran görüntüsü.

Tablo çıkışını gösteren ekran görüntüsü.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Kullanıcı işleminin nasıl havuza alınıyor olduğunu gösteren ekran görüntüsü.

Flink kullanıcı arabirimini gösteren ekran görüntüsü.

Kafka'daki kullanıcı işlem sırası verilerinin Azure Cloud Shell'de MySQL'de ana tablo sırasına eklenip eklenmediğini denetleyin

Kullanıcı işlemini denetlemeyi gösteren ekran görüntüsü.

Kafka'da üç kullanıcı siparişi daha oluşturma

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Kafka tablo verilerini denetlemeyi gösteren ekran görüntüsü.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Siparişler tablosunun nasıl denetleneceklerini gösteren ekran görüntüsü.

Kaydın Azure Cloud Shell'de MySQL'deki sipariş tablosuna eklendiğini denetleyin product_id = 104

Sipariş tablosuna eklenen kayıtları gösteren ekran görüntüsü.

Başvuru

  • Apache Hive
  • Apache, Apache Hive, Hive, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.