Share via


HDInsight on AKS에서 Apache Flink®와 Hive 카탈로그를 사용하는 방법

Important

이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 AskHDInsight에서 세부 정보와 함께 요청을 제출하고 Azure HDInsight 커뮤니티에서 더 많은 업데이트를 확인하세요.

이 예제에서는 Apache Flink의 Hive 카탈로그를 사용하여 Hive의 Metastore를 영구적 카탈로그로 사용합니다. 이 기능은 세션 간에 Flink에 Kafka 테이블 및 MySQL 테이블 메타데이터를 저장하는 데 사용합니다. Flink는 Hive 카탈로그에 등록된 Kafka 테이블을 원본으로 사용하고 일부 조회를 수행하고 결과를 MySQL 데이터베이스에 싱크합니다.

필수 조건

Flink는 Hive와의 두 단계로 이루어진 통합을 제공합니다.

  • 첫 번째 단계는 Flink의 HiveCatalog를 사용하여 세션 간에 Flink 특정 메타데이터를 저장하기 위해 HMS(Hive Metastore)를 영구적 카탈로그로 사용하는 것입니다.
    • 예를 들어 사용자는 HiveCatalog를 사용하여 Hive Metastore에 Kafka 또는 ElasticSearch 테이블을 저장하고 나중에 SQL 쿼리에서 다시 사용할 수 있습니다.
  • 두 번째 단계는 Hive 테이블을 읽고 쓰기 위한 대체 엔진으로 Flink를 제공하는 것입니다.
  • HiveCatalog는 기존 Hive 설치와 호환되어 "기본 제공"되도록 설계되었습니다. 기존 Hive Metastore를 수정하거나 테이블의 데이터 배치 또는 분할을 변경할 필요가 없습니다.

자세한 내용은 Apache Hive를 참조하세요.

환경 준비

Azure Portal에서 HMS를 사용하여 Apache Flink 클러스터를 만들어 보겠습니다. Flink 클러스터 만들기에 대한 자세한 지침을 참조할 수 있습니다.

Flink 클러스터를 만드는 방법을 보여 주는 스크린샷

클러스터를 만든 후 AKS 쪽에서 HMS가 실행 중인지 여부를 확인합니다.

Flink 클러스터에서 HMS 상태를 확인하는 방법을 보여 주는 스크린샷

HDInsight에서 사용자 순서 트랜잭션 데이터 Kafka 항목 준비

다음 명령을 사용하여 kafka 클라이언트 jar를 다운로드합니다.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

다음을 사용하여 tar 파일을 untar합니다.

tar -xvf kafka_2.12-3.2.0.tgz

Kafka 토픽에 메시지를 생성합니다.

Kafka 토픽에 대한 메시지를 생성하는 방법을 보여 주는 스크린샷

기타 명령

참고 항목

부트스트랩 서버를 고유한 kafka 브로커 호스트 이름 또는 IP로 바꿔야 합니다.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Azure의 MySQL에서 사용자 순서 마스터 데이터 준비

DB 테스트:

Kafka에서 데이터베이스를 테스트하는 방법을 보여 주는 스크린샷

포털에서 Cloud Shell을 실행하는 방법을 보여 주는 스크린샷

주문 테이블을 준비합니다.

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

SSH를 사용하여 필요한 Kafka 커넥터 및 MySQL 데이터베이스 jar 다운로드

참고 항목

HDInsight kafka 버전 및 MySQL 버전에 따라 올바른 버전 jar을 다운로드합니다.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Planner jar 이동

webssh Pod의 /opt에 있는 jar flink-table-planner_2.12-1.17.0-....jar를 /lib으로 이동하고 /lib에서 jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/를 꺼냅니다. 자세한 내용은 문제를 참조하세요. Planner jar를 이동하려면 다음 단계를 수행합니다.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

참고 항목

추가 Planner jar 이동은 Hive 언어 또는 HiveServer2 엔드포인트를 사용하는 경우에만 필요합니다. 그러나 이것은 Hive 통합에 권장되는 설정입니다.

유효성 검사

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

참고 항목

이미 Hive Metastore에서 Flink 클러스터를 사용하므로 추가 구성을 수행할 필요가 없습니다.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Kafka 테이블을 만드는 방법을 보여 주는 스크린샷

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

mysql 테이블을 만드는 방법을 보여 주는 스크린샷

테이블 출력을 보여 주는 스크린샷

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

사용자 트랜잭션을 싱크하는 방법을 보여 주는 스크린샷

Flink UI를 보여 주는 스크린샷

Kafka의 사용자 트랜잭션 순서 데이터가 Azure Cloud Shell의 MySQL에서 마스터 테이블 순서로 추가되었는지 확인합니다.

사용자 트랜잭션을 확인하는 방법을 보여 주는 스크린샷

Kafka에서 세 가지 사용자 순서 만들기

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Kafka 테이블 데이터를 확인하는 방법을 보여 주는 스크린샷

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

주문 테이블을 확인하는 방법을 보여 주는 스크린샷

Azure Cloud Shell의 MySQL의 순서 테이블에 product_id = 104 레코드가 추가되는지 확인

주문 테이블에 추가된 레코드를 보여 주는 스크린샷

참조