Öğretici: Stream Apache Kafka kullanarak Event Hubs olaylarını işleme

Bu makalede, veri akışlarını veri Event Hubs veri akışı ve veri akışı Azure Stream Analytics. Bu, aşağıdaki adımlarda size yol boyunca yol veserdir:

  1. Bir Event Hubs ad alanı oluşturun.
  2. Olay hub' a ileti gönderen bir Kafka istemcisi oluşturun.
  3. Olay Stream Analytics Azure blob depolama alanına veri kopyalanan bir iş oluşturun.

Bir olay hub'ı tarafından ortaya çıkacak Kafka uç noktasını kullanırken protokol istemcilerinizi değiştirmenize veya kendi kümelerinizi çalıştırmanıza gerek yok. Azure Event Hubs Apache Kafka sürüm 1.0’ı destekler. ve üzeri.

Önkoşullar

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşulların karşılandığından emin olun:

Event Hubs ad alanı oluşturma

Bir ad alanı Event Hubs, ad alanı için Kafka uç noktası otomatik olarak etkinleştirilir. Kafka protokolünü kullanan uygulamalarınızı olay hub'larına akışla sebilirsiniz. Azure portal kullanarak olay hub'ı oluşturma adım adım yönergeleri izleyerek Event Hubs oluşturun. Ayrılmış bir küme kullanıyorsanız bkz. Ayrılmış kümede ad alanı ve olay hub'ı oluşturma.

Not

kafka Event Hubs temel katmanda desteklenmiyor.

Kafka ile Event Hubs

  1. Kafka Azure Event Hubs deposunu makinenize klonlama.

  2. Klasörüne gidin: azure-event-hubs-for-kafka/quickstart/java/producer .

  3. içinde üreticinin yapılandırma ayrıntılarını src/main/resources/producer.config güncelleştirin. Olay hub'ı ad alanı için ad ve bağlantı dizesini belirtin.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. sayfasına azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/ gidin ve TestDataReporter.java dosyasını istediğiniz düzenleyicide açın.

  5. Aşağıdaki kod satırına açıklama satırı yapın:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Açıklama satırı yapılan kodun yerine aşağıdaki kod satırı ekleyin:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Bu kod olay verilerini JSON biçiminde gönderir. Bir iş için girişi Stream Analytics, JSON'u giriş verileri için biçim olarak belirtirsiniz.

  7. Üreticiyi çalıştırın ve akışı Event Hubs. Bir Windows makinede, bir Node.js istemi kullanırken, bu komutları azure-event-hubs-for-kafka/quickstart/java/producer çalıştırmadan önce klasörüne geçiş yapabilirsiniz.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Olay hub' ın verileri aldığını doğrulama

  1. VARLıKLAR altında Event Hubs'yi seçin. test adlı bir olay hub'ı gördüğünüzü onaylayın.

    Olay hub'ı - test

  2. Olay hub' a gelen iletileri gördüğünüzü onaylayın.

    Olay hub'ı - iletiler

Bir iş kullanarak olay Stream Analytics işleme

Bu bölümde, bir iş Azure Stream Analytics oluşturabilirsiniz. Kafka istemcisi olayları olay hub' a gönderir. Olay verilerini Stream Analytics alan ve bir Azure blob depolama alanına çıkış olarak alan bir iş oluşturun. Azure Depolama hesabınız yoksa oluşturun.

Veri iş Stream Analytics herhangi bir analiz gerçekleştirmeden veriler üzerinden geçer. Çıkış verilerini farklı bir biçimde veya eldeki içgörülerle üretmek için giriş verilerini dönüştüren bir sorgu oluşturabilirsiniz.

