다음을 통해 공유


HDInsight on AKS에 있는 Apache Flink® 클러스터의 Table API 및 SQL

Important

이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 세부 정보와 함께 AskHDInsight에 요청을 제출하고 Azure HDInsight 커뮤니티에서 추가 업데이트를 보려면 팔로우하세요.

Apache Flink는 통합 스트림 및 배치 처리를 위한 두 가지 관계형 API인 Table API와 SQL을 제공합니다. Table API는 선택, 필터링, 조인과 같은 관계형 연산자의 쿼리를 직관적으로 구성할 수 있는 언어 통합 쿼리 API입니다. Flink의 SQL 지원은 SQL 표준을 구현하는 Apache Calcite를 기반으로 합니다.

Table API와 SQL 인터페이스는 서로 원활하게 통합되며 Flink의 DataStream API와도 통합됩니다. 모든 API와 이를 기반으로 빌드된 라이브러리 간에 쉽게 전환할 수 있습니다.

다른 SQL 엔진과 마찬가지로 Flink 쿼리는 테이블 위에서 작동합니다. Flink는 미사용 데이터를 로컬에서 관리하지 않고 외부 테이블을 통해 지속적으로 쿼리를 수행한다는 점에서 기존 데이터베이스와 다릅니다.

Flink 데이터 처리 파이프라인은 원본 테이블로 시작하고 싱크 테이블로 끝납니다. 원본 테이블은 쿼리 실행 중에 연산되는 행을 생성하며 쿼리의 FROM 절에서 참조되는 테이블입니다. 커넥터는 HDInsight Kafka, HDInsight HBase, Azure Event Hubs, 데이터베이스, 파일 시스템 또는 커넥터가 클래스 경로에 있는 기타 모든 시스템 유형일 수 있습니다.

Azure Portal의 Secure Shell에서 CLI를 사용하는 방법에 대해서는 이 문서를 참조할 수 있습니다. 다음은 시작하는 방법에 대한 몇 가지 간단한 샘플입니다.

  • SQL 클라이언트를 시작하려면

    ./bin/sql-client.sh
    
  • sql-client와 함께 실행할 초기화 sql 파일을 전달하려면

    ./sql-client.sh -i /path/to/init_file.sql
    
  • sql-client에서 구성을 설정하려면

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL은 다음 CREATE 문을 지원합니다.

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE CATALOG

다음은 jdbc 커넥터를 사용하여 CREATE TABLE 문에서 ID, 이름을 열로 사용하여 MSSQL에 연결하는 원본 테이블을 정의하는 구문의 예입니다.

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://server-name.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

데이터베이스 만들기:

CREATE DATABASE students;

카탈로그 만들기:

CREATE CATALOG myhive WITH ('type'='hive');

다음 테이블 위에서 연속 쿼리를 실행할 수 있습니다.

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

원본 테이블에서 싱크 테이블로 쓰기:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

종속성 추가

JAR 문은 클래스 경로에 사용자 jar를 추가하거나 클래스 경로에서 사용자 jar를 제거하거나 런타임에 클래스 경로에 추가된 jar를 표시하는 데 사용됩니다.

Flink SQL은 다음 JAR 문을 지원합니다.

  • ADD JAR
  • JAR 표시
  • JAR 제거
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

카탈로그는 데이터베이스, 테이블, 파티션, 보기, 함수 등의 메타데이터와 데이터베이스 또는 기타 외부 시스템에 저장된 데이터에 액세스하는 데 필요한 정보를 제공합니다.

HDInsight on AKS에서 Flink는 다음 두 가지 카탈로그 옵션을 지원합니다.

GenericInMemoryCatalog

GenericInMemoryCatalog 는 카탈로그의 메모리 내 구현입니다. 모든 개체는 sql 세션의 수명 동안만 사용할 수 있습니다.

HiveCatalog

HiveCatalog는 순수 Flink 메타데이터에 대한 영구 스토리지와 기존 Hive 메타데이터를 읽고 쓰기 위한 인터페이스의 두 가지 용도로 사용됩니다.

참고 항목

AKS 클러스터의 HDInsight는 Apache Flink용 Hive 메타스토어의 통합 옵션과 함께 제공됩니다. 클러스터를 만드는 동안 Hive 메타스토어를 선택할 수 있습니다.

이 문서를 참조하여 Azure 포털의 Secure Shell에서 CLI를 사용하고 Flink SQL Client를 시작하는 방법을 알아보세요.

  • 세션 sql-client.sh 시작

    기본 하이브 카탈로그를 보여 주는 스크린샷.

    Default_catalog는 기본 메모리 내 카탈로그입니다.

  • 이제 메모리 내 카탈로그 기본 메모리 내 카탈로그를 보여 주는 스크린샷.의 기본 데이터베이스를 확인해 보겠습니다.

  • 버전 3.1.2의 Hive 카탈로그를 만들고 사용하겠습니다.

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    참고 항목

    HDInsight on AKS는 Hive 3.1.2Hadoop 3.3.2를 지원합니다. hive-conf-dir/opt/hive-conf 위치로 설정되었습니다.

  • 하이브 카탈로그에 데이터베이스를 만들고 변경하지 않는 한 세션의 기본값이 되도록 설정해 보겠습니다. 하이브 카탈로그에 데이터베이스를 만들고 이를 세션의 기본 카탈로그로 만드는 스크린샷.

Hive 카탈로그에 Hive 테이블을 만들고 등록하는 방법

  • Flink 데이터베이스를 만들고 카탈로그에 등록하는 방법의 지침을 따르세요.

  • 파티션이 없는 커넥터 유형 Hive의 Flink 테이블을 만들어 보겠습니다.

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • hive_table에 데이터 삽입

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • hive_table의 데이터 읽기

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    참고 항목

    Hive Warehouse 디렉터리는 Apache Flink 클러스터를 만드는 동안 선택한 스토리지 계정의 지정된 컨테이너에 있으며 디렉터리 hive/warehouse/에서 찾을 수 있습니다.

  • 파티션을 사용하여 커넥터 형식 하이브의 Flink 테이블을 만들 수 있습니다.

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Important

Apache Flink에는 알려진 제한 사항이 있습니다. 마지막 ‘n’ 열은 사용자 지정 파티션 열에 관계없이 파티션에 대해 선택됩니다. FLINK-32596 Flink 언어를 사용하여 Hive 테이블을 만들 때 파티션 키가 잘못되었습니다.

참조