Aracılığıyla paylaş


Flink/Delta Bağlan or kullanma

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Apache Flink ve Delta Lake'i birlikte kullanarak güvenilir ve ölçeklenebilir bir data lakehouse mimarisi oluşturabilirsiniz. Flink/Delta Bağlan or, ACID işlemleri ve tam olarak bir kez işleme ile Delta tablolarına veri yazmanızı sağlar. Bu, Flink işlem hattınızı bir denetim noktasından yeniden başlatsanız bile veri akışlarınızın tutarlı ve hatasız olduğu anlamına gelir. Flink/Delta Bağlan or, verilerinizin kaybolmamasını veya çoğaltılmamasını ve Flink semantiğiyle eşleşmesini sağlar.

Bu makalede Flink-Delta bağlayıcısını kullanmayı öğreneceksiniz.

  • Delta tablosundaki verileri okuyun.
  • Verileri bir delta tablosuna yazın.
  • Power BI'da sorgulayın.

Flink/Delta bağlayıcısı nedir?

Flink/Delta Bağlan or, Tek Başına Delta JVM kitaplığını kullanarak Apache Flink uygulamalarından Delta tablolarına veri okumak ve yazmak için bir JVM kitaplığıdır. Bağlayıcı, tam olarak bir kez teslim garantisi sağlar.

Flink/Delta Bağlan or şunları içerir:

Apache Flink'ten Delta tablosuna veri yazmak için DeltaSink. Apache Flink kullanarak Delta tablolarını okumak için DeltaSource.

Apache Flink-Delta Bağlan or şunları içerir:

Bağlayıcının sürümüne bağlı olarak, bunu aşağıdaki Apache Flink sürümleriyle kullanabilirsiniz:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Daha fazla bilgi için bkz. Flink/Delta Bağlan or.

Önkoşullar

  • AKS üzerinde HDInsight Flink 1.17.0 kümesi
  • Flink-Delta Bağlan veya 0.7.0
  • ADLS 2. Nesil'e erişmek için MSI kullanma
  • Geliştirme için IntelliJ

Delta tablosundan veri okuma

Delta Kaynağı, aşağıdaki gibi açıklanan iki moddan birinde çalışabilir.

  • Sınırlanmış Mod Yalnızca belirli bir tablo sürümü için Delta tablosunun içeriğini okumak istediğimiz toplu işler için uygundur. DeltaSource.forBoundedRowData API'sini kullanarak bu modun kaynağını oluşturun.

  • Sürekli Mod Yeni değişiklikler ve sürümler için Delta tablosunu sürekli denetlemek istediğimiz akış işleri için uygundur. DeltaSource.forContinuousRowData API'sini kullanarak bu modun kaynağını oluşturun.

Örnek: Sınırlanmış modda tüm sütunları okumak için Delta tablosu için kaynak oluşturma. Toplu işler için uygundur. Bu örnek en son tablo sürümünü yükler.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Diğer sürekli model örneği için bkz . Veri Kaynağı Modları.

Delta havuzuna yazma

Delta Havuzu şu anda aşağıdaki Flink ölçümlerini kullanıma sunar:

Flink ölçümleri tablosunu gösteren ekran görüntüsü.

Bölümlenmemiş tablolar için havuz oluşturma

Bu örnekte DeltaSink oluşturma ve var olan org.apache.flink.streaming.api.datastream.DataStreambir öğesine bağlama gösterilmektedir.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Diğer Havuz oluşturma örneği için bkz . Veri Havuzu Ölçümleri.

Tam kod

Delta tablosundaki ve havuzdaki verileri başka bir delta tablosuna okuma.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Jar dosyasını ABFS'ye yükleyin. Uygulama modu jar dosyalarını gösteren ekran görüntüsü.

  2. AppMode kümesinde iş jar bilgilerini geçirin.

    Küme yapılandırmasını gösteren ekran görüntüsü.

    Not

    ADLS'de okurken/yazarken her zaman etkinleştirin hadoop.classpath.enable .

  3. Kümeyi gönderin, işi Flink kullanıcı arabiriminde görebilmeniz gerekir.

    Flink panosunu gösteren ekran görüntüsü.

  4. ADLS'de Sonuçları Bul.

    Çıkışı gösteren ekran görüntüsü.

Power BI tümleştirmesi

Veriler delta havuzuna girdikten sonra sorguyu Power BI desktop'ta çalıştırabilir ve bir rapor oluşturabilirsiniz.

  1. ADLS 2. Nesil bağlayıcısını kullanarak verileri almak için Power BI desktop'ı açın.

    Power BI desktop'ın ekran görüntüsü.

    ADLSGen 2 bağlayıcısı gösteren ekran görüntüsü.

  2. Depolama hesabının URL'si.

    Depolama hesabının URL'sini gösteren ekran görüntüsü.

    ADLS 2. Nesil ayrıntılarını gösteren ekran görüntüsü.

  3. Kaynak için M sorgusu oluşturun ve depolama hesabından verileri sorgulayan işlevini çağırın. Delta Power BI bağlayıcılarına bakın.

  4. Veriler kullanıma sunulduktan sonra rapor oluşturabilirsiniz.

    Raporların nasıl oluşturulacağını gösteren ekran görüntüsü.

Başvurular