Azure Event Hubs
Azure Event Hubs to usługa pozyskiwania danych telemetrycznych w hiperskalie, która zbiera, przekształca i przechowuje miliony zdarzeń. Jest to rozproszona platforma przesyłania strumieniowego, która zapewnia małe opóźnienia i możliwość konfigurowania czasu przechowywania, dzięki czemu można odbierać bardzo duże ilości danych telemetrycznych w chmurze i odczytywać dane z wielu aplikacji przy użyciu semantyki publikowania i subskrybowania.
W tym artykule wyjaśniono, jak używać przesyłania strumieniowego ze strukturą w klastrach usług Azure Event Hubs i Azure Databricks.
Uwaga
Usługa Azure Event Hubs zapewnia punkt końcowy zgodny z platformą Apache Kafka, którego można używać z łącznikiem Platformy Kafka ze strukturą przesyłania strumieniowego, dostępnym w środowisku Databricks Runtime w celu przetwarzania komunikatów z usługi Azure Event Hubs. Usługa Databricks zaleca używanie łącznika Platformy Kafka ze strukturą przesyłania strumieniowego do przetwarzania komunikatów z usługi Azure Event Hubs.
Wymagania
Aby uzyskać informacje o bieżącej obsłudze wersji, zobacz artykuł "Latest Releases" (Najnowsze wydania) w pliku readme projektu Platformy Spark Połączenie or usługi Azure Event Hubs.
Utwórz bibliotekę w obszarze roboczym usługi Azure Databricks przy użyciu współrzędnej
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
narzędzia Maven.Uwaga
Ten łącznik jest regularnie aktualizowany, a najnowsza wersja może być dostępna: zalecamy ściąganie najnowszego łącznika z repozytorium Maven
Zainstaluj utworzoną bibliotekę w klastrze.
Schemat
Schemat rekordów to:
Kolumna | Type |
---|---|
body |
dane binarne |
partition |
string |
offset |
string |
sequenceNumber |
długi |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
Element body
jest zawsze udostępniany jako tablica bajtów. Użyj cast("string")
polecenia , aby jawnie deserializować kolumnę body
.
Szybkie uruchamianie
Zacznijmy od szybkiego przykładu: WordCount. Poniższy notes jest potrzebny do uruchomienia programu WordCount przy użyciu przesyłania strumieniowego ze strukturą w usłudze Azure Event Hubs.
Azure Event Hubs WordCount z notesem przesyłania strumieniowego ze strukturą
Konfigurowanie
W tej sekcji omówiono ustawienia konfiguracji potrzebne do pracy z usługą Event Hubs.
Aby uzyskać szczegółowe wskazówki dotyczące konfigurowania przesyłania strumieniowego ze strukturą za pomocą usługi Azure Event Hubs, zobacz Przewodnik dotyczący przesyłania strumieniowego ze strukturą i integracji usługi Azure Event Hubs opracowany przez firmę Microsoft.
Aby uzyskać szczegółowe wskazówki dotyczące korzystania z przesyłania strumieniowego ze strukturą, zobacz Przesyłanie strumieniowe w usłudze Azure Databricks.
Connection string
Usługa Event Hubs parametry połączenia jest wymagana do nawiązania połączenia z usługą Event Hubs. Możesz pobrać parametry połączenia dla wystąpienia usługi Event Hubs z witryny Azure Portal lub przy użyciu elementu ConnectionStringBuilder
w bibliotece.
Azure Portal
Po otrzymaniu parametry połączenia z witryny Azure Portal może on mieć klucz lub nie ma goEntityPath
. Rozważ następujące kwestie:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Aby nawiązać połączenie z usługą EntityPath
EventHubs, musi być obecny. Jeśli parametry połączenia go nie ma, nie martw się.
Zajmie się tym:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
Połączenie ionStringBuilder
Alternatywnie możesz użyć elementu ConnectionStringBuilder
, aby parametry połączenia.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Wszystkie konfiguracje związane z usługą Event Hubs są wykonywane w usłudze EventHubsConf
. Aby utworzyć obiekt EventHubsConf
, należy przekazać parametry połączenia:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Aby uzyskać więcej informacji na temat uzyskiwania prawidłowego parametry połączenia, zobacz ciąg Połączenie ion.
Aby uzyskać pełną listę konfiguracji, zobacz EventHubsConf. Oto podzbiór konfiguracji, aby rozpocząć pracę:
Opcja | Wartość | Wartość domyślna | Typ zapytania | opis |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Przesyłanie strumieniowe i wsadowe | Grupa odbiorców to widok całego centrum zdarzeń. Dzięki grupom odbiorców wiele aplikacji odbiorczych może mieć osobny widok strumienia zdarzeń i niezależnie odczytywać strumień we własnym tempie i przy użyciu własnego przesunięcia. Więcej informacji znajduje się w dokumentacji firmy Microsoft. |
startingPosition |
EventPosition | Początek strumienia | Przesyłanie strumieniowe i wsadowe | Pozycja początkowa zadania przesyłania strumieniowego ze strukturą. Zobacz startingPositions , aby uzyskać informacje o kolejności odczytywania opcji. |
maxEventsPerTrigger |
długi | partitionCount * 1000 |
Zapytanie przesyłane strumieniowo | Limit szybkości dla maksymalnej liczby zdarzeń przetwarzanych w interwale wyzwalacza. Określona łączna liczba zdarzeń zostanie proporcjonalnie podzielona między partycje innego woluminu. |
Dla każdej opcji istnieje odpowiednie ustawienie w pliku EventHubsConf
. Na przykład:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
Umożliwia użytkownikom określanie pozycji początkowych (i końcowych) z klasą EventPosition
. EventPosition
definiuje położenie zdarzenia w partycji centrum zdarzeń. Pozycja może być czasem w kolejce, przesunięciem, numerem sekwencji, rozpoczęciem strumienia lub końcem strumienia.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Jeśli chcesz uruchomić (lub zakończyć) na określonej pozycji, po prostu utwórz poprawną i EventPosition
ustaw ją w elemecie EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
Przesyłanie strumieniowe ze strukturą produkcji za pomocą usługi Azure Event Hubs
Podczas uruchamiania zapytań przesyłanych strumieniowo w środowisku produkcyjnym prawdopodobnie potrzebujesz większej niezawodności i czasu pracy niż w przypadku, gdy po prostu dołączysz notes do klastra i interaktywnie uruchomisz zapytania przesyłania strumieniowego. Zaimportuj i uruchom następujący notes, aby dowiedzieć się, jak skonfigurować i uruchomić przesyłanie strumieniowe ze strukturą w środowisku produkcyjnym przy użyciu usług Azure Event Hubs i Azure Databricks.
Aby uzyskać więcej informacji, zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.