Akış Analizi işi oluşturma

  1. Uygulamanın içinde + Kaynak oluştur'Azure portal.
  2. Azure Market menüsünde Analiz'i seçin ve Stream Analytics seçin.
  3. Yeni Stream Analytics sayfasında aşağıdaki eylemleri gerçekleştirin:
    1. İş için bir ad girin.

    2. Aboneliğinizi seçin.

    3. Kaynak grubu için Yeni oluştur'ı seçin ve adı girin. Mevcut bir kaynak grubunu da kullanabilirsiniz.

    4. İş için bir konum seçin.

    5. İş oluşturmak için Oluştur'a seçin.

      Yeni Stream Analytics işi

İş girişi yapılandırma

  1. Bildirim iletisinde, kaynak iş sayfasını görmek için Kaynağa Stream Analytics seçin.

  2. Sol menüde İş TOPOLOJIsi bölümünde Girişler'i seçin.

  3. Akış girişi ekle'yi ve ardından Olay Hub'ı'ı seçin.

    Giriş olarak olay hub'ı ekleme

  4. Olay Hub'ı giriş yapılandırması sayfasında aşağıdaki eylemleri gerçekleştirin:

    1. Giriş için bir diğer ad belirtin.

    2. Azure aboneliğinizi seçin.

    3. Daha önce oluşturduğunuz olay hub'ı ad alanını seçin.

    4. Olay hub'ı için test'i seçin.

    5. Kaydet’i seçin.

      Olay hub'ı giriş yapılandırması

İş çıkışını yapılandırma

  1. Menüde İş TOPOLOJIsi bölümünde Çıkışlar'ı seçin.
  2. Araç çubuğunda + Ekle'yi ve ardından Blob depolama'yi seçin
  3. Blob depolama çıkış ayarları sayfasında aşağıdaki eylemleri gerçekleştirin:
    1. Çıkış için bir diğer ad belirtin.

    2. Azure aboneliğinizi seçin.

    3. Azure Depolama seçin.

    4. Sorgunun çıkış verilerini depolar kapsayıcı için bir Stream Analytics girin.

    5. Kaydet’i seçin.

      Blob Depolama çıkış yapılandırması

Sorgu tanımlama

Gelen bir veri akışını okumak için bir Stream Analytics işi ayarladıktan sonraki adım, verileri gerçek zamanlı olarak analiz eden bir dönüştürme oluşturmaktır. Dönüştürme sorgusunu Stream Analytics sorgu dilini kullanarak tanımlarsınız. Bu kılavuzda, herhangi bir dönüştürme gerçekleştirmeden veriden geçen bir sorgu tanımlarsiniz.

  1. Sorgu'ya seçin.

  2. Sorgu penceresinde yerine daha [YourOutputAlias] önce oluşturduğunuz çıkış diğer adını girin.

  3. yerine [YourInputAlias] daha önce oluşturduğunuz giriş diğer adını girin.

  4. Araç çubuğunda Kaydet’i seçin.

    Ekran yakalama, giriş ve çıkış değişkenleri değerlerinin yer alan sorgu penceresini gösterir.

Stream Analytics işini çalıştırma

  1. Sol menüden Genel Bakış'ı seçin.

  2. Başlat'ı seçin.

    Başlat menüsü

  3. İş başlat sayfasında Başlat'ı seçin.

    İş başlatma sayfası

  4. İş durumunun Başlatan'dan çalıştırmaya kadar bekleyin.

    İş durumu - çalışıyor

Senaryoyu test etmek

  1. Olayları olay hub'a göndermek için Kafka üreticisini yeniden çalıştırın.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Çıktı verilerini Azure blob depolama alanında gördüğünüzden onaylayın. Kapsayıcıda aşağıdaki örnek satırlara benzer 100 satır içeren bir JSON dosyası görüyorsunuz:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Bu Azure Stream Analytics, olay hub'larından giriş verilerini aldı ve bu senaryoda Azure blob depolama alanında depoladı.

Sonraki adımlar

Bu makalede, protokol istemcilerinizi değiştirmeden veya kendi kümelerinizi Event Hubs ağlara akışla akışla nasıl akışla alanın öğrenildi. Daha fazla bilgi edinmek Event Hubs için Apache Kafka için Apache Kafka geliştirici kılavuzuna Azure Event Hubs.