Öğretici: Stream Apache Kafka kullanarak Event Hubs olaylarını işleme
Bu makalede, veri akışlarını veri Event Hubs veri akışı ve veri akışı Azure Stream Analytics. Bu, aşağıdaki adımlarda size yol boyunca yol veserdir:
- Bir Event Hubs ad alanı oluşturun.
- Olay hub' a ileti gönderen bir Kafka istemcisi oluşturun.
- Olay Stream Analytics Azure blob depolama alanına veri kopyalanan bir iş oluşturun.
Bir olay hub'ı tarafından ortaya çıkacak Kafka uç noktasını kullanırken protokol istemcilerinizi değiştirmenize veya kendi kümelerinizi çalıştırmanıza gerek yok. Azure Event Hubs Apache Kafka sürüm 1.0’ı destekler. ve üzeri.
Önkoşullar
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşulların karşılandığından emin olun:
- Azure aboneliği. Aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
- Java Development Kit (JDK) 1.7+.
- Bir Maven ikili arşivini indirin ve yükleyin.
- Git
- Bir Azure Depolama hesabı. Yoksa devam etmeden önce bir tane oluşturun. Bu Stream Analytics iş, çıktı verilerini bir Azure blob depolama alanında depolar.
Event Hubs ad alanı oluşturma
Bir ad alanı Event Hubs, ad alanı için Kafka uç noktası otomatik olarak etkinleştirilir. Kafka protokolünü kullanan uygulamalarınızı olay hub'larına akışla sebilirsiniz. Azure portal kullanarak olay hub'ı oluşturma adım adım yönergeleri izleyerek Event Hubs oluşturun. Ayrılmış bir küme kullanıyorsanız bkz. Ayrılmış kümede ad alanı ve olay hub'ı oluşturma.
Not
kafka Event Hubs temel katmanda desteklenmiyor.
Kafka ile Event Hubs
Kafka Azure Event Hubs deposunu makinenize klonlama.
Klasörüne gidin:
azure-event-hubs-for-kafka/quickstart/java/producer.içinde üreticinin yapılandırma ayrıntılarını
src/main/resources/producer.configgüncelleştirin. Olay hub'ı ad alanı için ad ve bağlantı dizesini belirtin.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";sayfasına
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/gidin ve TestDataReporter.java dosyasını istediğiniz düzenleyicide açın.Aşağıdaki kod satırına açıklama satırı yapın:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);Açıklama satırı yapılan kodun yerine aşağıdaki kod satırı ekleyin:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");Bu kod olay verilerini JSON biçiminde gönderir. Bir iş için girişi Stream Analytics, JSON'u giriş verileri için biçim olarak belirtirsiniz.
Üreticiyi çalıştırın ve akışı Event Hubs. Bir Windows makinede, bir Node.js istemi kullanırken, bu komutları
azure-event-hubs-for-kafka/quickstart/java/producerçalıştırmadan önce klasörüne geçiş yapabilirsiniz.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Olay hub' ın verileri aldığını doğrulama
VARLıKLAR altında Event Hubs'yi seçin. test adlı bir olay hub'ı gördüğünüzü onaylayın.

Olay hub' a gelen iletileri gördüğünüzü onaylayın.

Bir iş kullanarak olay Stream Analytics işleme
Bu bölümde, bir iş Azure Stream Analytics oluşturabilirsiniz. Kafka istemcisi olayları olay hub' a gönderir. Olay verilerini Stream Analytics alan ve bir Azure blob depolama alanına çıkış olarak alan bir iş oluşturun. Azure Depolama hesabınız yoksa oluşturun.
Veri iş Stream Analytics herhangi bir analiz gerçekleştirmeden veriler üzerinden geçer. Çıkış verilerini farklı bir biçimde veya eldeki içgörülerle üretmek için giriş verilerini dönüştüren bir sorgu oluşturabilirsiniz.
Akış Analizi işi oluşturma
- Uygulamanın içinde + Kaynak oluştur'Azure portal.
- Azure Market menüsünde Analiz'i seçin ve Stream Analytics seçin.
- Yeni Stream Analytics sayfasında aşağıdaki eylemleri gerçekleştirin:
İş için bir ad girin.
Aboneliğinizi seçin.
Kaynak grubu için Yeni oluştur'ı seçin ve adı girin. Mevcut bir kaynak grubunu da kullanabilirsiniz.
İş için bir konum seçin.
İş oluşturmak için Oluştur'a seçin.

İş girişi yapılandırma
Bildirim iletisinde, kaynak iş sayfasını görmek için Kaynağa Stream Analytics seçin.
Sol menüde İş TOPOLOJIsi bölümünde Girişler'i seçin.
Akış girişi ekle'yi ve ardından Olay Hub'ı'ı seçin.

Olay Hub'ı giriş yapılandırması sayfasında aşağıdaki eylemleri gerçekleştirin:
Giriş için bir diğer ad belirtin.
Azure aboneliğinizi seçin.
Daha önce oluşturduğunuz olay hub'ı ad alanını seçin.
Olay hub'ı için test'i seçin.
Kaydet’i seçin.

İş çıkışını yapılandırma
- Menüde İş TOPOLOJIsi bölümünde Çıkışlar'ı seçin.
- Araç çubuğunda + Ekle'yi ve ardından Blob depolama'yi seçin
- Blob depolama çıkış ayarları sayfasında aşağıdaki eylemleri gerçekleştirin:
Çıkış için bir diğer ad belirtin.
Azure aboneliğinizi seçin.
Azure Depolama seçin.
Sorgunun çıkış verilerini depolar kapsayıcı için bir Stream Analytics girin.
Kaydet’i seçin.

Sorgu tanımlama
Gelen bir veri akışını okumak için bir Stream Analytics işi ayarladıktan sonraki adım, verileri gerçek zamanlı olarak analiz eden bir dönüştürme oluşturmaktır. Dönüştürme sorgusunu Stream Analytics sorgu dilini kullanarak tanımlarsınız. Bu kılavuzda, herhangi bir dönüştürme gerçekleştirmeden veriden geçen bir sorgu tanımlarsiniz.
Sorgu'ya seçin.
Sorgu penceresinde yerine daha
[YourOutputAlias]önce oluşturduğunuz çıkış diğer adını girin.yerine
[YourInputAlias]daha önce oluşturduğunuz giriş diğer adını girin.Araç çubuğunda Kaydet’i seçin.

Stream Analytics işini çalıştırma
Sol menüden Genel Bakış'ı seçin.
Başlat'ı seçin.

İş başlat sayfasında Başlat'ı seçin.

İş durumunun Başlatan'dan çalıştırmaya kadar bekleyin.

Senaryoyu test etmek
Olayları olay hub'a göndermek için Kafka üreticisini yeniden çalıştırın.
mvn exec:java -Dexec.mainClass="TestProducer"Çıktı verilerini Azure blob depolama alanında gördüğünüzden onaylayın. Kapsayıcıda aşağıdaki örnek satırlara benzer 100 satır içeren bir JSON dosyası görüyorsunuz:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}Bu Azure Stream Analytics, olay hub'larından giriş verilerini aldı ve bu senaryoda Azure blob depolama alanında depoladı.
Sonraki adımlar
Bu makalede, protokol istemcilerinizi değiştirmeden veya kendi kümelerinizi Event Hubs ağlara akışla akışla nasıl akışla alanın öğrenildi. Daha fazla bilgi edinmek Event Hubs için Apache Kafka için Apache Kafka geliştirici kılavuzuna Azure Event Hubs.