Azure Event Hubs
Azure Event Hubs je služba pro příjem telemetrických dat hyper-škálování, která shromažďuje, transformuje a ukládá miliony událostí. Jako platforma pro distribuované streamování nabízí nízkou latenci a konfigurovatelné časové uchovávání, které umožňuje ingestovat příchozí přenosy obrovského množství telemetrických dat do cloudu a načítat tato data z více aplikací na principu publikování a odběru.
Tento článek vysvětluje, jak používat strukturované streamování se službou Azure Event Hubs a clustery Azure Databricks.
Poznámka:
Azure Event Hubs poskytuje koncový bod kompatibilní s Apache Kafka, který můžete použít s konektorem Kafka strukturovaného streamování, který je k dispozici v Databricks Runtime, ke zpracování zpráv ze služby Azure Event Hubs. Databricks doporučuje ke zpracování zpráv ze služby Azure Event Hubs použít konektor Kafka strukturovaného streamování.
Požadavky
Aktuální podporu vydaných verzí najdete v tématu Nejnovější verze v souboru readme projektu služby Azure Event Hubs Spark Připojení or.
Vytvořte knihovnu v pracovním prostoru Azure Databricks pomocí souřadnic
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
Mavenu.Poznámka:
Tento konektor se pravidelně aktualizuje a může být dostupná novější verze: Doporučujeme stáhnout nejnovější konektor z úložiště Maven.
Nainstalujte vytvořenou knihovnu do clusteru.
Schéma
Schémazáznamůch
Column | Typ |
---|---|
body |
binární |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
časové razítko |
publisher |
string |
partitionKey |
string |
properties |
map[řetězec,json] |
Vždy body
se poskytuje jako bajtové pole. Slouží cast("string")
k explicitní deserializaci body
sloupce.
Rychlé spuštění
Začněme rychlým příkladem: WordCount. Následující poznámkový blok je vše, co trvá spuštění Služby WordCount pomocí strukturovaného streamování se službou Azure Event Hubs.
Azure Event Hubs WordCount s poznámkovým blokem strukturovaného streamování
Konfigurace
Tato část popisuje nastavení konfigurace, která potřebujete pro práci se službou Event Hubs.
Podrobné pokyny ke konfiguraci strukturovaného streamování se službou Azure Event Hubs najdete v průvodci integrací strukturovaného streamování a služby Azure Event Hubs vyvinutým Microsoftem.
Podrobné pokyny k používání strukturovaného streamování najdete v tématu Streamování v Azure Databricks.
Connection string
Služba Event Hubs připojovací řetězec se vyžaduje pro připojení ke službě Event Hubs. Připojovací řetězec pro instanci služby Event Hubs můžete získat z webu Azure Portal nebo pomocí ConnectionStringBuilder
knihovny.
portál Azure
Když z webu Azure Portal získáte připojovací řetězec, může nebo nemusí klíč obsahovatEntityPath
. Rozmyslete si:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Pokud se chcete připojit ke službě EventHubs, EntityPath
musí být k dispozici. Pokud váš připojovací řetězec ho nemá, nemějte obavy.
Postará se o to:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
Připojení ionStringBuilder
Případně můžete použít ConnectionStringBuilder
připojovací řetězec.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Veškerá konfigurace související se službou Event Hubs probíhá ve vaší službě EventHubsConf
. Chcete-li vytvořit EventHubsConf
, musíte předat připojovací řetězec:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Další informace o získání platného připojovací řetězec najdete v tématu Připojení ion String.
Úplný seznam konfigurací najdete v tématu EventHubsConf. Tady je podmnožina konfigurací , které vám pomůžou začít:
Možnost | Hodnota | Výchozí | Typ dotazu | Popis |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Streamování a dávka | Skupina příjemců je zobrazení celého centra událostí. Skupiny příjemců poskytují různým přijímajícím aplikacím oddělená zobrazení datového proudu událostí a umožňují jim nezávisle číst datový proud vlastním tempem a s použitím vlastních posunů. Další informace jsou k dispozici v dokumentaci Microsoftu. |
startingPosition |
EventPosition | Začátek streamu | Streamování a dávka | Výchozí pozice pro úlohu strukturovaného streamování Informace o pořadí čtení možností najdete v části startPozice . |
maxEventsPerTrigger |
long | partitionCount * 1000 |
Dotaz streamování | Omezení rychlosti maximálního počtu událostí zpracovaných v intervalu triggeru Zadaný celkový počet událostí bude úměrně rozdělen mezi oddíly různých svazků. |
Pro každou možnost existuje odpovídající nastavení v EventHubsConf
. Příklad:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
umožňuje uživatelům určit počáteční (a koncové) pozice s EventPosition
třídou. EventPosition
definuje pozici události v oddílu centra událostí. Pozice může být zarovnaná doba, posun, pořadové číslo, začátek datového proudu nebo konec datového proudu.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Pokud chcete začít (nebo končit) na určité pozici, jednoduše vytvořte správnou EventPosition
hodnotu a nastavte ji v EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
Strukturované streamování produkčního prostředí se službou Azure Event Hubs
Při spouštění dotazů streamování v produkčním prostředí pravděpodobně potřebujete větší odolnost a záruky doby provozu, než byste měli, když jednoduše připojíte poznámkový blok ke clusteru a spustíte dotazy streamování interaktivně. Naimportujte a spusťte následující poznámkový blok, který ukazuje, jak nakonfigurovat a spustit strukturované streamování v produkčním prostředí se službou Azure Event Hubs a Azure Databricks.
Další informace najdete v tématu Aspekty produkce strukturovaného streamování.