Azure Event Hubs

Azure Event Hubs ist ein hyperskalierbarer Dienst für die Erfassung von Telemetriedaten, der Millionen von Ereignissen sammelt, transformiert und speichert. Diese verteilte Streamingplattform bietet niedrige Latenz und konfigurierbare Aufbewahrungszeiten, wodurch Sie riesige Mengen an Telemetriedaten in die Cloud einspeisen können. Über die Semantik „Veröffentlichen/Abonnieren“ können Sie die Daten verschiedener Anwendungen lesen.

In diesem Artikel wird erläutert, wie Sie strukturiertes Streaming mit Azure Event Hubs und Azure Databricks-Clustern verwenden.

Hinweis

Azure Event Hubs stellt einen mit Apache Kafka kompatiblen Endpunkt bereit, den Sie mit dem in Databricks Runtime verfügbaren Kafka-Connector für strukturiertes Streaming verwenden können, um Nachrichten von Azure Event Hubs zu verarbeiten. Databricks empfiehlt die Verwendung des Kafka-Connectors für strukturiertes Streaming, um Nachrichten von Azure Event Hubs zu verarbeiten.

Anforderungen

Informationen zur Unterstützung der aktuellen Version finden Sie unter „Neueste Versionen“ in der Infodatei zum Azure Event Hubs-Spark-Connector.

  1. Erstellen Sie eine Bibliothek in Ihrem Azure Databricks-Arbeitsbereich mithilfe der Maven-Koordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Hinweis

    Dieser Connector wird regelmäßig aktualisiert, und unter Umständen ist eine aktuellere Version verfügbar. Wir empfehlen Ihnen, den aktuellen Connector aus dem Maven-Repository zu pullen.

  2. Installieren Sie die erstellte Bibliothek in Ihrem Cluster.

Schema

Das Schema der Datensätze ist wie folgt:

Column Type
body binary
partition Zeichenfolge
offset Zeichenfolge
sequenceNumber lang
enqueuedTime timestamp
publisher Zeichenfolge
partitionKey Zeichenfolge
properties map[string,json]

body wird immer als Bytearray bereitgestellt. Verwenden Sie cast("string"), um die Spalte body explizit zu deserialisieren.

Schnellstart

Beginnen wir mit einem kurzen Beispiel: WordCount. Sie brauchen nur das folgende Notebook, um WordCount anhand von strukturiertem Streaming mit Azure Event Hubs auszuführen.

Notebook für Azure Event Hubs-WordCount mit strukturiertem Streaming

Notebook abrufen

Konfiguration

In diesem Abschnitt werden die Konfigurationseinstellungen erläutert, die Sie für das Arbeiten mit Event Hubs benötigen.

Ausführliche Anleitungen zum Konfigurieren von strukturiertem Streaming mit Azure Event Hubs finden Sie in dem von Microsoft entwickelten Integrationsleitfaden für strukturiertes Streaming und Azure Event Hubs.

Eine ausführliche Anleitung zur Verwendung von strukturiertem Streaming finden Sie unter Streaming in Azure Databricks.

Verbindungszeichenfolge

Eine Event Hubs-Verbindungszeichenfolge ist erforderlich, um eine Verbindung mit dem Event Hubs-Dienst herzustellen. Sie können die Verbindungszeichenfolge für Ihre Event Hubs-Instanz über das Azure-Portal oder mithilfe von ConnectionStringBuilder in der Bibliothek abrufen.

Azure-Portal

Wenn Sie die Verbindungszeichenfolge über das Azure-Portal abrufen, kann sie den EntityPath-Schlüssel enthalten oder auch nicht. Berücksichtigen Sie dabei Folgendes:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Um eine Verbindung mit Ihrer Event Hubs-Instanz herzustellen, muss ein EntityPath vorhanden sein. Wenn dieser nicht in Ihrer Verbindungszeichenfolge enthalten ist, machen Sie sich keine Sorgen. Dieses Problem wird hiermit behoben:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Alternativ können Sie ConnectionStringBuilder verwenden, um die Verbindungszeichenfolge zu erstellen.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Alle Konfigurationen im Zusammenhang mit Event Hubs erfolgen in EventHubsConf. Zum Erstellen von EventHubsConf müssen Sie eine Verbindungszeichenfolge übergeben:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Weitere Informationen zum Abrufen einer gültigen Verbindungszeichenfolge finden Sie unter Verbindungszeichenfolge.

Eine vollständige Liste der Konfigurationen finden Sie unter EventHubsConf. Nachfolgend ist ein Teil der Konfigurationen für den Einstieg angegeben:

Option Wert Standard Abfragetyp BESCHREIBUNG
consumerGroup String „$Default“ Streaming und Batch Eine Consumergruppe ist eine Ansicht eines vollständigen Event Hubs. Mithilfe von Consumergruppen können mehrere verarbeitende Anwendungen jeweils eine separate Ansicht des Ereignisstreams aufweisen und den Stream unabhängig voneinander in einem unabhängigen Tempo und mit eigenen Offsets lesen. Weitere Informationen finden Sie in der Microsoft-Dokumentation.
startingPosition EventPosition Anfang des Streams Streaming und Batch Die Anfangsposition für Ihren Auftrag für strukturiertes Streaming. Informationen zur Reihenfolge, in der Optionen gelesen werden, finden Sie unter startingPositions.
maxEventsPerTrigger lang partitionCount

* 1000
Streamingabfrage Ratenbegrenzung für die maximale Anzahl von Ereignissen, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtanzahl von Ereignissen wird proportional auf Partitionen mit unterschiedlichen Volumes aufgeteilt.

Für jede Option gibt es eine entsprechende Einstellung in EventHubsConf. Beispiele:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf ermöglicht Benutzern das Angeben der Anfangsposition (und Endposition) mit der EventPosition-Klasse. EventPosition definiert die Position eines Ereignisses in einer Event Hub-Partition. Die Position kann ein Zeitpunkt der Einreihung in die Warteschlange, ein Offset, eine Sequenznummer, der Anfang des Streams oder das Ende des Streams sein.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Wenn Sie an einer bestimmten Position beginnen (oder enden) möchten, erstellen Sie einfach die richtige EventPosition, und legen Sie sie in EventHubsConf fest:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Strukturiertes Streaming mit Azure Event Hubs in der Produktion

Wenn Sie Streamingabfragen in der Produktion ausführen, wünschen Sie wahrscheinlich höhere Stabilitäts- und Verfügbarkeitsgarantien als beim einfachen Anfügen eines Notebooks an einen Cluster und dem interaktiven Ausführen Ihrer Streamingabfragen. Importieren Sie das folgende Notebook, und führen Sie es aus, um anhand einer Demo zu erfahren, wie Sie strukturiertes Streaming in der Produktion mit Azure Event Hubs und Azure Databricks konfigurieren und ausführen.

Weitere Informationen finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.

Notebook für strukturiertes Streaming mit Azure Event Hubs in der Produktion

Notebook abrufen