إثراء الأحداث من Apache Kafka® بسمات من ADLS Gen2 باستخدام Apache Flink®

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

في هذه المقالة، يمكنك معرفة كيفية إثراء الأحداث في الوقت الحقيقي عن طريق الانضمام إلى دفق من Kafka مع جدول على ADLS Gen2 باستخدام Flink Streaming. نستخدم Flink Streaming API للانضمام إلى الأحداث من HDInsight Kafka مع سمات من ADLS Gen2. علاوة على ذلك، نستخدم الأحداث المرتبطة بالسمات للغرق في موضوع Kafka آخر.

المتطلبات الأساسية

إعداد موضوع Kafka

نقوم بإنشاء موضوع يسمى user_events.

  • الغرض من ذلك هو قراءة دفق من الأحداث في الوقت الحقيقي من موضوع Kafka باستخدام Flink. لدينا كل حدث مع الحقول التالية:
    user_id,
    item_id, 
    type, 
    timestamp, 
    

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092

إعداد ملف على ADLS Gen2

نقوم بإنشاء ملف يسمى item attributes في التخزين لدينا

  • الغرض هو قراءة دفعة من item attributes ملف على ADLS Gen2. يحتوي كل عنصر على الحقول التالية:
    item_id, 
    brand, 
    category, 
    timestamp, 
    

لقطة شاشة تعرض إعداد ملف سمات عنصر دفعي على ADLS Gen2.

في هذه الخطوة، نقوم بتنفيذ الأنشطة التالية

  • user_events إثراء الموضوع من Kafka من خلال الانضمام من item attributes ملف على ADLS Gen2.
  • نحن ندفع نتيجة هذه الخطوة، كنشاك مستخدم غني للأحداث إلى موضوع Kafka.

تطوير مشروع Maven

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkKafkaJoinGen2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

الانضمام إلى موضوع Kafka باستخدام ملف ADLS Gen2

KafkaJoinGen2Demo.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class KafkaJoinGen2Demo {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source configuration, update with your broker IPs
        String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
        String inputTopic = "user_events";
        String outputTopic = "user_events_output";
        String groupId = "my_group";

        // 2. Register the cached file, update your container name and storage name
        env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");

        // 3. Read a stream of real-time user behavior event from a Kafka topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse Kafka source data
      DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> map(String value) throws Exception {
              // Parse the line into a Tuple4
              String[] parts = value.split(",");
              if (parts.length < 4) {
                  // Log and skip malformed record
                  System.out.println("Malformed record: " + value);
                  return null;
              }
              return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
           }
       });

        // 4. Enrich the user activity events by joining the items' attributes from a file
        DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

        // 5. Output the enriched user activity events to a Kafka topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();

        enrichedData.map(value -> value.toString()).sinkTo(sink);

        // 6. Execute the Flink job
        env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
    }

    private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
        private Map<String, Tuple4<String, String, String, String>> itemAttributes;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // Read the cached file and parse its contents into a map
            itemAttributes = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
                }
            }
        }

        @Override
        public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
            Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
            if (broadcastValue != null) {
                return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
            } else {
                return null;
            }
        }
    }
}

نحن نقوم بإرسال jar المحزمة إلى Flink:

لقطة شاشة تعرض تعبئة الجرة وإرسالها إلى Flink مع Kafka 3.2.

لقطة شاشة تعرض تعبئة الجرة وإرسالها إلى Flink كخطوة أخرى مع Kafka 3.2.

إنتاج موضوع في الوقت user_events الحقيقي على Kafka

نحن قادرون على إنتاج حدث user_events سلوك المستخدم في الوقت الحقيقي في Kafka.

لقطة شاشة تعرض حدث سلوك المستخدم في الوقت الحقيقي على Kafka 3.2.

استهلاك الانضمام itemAttributes مع user_events على Kafka

نحن نستخدم itemAttributes الآن في أحداث user_eventsنشاط المستخدم للانضمام إلى نظام الملفات .

لقطة شاشة تعرض استخدام أحداث نشاط المستخدم المنضمة إلى سمات العنصر على Kafka 3.2.

نواصل إنتاج واستهلاك نشاط المستخدم وسمات العنصر في الصور التالية

لقطة شاشة توضح كيف نواصل إنتاج حدث سلوك المستخدم في الوقت الحقيقي على Kafka 3.2.

لقطة شاشة توضح كيف نستمر في استهلاك أحداث نشاط المستخدم المنضمة إلى سمات العنصر على Kafka.

المرجع