Share via


Hive Catalog gebruiken met Apache Flink® in HDInsight in AKS

Belangrijk

Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.

In dit voorbeeld wordt de Metastore van Hive gebruikt als een permanente catalogus met de Hive-catalogus van Apache Flink. We gebruiken deze functionaliteit voor het opslaan van kafka-tabel- en MySQL-tabelmetagegevens op Flink in sessies. Flink maakt gebruik van kafka-tabel die is geregistreerd in Hive Catalog als bron, wat opzoek- en sinkresultaten uitvoeren in mySQL-database

Vereisten

Flink biedt een tweevoudige integratie met Hive.

  • De eerste stap is het gebruik van Hive Metastore (HMS) als permanente catalogus met HiveCatalog van Flink voor het opslaan van specifieke metagegevens in sessies.
    • Gebruikers kunnen bijvoorbeeld hun Kafka- of ElasticSearch-tabellen opslaan in Hive Metastore met behulp van HiveCatalog en ze later opnieuw gebruiken in SQL-query's.
  • De tweede is het aanbieden van Flink als alternatieve engine voor het lezen en schrijven van Hive-tabellen.
  • De HiveCatalog is ontworpen om 'out-of-the-box' compatibel te zijn met bestaande Hive-installaties. U hoeft uw bestaande Hive-metastore niet te wijzigen of de plaatsing of partitionering van uw tabellen te wijzigen.

Zie Apache Hive voor meer informatie

Omgevingsvoorbereiding

Hiermee kunt u een Apache Flink-cluster maken met HMS in Azure Portal. U kunt de gedetailleerde instructies voor het maken van een Flink-cluster raadplegen.

Schermopname van het maken van een Flink-cluster.

Controleer na het maken van het cluster of HMS wordt uitgevoerd of niet aan de AKS-zijde.

Schermopname die laat zien hoe u de HMS-status controleert in het Flink-cluster.

Kafka-onderwerp over transactiegegevens van gebruikersorders voorbereiden in HDInsight

Download het kafka-client-JAR met behulp van de volgende opdracht:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Het tar-bestand opheffen met

tar -xvf kafka_2.12-3.2.0.tgz

De berichten naar het Kafka-onderwerp produceren.

Schermopname van het produceren van berichten naar kafka-onderwerp.

Andere opdrachten:

Notitie

U moet bootstrap-server vervangen door uw eigen kafka-brokershostnaam of IP-adres

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Hoofdgegevens van gebruikersorders voorbereiden op MySQL in Azure

Database testen:

Schermopname van het testen van de database in Kafka.

Schermopname van het uitvoeren van Cloud Shell in de portal.

Bereid de ordertabel voor:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Vereiste Kafka-connector en MySQL Database-JAR's voor SSH downloaden

Notitie

Download de juiste versie-JAR op basis van onze HDInsight kafka-versie en MySQL-versie.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

De planner-jar verplaatsen

Verplaats het jar flink-table-planner_2.12-1.17.0-... jar bevindt zich in webssh pod's /opt to /lib en verplaats de jar flink-table-planner-loader1.17.0-... jar /opt/flink-webssh/opt/ van /lib. Raadpleeg het probleem voor meer informatie. Voer de volgende stappen uit om de planner-JAR te verplaatsen.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Notitie

Een extra planner jar-verplaatsing is alleen nodig bij het gebruik van hive dialect of HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.

Validatie

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Notitie

Omdat we al flink cluster gebruiken met Hive Metastore, hoeft u geen extra configuraties uit te voeren.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Schermopname die laat zien hoe u een Kafka-tabel maakt.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Schermopname die laat zien hoe u mysql-tabel maakt.

Schermopname van tabeluitvoer.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Schermopname van het sinken van gebruikerstransactie.

Schermopname van Flink UI.

Controleer of gegevens van transactieorders voor gebruikers in Kafka zijn toegevoegd in de hoofdtabelvolgorde in MySQL in Azure Cloud Shell

Schermopname die laat zien hoe u de transactie van de gebruiker controleert.

Nog drie gebruikersorders maken in Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Schermopname van het controleren van Kafka-tabelgegevens.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Schermopname die laat zien hoe u de tabel Orders controleert.

Controleer of product_id = 104 de record is toegevoegd in de volgordetabel in MySQL in Azure Cloud Shell

Schermopname van de records die zijn toegevoegd aan de ordertabel.

Verwijzing