Flink/Delta Bağlan or kullanma
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Apache Flink ve Delta Lake'i birlikte kullanarak güvenilir ve ölçeklenebilir bir data lakehouse mimarisi oluşturabilirsiniz. Flink/Delta Bağlan or, ACID işlemleri ve tam olarak bir kez işleme ile Delta tablolarına veri yazmanızı sağlar. Bu, Flink işlem hattınızı bir denetim noktasından yeniden başlatsanız bile veri akışlarınızın tutarlı ve hatasız olduğu anlamına gelir. Flink/Delta Bağlan or, verilerinizin kaybolmamasını veya çoğaltılmamasını ve Flink semantiğiyle eşleşmesini sağlar.
Bu makalede Flink-Delta bağlayıcısını kullanmayı öğreneceksiniz.
- Delta tablosundaki verileri okuyun.
- Verileri bir delta tablosuna yazın.
- Power BI'da sorgulayın.
Flink/Delta bağlayıcısı nedir?
Flink/Delta Bağlan or, Tek Başına Delta JVM kitaplığını kullanarak Apache Flink uygulamalarından Delta tablolarına veri okumak ve yazmak için bir JVM kitaplığıdır. Bağlayıcı, tam olarak bir kez teslim garantisi sağlar.
Flink/Delta Bağlan or şunları içerir:
Apache Flink'ten Delta tablosuna veri yazmak için DeltaSink. Apache Flink kullanarak Delta tablolarını okumak için DeltaSource.
Apache Flink-Delta Bağlan or şunları içerir:
Bağlayıcının sürümüne bağlı olarak, bunu aşağıdaki Apache Flink sürümleriyle kullanabilirsiniz:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Daha fazla bilgi için bkz. Flink/Delta Bağlan or.
Önkoşullar
- AKS üzerinde HDInsight Flink 1.17.0 kümesi
- Flink-Delta Bağlan veya 0.7.0
- ADLS 2. Nesil'e erişmek için MSI kullanma
- Geliştirme için IntelliJ
Delta tablosundan veri okuma
Delta Kaynağı, aşağıdaki gibi açıklanan iki moddan birinde çalışabilir.
Sınırlanmış Mod Yalnızca belirli bir tablo sürümü için Delta tablosunun içeriğini okumak istediğimiz toplu işler için uygundur. DeltaSource.forBoundedRowData API'sini kullanarak bu modun kaynağını oluşturun.
Sürekli Mod Yeni değişiklikler ve sürümler için Delta tablosunu sürekli denetlemek istediğimiz akış işleri için uygundur. DeltaSource.forContinuousRowData API'sini kullanarak bu modun kaynağını oluşturun.
Örnek: Sınırlanmış modda tüm sütunları okumak için Delta tablosu için kaynak oluşturma. Toplu işler için uygundur. Bu örnek en son tablo sürümünü yükler.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Diğer sürekli model örneği için bkz . Veri Kaynağı Modları.
Delta havuzuna yazma
Delta Havuzu şu anda aşağıdaki Flink ölçümlerini kullanıma sunar:
Bölümlenmemiş tablolar için havuz oluşturma
Bu örnekte DeltaSink oluşturma ve var olan org.apache.flink.streaming.api.datastream.DataStream
bir öğesine bağlama gösterilmektedir.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Diğer Havuz oluşturma örneği için bkz . Veri Havuzu Ölçümleri.
Tam kod
Delta tablosundaki ve havuzdaki verileri başka bir delta tablosuna okuma.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Jar dosyasını paketleyin ve çalıştırmak için Flink kümesine gönderin
AppMode kümesinde iş jar bilgilerini geçirin.
Not
ADLS'de okurken/yazarken her zaman etkinleştirin
hadoop.classpath.enable
.Kümeyi gönderin, işi Flink kullanıcı arabiriminde görebilmeniz gerekir.
ADLS'de Sonuçları Bul.
Power BI tümleştirmesi
Veriler delta havuzuna girdikten sonra sorguyu Power BI desktop'ta çalıştırabilir ve bir rapor oluşturabilirsiniz.
ADLS 2. Nesil bağlayıcısını kullanarak verileri almak için Power BI desktop'ı açın.
Depolama hesabının URL'si.
Kaynak için M sorgusu oluşturun ve depolama hesabından verileri sorgulayan işlevini çağırın. Delta Power BI bağlayıcılarına bakın.
Veriler kullanıma sunulduktan sonra rapor oluşturabilirsiniz.
Başvurular
- Delta bağlayıcıları.
- Delta Power BI bağlayıcıları.
- Apache, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin