Share via


Flink/Delta Verbinding maken or gebruiken

Belangrijk

Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.

Door Apache Flink en Delta Lake samen te gebruiken, kunt u een betrouwbare en schaalbare data lakehouse-architectuur maken. Met de Flink/Delta-Verbinding maken or kunt u gegevens naar Delta-tabellen schrijven met ACID-transacties en precies eenmaal verwerken. Dit betekent dat uw gegevensstromen consistent en foutloos zijn, zelfs als u uw Flink-pijplijn opnieuw start vanaf een controlepunt. De Flink/Delta Verbinding maken or zorgt ervoor dat uw gegevens niet verloren of gedupliceerd zijn en dat deze overeenkomt met de Flink-semantiek.

In dit artikel leert u hoe u de Flink-Delta-connector gebruikt.

  • Lees de gegevens uit de deltatabel.
  • Schrijf de gegevens naar een deltatabel.
  • Voer een query uit in Power BI.

Wat is Flink/Delta connector?

Flink/Delta Verbinding maken or is een JVM-bibliotheek voor het lezen en schrijven van gegevens van Apache Flink-toepassingen naar Delta-tabellen met behulp van de Delta Standalone JVM-bibliotheek. De connector biedt precies eenmaal leveringsgaranties.

Flink/Delta Verbinding maken or omvat:

DeltaSink voor het schrijven van gegevens van Apache Flink naar een Delta-tabel. DeltaSource voor het lezen van Delta-tabellen met Apache Flink.

Apache Flink-Delta Verbinding maken or omvat:

Afhankelijk van de versie van de connector kunt u deze gebruiken met de volgende Apache Flink-versies:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Zie Flink/Delta Verbinding maken or voor meer informatie.

Vereisten

  • HDInsight Flink 1.17.0-cluster op AKS
  • Flink-Delta Verbinding maken or 0.7.0
  • MSI gebruiken voor toegang tot ADLS Gen2
  • IntelliJ voor ontwikkeling

Gegevens uit deltatabel lezen

Delta Source kan in een van de volgende twee modi werken. Dit wordt als volgt beschreven.

  • Gebonden modus Geschikt voor batchtaken, waarbij we alleen inhoud van de Delta-tabel willen lezen voor een specifieke tabelversie. Maak een bron van deze modus met behulp van de DeltaSource.forBoundedRowData-API.

  • Continue modus Geschikt voor streamingtaken, waarbij we continu de Delta-tabel willen controleren op nieuwe wijzigingen en versies. Maak een bron van deze modus met behulp van de DeltaSource.forContinuousRowData-API.

Voorbeeld: Bron maken voor Delta-tabel om alle kolommen in de gebonden modus te lezen. Geschikt voor batchtaken. In dit voorbeeld wordt de meest recente tabelversie geladen.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Zie Gegevensbronmodi voor andere doorlopende modellen.

Schrijven naar Delta-sink

Delta Sink toont momenteel de volgende Flink-metrische gegevens:

Schermopname van de tabel voor flink metrische gegevens.

Sink maken voor niet-gepartitioneerde tabellen

In dit voorbeeld laten we zien hoe we een DeltaSink maken en deze aansluiten op een bestaande org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Zie Voor een ander voorbeeld van het maken van een sink de metrische gegevens van de gegevenssink.

Volledige code

Lees gegevens uit een deltatabel en sink naar een andere deltatabel.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven-Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Upload het jar-bestand naar ABFS. Schermopname van JAR-bestanden in de app-modus.

  2. Geef de taak-JAR-informatie door in het AppMode-cluster.

    Schermopname van clusterconfiguratie.

    Notitie

    hadoop.classpath.enable Schakel altijd in tijdens het lezen/schrijven naar ADLS.

  3. Dien het cluster in. U moet de taak in flink UI kunnen zien.

    Schermopname van Flink dashboard.

  4. Resultaten zoeken in ADLS.

    Schermopname van de uitvoer.

Power BI-integratie

Zodra de gegevens zich in de delta-sink bevindt, kunt u de query uitvoeren in Power BI Desktop en een rapport maken.

  1. Open Power BI Desktop om de gegevens op te halen met behulp van de ADLS Gen2-connector.

    Schermopname van Power BI Desktop.

    Schermopname van de ADLSGen 2-connector.

  2. URL van het opslagaccount.

    Schermopname van de URL van het opslagaccount.

    Schermopname met ADLS Gen2-details.

  3. Maak M-query voor de bron en roep de functie aan, waarmee de gegevens uit het opslagaccount worden opgevraagd. Raadpleeg Delta Power BI-connectors.

  4. Zodra de gegevens direct beschikbaar zijn, kunt u rapporten maken.

    Schermopname van het maken van rapporten.

Verwijzingen