Share via


Flink/Delta 커넥터를 사용하는 방법

Important

이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 세부 정보와 함께 AskHDInsight에 요청을 제출하고 Azure HDInsight 커뮤니티에서 추가 업데이트를 보려면 팔로우하세요.

Apache Flink와 Delta Lake를 함께 사용하면 안정적이고 스케일링 가능한 데이터 레이크하우스 아키텍처를 만들 수 있습니다. Flink/Delta 커넥터를 사용하면 ACID 트랜잭션을 사용하여 정확히 한 번 처리하는 Delta 테이블에 데이터를 쓸 수 있습니다. 즉, 검사점에서 Flink 파이프라인을 다시 시작하는 경우에도 데이터 스트림이 일관되고 오류가 없습니다. Flink/Delta 커넥터는 데이터가 손실되거나 중복되지 않고 Flink 의미 체계와 일치하도록 합니다.

이 문서에서는 Flink-Delta 커넥터를 사용하는 방법에 대해 알아봅니다.

  • Delta 테이블에서 데이터를 읽습니다.
  • Delta 테이블에 데이터를 씁니다.
  • Power BI에서 쿼리합니다.

Flink/Delta 커넥터란?

Flink/Delta 커넥터는 Apache Flink 애플리케이션에서 Delta 독립 실행형 JVM 라이브러리를 활용하는 Delta 테이블로 데이터를 읽고 쓰는 JVM 라이브러리입니다. 커넥터는 정확히 한 번 전송 보장을 제공합니다.

Flink/Delta 커넥터에는 다음이 포함됩니다.

Apache Flink에서 Delta 테이블에 데이터를 쓰기 위한 DeltaSink입니다. Apache Flink를 사용하여 Delta 테이블을 읽기 위한 DeltaSource입니다.

Apache Flink-Delta 커넥터에 포함:

커넥터 버전에 따라 다음 Apache Flink 버전과 함께 사용할 수 있습니다.

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

자세한 내용은 Flink/Delta 커넥터를 참조하세요.

필수 조건

  • AKS의 HDInsight Flink 1.17.0 클러스터
  • Flink-Delta 커넥터 0.7.0
  • MSI를 사용하여 ADLS Gen2에 액세스
  • 개발용 IntelliJ

Delta 테이블에서 데이터 읽기

Delta 원본은 다음과 같이 두 가지 모드 중 하나로 작동할 수 있습니다.

  • 제한된 모드 특정 테이블 버전에 대해서만 Delta 테이블의 콘텐츠를 읽으려는 일괄 작업에 적합합니다. DeltaSource.forBoundedRowData API를 사용하여 이 모드의 원본을 만듭니다.

  • 연속 모드 새로운 변경 내용과 버전이 있는지 Delta 테이블을 지속적으로 확인하려는 스트리밍 작업에 적합합니다. DeltaSource.forContinuousRowData API를 사용하여 이 모드의 원본을 만듭니다.

예: 제한된 모드에서 모든 열을 읽기 위한 Delta 테이블의 원본 만들기. 일괄 작업에 적합합니다. 이 예에서는 최신 테이블 버전을 로드합니다.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

다른 연속 모델의 예는 데이터 원본 모드를 참조하세요.

Delta 싱크에 쓰기

현재 Delta Sink는 다음과 같은 Flink 메트릭을 공개합니다.

Flink 메트릭 테이블을 보여 주는 스크린샷.

분할되지 않은 테이블에 대한 싱크 만들기

이 예에서는 DeltaSink를 만들고 이를 기존 org.apache.flink.streaming.api.datastream.DataStream에 연결하는 방법을 보여 줍니다.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

다른 싱크 만들기 예는 데이터 싱크 메트릭을 참조하세요.

전체 코드

델타 테이블에서 데이터를 읽고 다른 델타 테이블에 싱크합니다.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. ABFS에 jar을 업로드합니다. 앱 모드 jar 파일을 보여 주는 스크린샷.

  2. AppMode 클러스터에 작업 jar 정보를 전달합니다.

    클러스터 구성을 보여 주는 스크린샷.

    참고 항목

    ADLS를 읽거나 쓰는 동안 항상 hadoop.classpath.enable을 사용하도록 설정합니다.

  3. 클러스터를 제출하면 Flink UI에서 작업을 볼 수 있습니다.

    Flink 대시보드를 보여 주는 스크린샷.

  4. ADLS에서 결과를 찾습니다.

    출력을 보여 주는 스크린샷.

Power BI 통합

데이터가 Delta 싱크에 있으면 Power BI 데스크톱에서 쿼리를 실행하고 보고서를 만들 수 있습니다.

  1. Power BI Desktop을 열어 ADLS Gen2 커넥터를 사용하여 데이터를 가져옵니다.

    Power BI Desktop을 보여 주는 스크린샷.

    ADLSGen 2 커넥터를 보여 주는 스크린샷.

  2. 스토리지 계정의 URL입니다.

    스토리지 계정의 URL을 보여 주는 스크린샷.

    ADLS Gen2 세부 정보를 보여 주는 스크린샷.

  3. 원본에 대한 M-쿼리를 만들고 스토리지 계정에서 데이터를 쿼리하는 함수를 호출합니다. Delta Power BI 커넥터를 참조하세요.

  4. 데이터를 쉽게 사용할 수 있게 되면 보고서를 만들 수 있습니다.

    보고서를 만드는 방법을 보여 주는 스크린샷.

참조