إثراء الأحداث من Apache Kafka® بسمات من ADLS Gen2 باستخدام Apache Flink®
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
في هذه المقالة، يمكنك معرفة كيفية إثراء الأحداث في الوقت الحقيقي عن طريق الانضمام إلى دفق من Kafka مع جدول على ADLS Gen2 باستخدام Flink Streaming. نستخدم Flink Streaming API للانضمام إلى الأحداث من HDInsight Kafka مع سمات من ADLS Gen2. علاوة على ذلك، نستخدم الأحداث المرتبطة بالسمات للغرق في موضوع Kafka آخر.
المتطلبات الأساسية
- مجموعة Flink على HDInsight على AKS
- نظام مجموعة Kafka على HDInsight
- تأكد من مراعاة إعدادات الشبكة كما هو موضح في استخدام Kafka على HDInsight للتأكد من أن HDInsight على AKS ومجموعات HDInsight موجودة في نفس الشبكة الظاهرية
- لهذا العرض التوضيحي، نستخدم Window VM كبيئة تطوير مشروع maven في نفس VNet مثل HDInsight على AKS
إعداد موضوع Kafka
نقوم بإنشاء موضوع يسمى user_events
.
- الغرض من ذلك هو قراءة دفق من الأحداث في الوقت الحقيقي من موضوع Kafka باستخدام Flink. لدينا كل حدث مع الحقول التالية:
user_id, item_id, type, timestamp,
Kafka 3.2.0
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092
إعداد ملف على ADLS Gen2
نقوم بإنشاء ملف يسمى item attributes
في التخزين لدينا
- الغرض هو قراءة دفعة من
item attributes
ملف على ADLS Gen2. يحتوي كل عنصر على الحقول التالية:item_id, brand, category, timestamp,
تطوير وظيفة Apache Flink
في هذه الخطوة، نقوم بتنفيذ الأنشطة التالية
user_events
إثراء الموضوع من Kafka من خلال الانضمام منitem attributes
ملف على ADLS Gen2.- نحن ندفع نتيجة هذه الخطوة، كنشاك مستخدم غني للأحداث إلى موضوع Kafka.
تطوير مشروع Maven
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkKafkaJoinGen2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
الانضمام إلى موضوع Kafka باستخدام ملف ADLS Gen2
KafkaJoinGen2Demo.java
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class KafkaJoinGen2Demo {
public static void main(String[] args) throws Exception {
// 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration, update with your broker IPs
String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
String inputTopic = "user_events";
String outputTopic = "user_events_output";
String groupId = "my_group";
// 2. Register the cached file, update your container name and storage name
env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");
// 3. Read a stream of real-time user behavior event from a Kafka topic
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Parse Kafka source data
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String value) throws Exception {
// Parse the line into a Tuple4
String[] parts = value.split(",");
if (parts.length < 4) {
// Log and skip malformed record
System.out.println("Malformed record: " + value);
return null;
}
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
}
});
// 4. Enrich the user activity events by joining the items' attributes from a file
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
// 5. Output the enriched user activity events to a Kafka topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
enrichedData.map(value -> value.toString()).sinkTo(sink);
// 6. Execute the Flink job
env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
}
private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
private Map<String, Tuple4<String, String, String, String>> itemAttributes;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Read the cached file and parse its contents into a map
itemAttributes = new HashMap<>();
try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
}
}
}
@Override
public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
if (broadcastValue != null) {
return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
} else {
return null;
}
}
}
}
حزم jar، وإرسالها إلى Apache Flink
نحن نقوم بإرسال jar المحزمة إلى Flink:
إنتاج موضوع في الوقت user_events
الحقيقي على Kafka
نحن قادرون على إنتاج حدث user_events
سلوك المستخدم في الوقت الحقيقي في Kafka.
استهلاك الانضمام itemAttributes
مع user_events
على Kafka
نحن نستخدم itemAttributes
الآن في أحداث user_events
نشاط المستخدم للانضمام إلى نظام الملفات .
نواصل إنتاج واستهلاك نشاط المستخدم وسمات العنصر في الصور التالية
المرجع
- أمثلة على Flink
- موقع ويب Apache Flink
- Apache وApache Kafka وKafka وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المرتبطة هي علامات تجارية ل Apache Software Foundation (ASF).
الملاحظات
https://aka.ms/ContentUserFeedback.
قريبًا: خلال عام 2024، سنتخلص تدريجيًا من GitHub Issues بوصفها آلية إرسال ملاحظات للمحتوى ونستبدلها بنظام ملاحظات جديد. لمزيد من المعلومات، راجعإرسال الملاحظات وعرضها المتعلقة بـ