Připojení Apache Flink® ve službě HDInsight ve službě AKS se službou Azure Event Hubs pro Apache Kafka®

Důležité

Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.

Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink. Azure Event Hubs poskytuje koncový bod Apache Kafka v centru událostí, který umožňuje uživatelům připojit se k centru událostí pomocí protokolu Kafka.

V tomto článku se podíváme, jak připojit službu Azure Event Hubs k Apache Flinku ve službě HDInsight v AKS a probereme následující témata:

  • Vytvoření oboru názvů služby Event Hubs
  • Vytvoření HDInsightu v clusteru AKS pomocí Apache Flinku
  • Spuštění producenta Flink
  • Package Jar for Apache Flink
  • Odeslání a ověření úlohy

Vytvoření oboru názvů služby Event Hubs a event Hubs

  1. Pokud chcete vytvořit obor názvů služby Event Hubs a službu Event Hubs, podívejte se sem.

    Snímek obrazovky znázorňující nastavení služby Event Hubs

  1. Pomocí existující služby HDInsight ve fondu clusterů AKS můžete vytvořit cluster Flink.

  2. Spusťte producenta Flink, který přidává bootstrap.servers a producer.config informace.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING} připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v podrobnostech o tom, jak získat připojovací řetězec služby Event Hubs.

    Příklad:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. com.example.app balení;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Přidejte fragment kódu pro spuštění Flink Producer.

    Snímek obrazovky znázorňující, jak otestovat Flink ve službě Event Hubs

  3. Po spuštění kódu se události ukládají v tématu "topic1" (téma1 ).

    Snímek obrazovky zobrazující službu Event Hubs uloženou v tématu

Reference

  • Web Apache Flink
  • Názvy apache, Apache Kafka, Kafka, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).