Připojení Apache Flink® ve službě HDInsight ve službě AKS se službou Azure Event Hubs pro Apache Kafka®
Důležité
Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.
Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink. Azure Event Hubs poskytuje koncový bod Apache Kafka v centru událostí, který umožňuje uživatelům připojit se k centru událostí pomocí protokolu Kafka.
V tomto článku se podíváme, jak připojit službu Azure Event Hubs k Apache Flinku ve službě HDInsight v AKS a probereme následující témata:
- Vytvoření oboru názvů služby Event Hubs
- Vytvoření HDInsightu v clusteru AKS pomocí Apache Flinku
- Spuštění producenta Flink
- Package Jar for Apache Flink
- Odeslání a ověření úlohy
Vytvoření oboru názvů služby Event Hubs a event Hubs
Pokud chcete vytvořit obor názvů služby Event Hubs a službu Event Hubs, podívejte se sem.
Nastavení clusteru Flink ve službě HDInsight v AKS
Pomocí existující služby HDInsight ve fondu clusterů AKS můžete vytvořit cluster Flink.
Spusťte producenta Flink, který přidává bootstrap.servers a
producer.config
informace.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Nahraďte
{YOUR.EVENTHUBS.CONNECTION.STRING}
připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v podrobnostech o tom, jak získat připojovací řetězec služby Event Hubs.Příklad:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Balení JAR pro Flink
com.example.app balení;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Přidejte fragment kódu pro spuštění Flink Producer.
Po spuštění kódu se události ukládají v tématu "topic1" (téma1 ).
Reference
- Web Apache Flink
- Názvy apache, Apache Kafka, Kafka, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).