Share via


Apache Flink® DataStream API를 사용하여 Azure Data Lake Storage Gen2에 이벤트 메시지 쓰기

Important

이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 세부 정보와 함께 AskHDInsight에 요청을 제출하고 Azure HDInsight 커뮤니티에서 추가 업데이트를 보려면 팔로우하세요.

Apache Flink는 파일 시스템을 사용하여 애플리케이션의 결과와 내결함성 및 복구를 위해 데이터를 사용하고 영구적으로 저장합니다. 이 문서에서는 DataStream API를 사용하여 Azure Data Lake Storage Gen2에 이벤트 메시지를 작성하는 방법을 알아봅니다.

필수 조건

이 파일 시스템 커넥터는 BATCH 및 STREAMING 모두에 대해 동일한 보장을 제공하며 STREAMING 실행에 정확히 한 번 의미 체계를 제공하도록 설계되었습니다. 자세한 내용은 Flink DataStream Filesystem을 참조하세요.

Apache Kafka 커넥터

Flink는 정확히 한 번의 보장으로 Kafka 토픽에서 데이터를 읽고 Kafka 토픽에 쓸 수 있는 Apache Kafka 커넥터를 제공합니다. 자세한 내용은 Apache Kafka 커넥터를 참조하세요.

IntelliJ IDEA의 pom.xml

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

ADLS Gen2 싱크용 프로그램

abfsGen2.java

참고 항목

HDInsight 클러스터의 Apache Kafka bootStrapServers를 Kafka 3.2용 자체 브로커로 바꿉니다.

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

패키지 jar이며 Apache Flink에 제출합니다.

  1. ABFS에 jar을 업로드합니다.

    Flink 앱 모드 화면을 보여 주는 스크린샷

  2. AppMode 클러스터 만들기에서 작업 jar 정보를 전달합니다.

    앱 만들기 모드를 보여 주는 스크린샷

    참고 항목

    classloader.resolve-order를 parent-first’로 ‘추가하고 hadoop.classpath.enable을 true로 추가하세요.

  3. 작업 로그 집계를 선택하여 작업 로그를 스토리지 계정에 푸시합니다.

    작업 로그를 사용하도록 설정하는 방법을 보여 주는 스크린샷

  4. 실행 중인 작업을 볼 수 있습니다.

    Flink UI를 보여 주는 스크린샷

ADLS Gen2에서 스트리밍 데이터 유효성 검사

ADLS Gen2로 click_events 스트리밍되는 것이 표시됩니다.

ADLS Gen2 출력을 보여 주는 스크린샷Flink 클릭 이벤트 출력을 보여 주는 스크린샷

다음 세 가지 조건 중에서 진행 중인 부분 파일을 롤아웃하는 롤링 정책을 지정할 수 있습니다.

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

참조