كيفية استخدام Flink/Delta الاتصال or

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

باستخدام Apache Flink وData Lake معا، يمكنك إنشاء بنية مستودع بيانات موثوقة وقابلة للتطوير. يسمح لك Flink/Delta الاتصال or بكتابة البيانات إلى جداول Delta باستخدام معاملات ACID والمعالجة مرة واحدة بالضبط. وهذا يعني أن تدفقات البيانات الخاصة بك متسقة وخالية من الأخطاء، حتى إذا قمت بإعادة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية Flink من نقطة تحقق. يضمن Flink/Delta الاتصال or عدم فقدان بياناتك أو تكرارها، وتطابقها مع دلالات Flink.

في هذه المقالة، ستتعلم كيفية استخدام موصل Flink-Delta.

  • اقرأ البيانات من جدول دلتا.
  • اكتب البيانات إلى جدول دلتا.
  • استعلم عنه في Power BI.

ما هو موصل Flink/Delta

Flink/Delta الاتصال or هي مكتبة JVM لقراءة البيانات وكتابتها من تطبيقات Apache Flink إلى جداول Delta باستخدام مكتبة Delta Standalone JVM. يوفر الموصل ضمانات تسليم مرة واحدة بالضبط.

يتضمن Flink/Delta الاتصال or ما يلي:

DeltaSink لكتابة البيانات من Apache Flink إلى جدول Delta. DeltaSource لقراءة جداول Delta باستخدام Apache Flink.

يتضمن Apache Flink-Delta الاتصال or ما يلي:

اعتمادا على إصدار الموصل يمكنك استخدامه مع إصدارات Apache Flink التالية:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

لمزيد من المعلومات، راجع Flink/Delta الاتصال or.

المتطلبات الأساسية

  • نظام مجموعة HDInsight Flink 1.17.0 على AKS
  • Flink-Delta الاتصال or 0.7.0
  • استخدام MSI للوصول إلى ADLS Gen2
  • IntelliJ للتطوير

قراءة البيانات من جدول دلتا

يمكن أن يعمل مصدر دلتا في أحد الوضعين، الموضحين على النحو التالي.

  • الوضع المحدد مناسب للوظائف الدفعية، حيث نريد قراءة محتوى جدول Delta لإصدار جدول معين فقط. إنشاء مصدر لهذا الوضع باستخدام DeltaSource.forBoundedRowData API.

  • الوضع المستمر مناسب لمهام الدفق، حيث نريد التحقق باستمرار من جدول Delta بحثا عن التغييرات والإصدارات الجديدة. إنشاء مصدر لهذا الوضع باستخدام DeltaSource.forContinuousRowData API.

مثال: إنشاء مصدر لجدول Delta، لقراءة جميع الأعمدة في الوضع المحدد. مناسب لوظائف الدفعات. يقوم هذا المثال بتحميل أحدث إصدار من الجدول.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

للحصول على مثال نموذج مستمر آخر، راجع أوضاع مصدر البيانات.

الكتابة إلى متلقي Delta

يعرض Delta Sink حاليا مقاييس Flink التالية:

لقطة شاشة تعرض جدول مقاييس Flink.

إنشاء المتلقي للجداول غير المقطوعة

في هذا المثال، نعرض كيفية إنشاء DeltaSink وتوصيلته ب .org.apache.flink.streaming.api.datastream.DataStream

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

للحصول على مثال آخر لإنشاء المتلقي، راجع مقاييس مصدر البيانات.

تعليمات برمجية كاملة

قراءة البيانات من جدول دلتا والمتلقي إلى جدول دلتا آخر.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. تحميل الجرة إلى ABFS. لقطة شاشة تعرض ملفات jar في وضع التطبيق.

  2. تمرير معلومات jar للوظيفة في مجموعة AppMode.

    لقطة شاشة تعرض تكوين نظام المجموعة.

    إشعار

    تمكين hadoop.classpath.enable دائما أثناء القراءة/الكتابة إلى ADLS.

  3. إرسال نظام المجموعة، يجب أن تكون قادرا على رؤية المهمة في واجهة مستخدم Flink.

    لقطة شاشة تعرض لوحة معلومات Flink.

  4. البحث عن النتائج في ADLS.

    لقطة شاشة تعرض الإخراج.

تكامل Power BI

بمجرد أن تكون البيانات في متلقي دلتا، يمكنك تشغيل الاستعلام في Power BI لسطح المكتب وإنشاء تقرير.

  1. افتح Power BI desktop للحصول على البيانات باستخدام موصل ADLS Gen2.

    تظهر لقطة الشاشة Power BI لسطح المكتب.

    تظهر لقطة الشاشة موصل ADLSGen 2.

  2. عنوان URL لحساب التخزين.

    لقطة شاشة تعرض عنوان URL لحساب التخزين.

    تظهر لقطة الشاشة تفاصيل ADLS Gen2.

  3. إنشاء استعلام M للمصدر واستدعاء الدالة التي تستعلم عن البيانات من حساب التخزين. راجع موصلات Delta Power BI.

  4. بمجرد توفر البيانات بسهولة، يمكنك إنشاء تقارير.

    لقطة شاشة توضح كيفية إنشاء التقارير.

المراجع