Azure Event Hubs

Azure Event Hubs to usługa pozyskiwania danych telemetrycznych w hiperskalie, która zbiera, przekształca i przechowuje miliony zdarzeń. Jest to rozproszona platforma przesyłania strumieniowego, która zapewnia małe opóźnienia i możliwość konfigurowania czasu przechowywania, dzięki czemu można odbierać bardzo duże ilości danych telemetrycznych w chmurze i odczytywać dane z wielu aplikacji przy użyciu semantyki publikowania i subskrybowania.

W tym artykule wyjaśniono, jak używać przesyłania strumieniowego ze strukturą w klastrach usług Azure Event Hubs i Azure Databricks.

Uwaga

Usługa Azure Event Hubs zapewnia punkt końcowy zgodny z platformą Apache Kafka, którego można używać z łącznikiem Platformy Kafka ze strukturą przesyłania strumieniowego, dostępnym w środowisku Databricks Runtime w celu przetwarzania komunikatów z usługi Azure Event Hubs. Usługa Databricks zaleca używanie łącznika Platformy Kafka ze strukturą przesyłania strumieniowego do przetwarzania komunikatów z usługi Azure Event Hubs.

Wymagania

Aby uzyskać informacje o bieżącej obsłudze wersji, zobacz artykuł "Latest Releases" (Najnowsze wydania) w pliku readme projektu Platformy Spark Połączenie or usługi Azure Event Hubs.

  1. Utwórz bibliotekę w obszarze roboczym usługi Azure Databricks przy użyciu współrzędnej com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17narzędzia Maven.

    Uwaga

    Ten łącznik jest regularnie aktualizowany, a najnowsza wersja może być dostępna: zalecamy ściąganie najnowszego łącznika z repozytorium Maven

  2. Zainstaluj utworzoną bibliotekę w klastrze.

Schemat

Schemat rekordów to:

Kolumna Type
body dane binarne
partition string
offset string
sequenceNumber długi
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

Element body jest zawsze udostępniany jako tablica bajtów. Użyj cast("string") polecenia , aby jawnie deserializować kolumnę body .

Szybkie uruchamianie

Zacznijmy od szybkiego przykładu: WordCount. Poniższy notes jest potrzebny do uruchomienia programu WordCount przy użyciu przesyłania strumieniowego ze strukturą w usłudze Azure Event Hubs.

Azure Event Hubs WordCount z notesem przesyłania strumieniowego ze strukturą

Pobierz notes

Konfigurowanie

W tej sekcji omówiono ustawienia konfiguracji potrzebne do pracy z usługą Event Hubs.

Aby uzyskać szczegółowe wskazówki dotyczące konfigurowania przesyłania strumieniowego ze strukturą za pomocą usługi Azure Event Hubs, zobacz Przewodnik dotyczący przesyłania strumieniowego ze strukturą i integracji usługi Azure Event Hubs opracowany przez firmę Microsoft.

Aby uzyskać szczegółowe wskazówki dotyczące korzystania z przesyłania strumieniowego ze strukturą, zobacz Przesyłanie strumieniowe w usłudze Azure Databricks.

Connection string

Usługa Event Hubs parametry połączenia jest wymagana do nawiązania połączenia z usługą Event Hubs. Możesz pobrać parametry połączenia dla wystąpienia usługi Event Hubs z witryny Azure Portal lub przy użyciu elementu ConnectionStringBuilder w bibliotece.

Azure Portal

Po otrzymaniu parametry połączenia z witryny Azure Portal może on mieć klucz lub nie ma goEntityPath. Rozważ następujące kwestie:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Aby nawiązać połączenie z usługą EntityPath EventHubs, musi być obecny. Jeśli parametry połączenia go nie ma, nie martw się. Zajmie się tym:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Połączenie ionStringBuilder

Alternatywnie możesz użyć elementu ConnectionStringBuilder , aby parametry połączenia.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Wszystkie konfiguracje związane z usługą Event Hubs są wykonywane w usłudze EventHubsConf. Aby utworzyć obiekt EventHubsConf, należy przekazać parametry połączenia:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Aby uzyskać więcej informacji na temat uzyskiwania prawidłowego parametry połączenia, zobacz ciąg Połączenie ion.

Aby uzyskać pełną listę konfiguracji, zobacz EventHubsConf. Oto podzbiór konfiguracji, aby rozpocząć pracę:

Opcja Wartość Wartość domyślna Typ zapytania opis
consumerGroup String "$Default" Przesyłanie strumieniowe i wsadowe Grupa odbiorców to widok całego centrum zdarzeń. Dzięki grupom odbiorców wiele aplikacji odbiorczych może mieć osobny widok strumienia zdarzeń i niezależnie odczytywać strumień we własnym tempie i przy użyciu własnego przesunięcia. Więcej informacji znajduje się w dokumentacji firmy Microsoft.
startingPosition EventPosition Początek strumienia Przesyłanie strumieniowe i wsadowe Pozycja początkowa zadania przesyłania strumieniowego ze strukturą. Zobacz startingPositions , aby uzyskać informacje o kolejności odczytywania opcji.
maxEventsPerTrigger długi partitionCount

* 1000
Zapytanie przesyłane strumieniowo Limit szybkości dla maksymalnej liczby zdarzeń przetwarzanych w interwale wyzwalacza. Określona łączna liczba zdarzeń zostanie proporcjonalnie podzielona między partycje innego woluminu.

Dla każdej opcji istnieje odpowiednie ustawienie w pliku EventHubsConf. Na przykład:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf Umożliwia użytkownikom określanie pozycji początkowych (i końcowych) z klasą EventPosition . EventPosition definiuje położenie zdarzenia w partycji centrum zdarzeń. Pozycja może być czasem w kolejce, przesunięciem, numerem sekwencji, rozpoczęciem strumienia lub końcem strumienia.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Jeśli chcesz uruchomić (lub zakończyć) na określonej pozycji, po prostu utwórz poprawną i EventPosition ustaw ją w elemecie EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Przesyłanie strumieniowe ze strukturą produkcji za pomocą usługi Azure Event Hubs

Podczas uruchamiania zapytań przesyłanych strumieniowo w środowisku produkcyjnym prawdopodobnie potrzebujesz większej niezawodności i czasu pracy niż w przypadku, gdy po prostu dołączysz notes do klastra i interaktywnie uruchomisz zapytania przesyłania strumieniowego. Zaimportuj i uruchom następujący notes, aby dowiedzieć się, jak skonfigurować i uruchomić przesyłanie strumieniowe ze strukturą w środowisku produkcyjnym przy użyciu usług Azure Event Hubs i Azure Databricks.

Aby uzyskać więcej informacji, zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.

Przesyłanie strumieniowe ze strukturą w środowisku produkcyjnym za pomocą notesu usługi Azure Event Hubs

Pobierz notes