Azure Event Hubs

Azure Event Hubs je služba pro příjem telemetrických dat hyper-škálování, která shromažďuje, transformuje a ukládá miliony událostí. Jako platforma pro distribuované streamování nabízí nízkou latenci a konfigurovatelné časové uchovávání, které umožňuje ingestovat příchozí přenosy obrovského množství telemetrických dat do cloudu a načítat tato data z více aplikací na principu publikování a odběru.

Tento článek vysvětluje, jak používat strukturované streamování se službou Azure Event Hubs a clustery Azure Databricks.

Poznámka:

Azure Event Hubs poskytuje koncový bod kompatibilní s Apache Kafka, který můžete použít s konektorem Kafka strukturovaného streamování, který je k dispozici v Databricks Runtime, ke zpracování zpráv ze služby Azure Event Hubs. Databricks doporučuje ke zpracování zpráv ze služby Azure Event Hubs použít konektor Kafka strukturovaného streamování.

Požadavky

Aktuální podporu vydaných verzí najdete v tématu Nejnovější verze v souboru readme projektu služby Azure Event Hubs Spark Připojení or.

  1. Vytvořte knihovnu v pracovním prostoru Azure Databricks pomocí souřadnic com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17Mavenu.

    Poznámka:

    Tento konektor se pravidelně aktualizuje a může být dostupná novější verze: Doporučujeme stáhnout nejnovější konektor z úložiště Maven.

  2. Nainstalujte vytvořenou knihovnu do clusteru.

Schéma

Schémazáznamůch

Column Typ
body binární
partition string
offset string
sequenceNumber long
enqueuedTime časové razítko
publisher string
partitionKey string
properties map[řetězec,json]

Vždy body se poskytuje jako bajtové pole. Slouží cast("string") k explicitní deserializaci body sloupce.

Rychlé spuštění

Začněme rychlým příkladem: WordCount. Následující poznámkový blok je vše, co trvá spuštění Služby WordCount pomocí strukturovaného streamování se službou Azure Event Hubs.

Azure Event Hubs WordCount s poznámkovým blokem strukturovaného streamování

Získat poznámkový blok

Konfigurace

Tato část popisuje nastavení konfigurace, která potřebujete pro práci se službou Event Hubs.

Podrobné pokyny ke konfiguraci strukturovaného streamování se službou Azure Event Hubs najdete v průvodci integrací strukturovaného streamování a služby Azure Event Hubs vyvinutým Microsoftem.

Podrobné pokyny k používání strukturovaného streamování najdete v tématu Streamování v Azure Databricks.

Connection string

Služba Event Hubs připojovací řetězec se vyžaduje pro připojení ke službě Event Hubs. Připojovací řetězec pro instanci služby Event Hubs můžete získat z webu Azure Portal nebo pomocí ConnectionStringBuilder knihovny.

portál Azure

Když z webu Azure Portal získáte připojovací řetězec, může nebo nemusí klíč obsahovatEntityPath. Rozmyslete si:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Pokud se chcete připojit ke službě EventHubs, EntityPath musí být k dispozici. Pokud váš připojovací řetězec ho nemá, nemějte obavy. Postará se o to:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Připojení ionStringBuilder

Případně můžete použít ConnectionStringBuilder připojovací řetězec.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Veškerá konfigurace související se službou Event Hubs probíhá ve vaší službě EventHubsConf. Chcete-li vytvořit EventHubsConf, musíte předat připojovací řetězec:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Další informace o získání platného připojovací řetězec najdete v tématu Připojení ion String.

Úplný seznam konfigurací najdete v tématu EventHubsConf. Tady je podmnožina konfigurací , které vám pomůžou začít:

Možnost Hodnota Výchozí Typ dotazu Popis
consumerGroup String "$Default" Streamování a dávka Skupina příjemců je zobrazení celého centra událostí. Skupiny příjemců poskytují různým přijímajícím aplikacím oddělená zobrazení datového proudu událostí a umožňují jim nezávisle číst datový proud vlastním tempem a s použitím vlastních posunů. Další informace jsou k dispozici v dokumentaci Microsoftu.
startingPosition EventPosition Začátek streamu Streamování a dávka Výchozí pozice pro úlohu strukturovaného streamování Informace o pořadí čtení možností najdete v části startPozice .
maxEventsPerTrigger long partitionCount

* 1000
Dotaz streamování Omezení rychlosti maximálního počtu událostí zpracovaných v intervalu triggeru Zadaný celkový počet událostí bude úměrně rozdělen mezi oddíly různých svazků.

Pro každou možnost existuje odpovídající nastavení v EventHubsConf. Příklad:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf umožňuje uživatelům určit počáteční (a koncové) pozice s EventPosition třídou. EventPosition definuje pozici události v oddílu centra událostí. Pozice může být zarovnaná doba, posun, pořadové číslo, začátek datového proudu nebo konec datového proudu.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Pokud chcete začít (nebo končit) na určité pozici, jednoduše vytvořte správnou EventPosition hodnotu a nastavte ji v EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Strukturované streamování produkčního prostředí se službou Azure Event Hubs

Při spouštění dotazů streamování v produkčním prostředí pravděpodobně potřebujete větší odolnost a záruky doby provozu, než byste měli, když jednoduše připojíte poznámkový blok ke clusteru a spustíte dotazy streamování interaktivně. Naimportujte a spusťte následující poznámkový blok, který ukazuje, jak nakonfigurovat a spustit strukturované streamování v produkčním prostředí se službou Azure Event Hubs a Azure Databricks.

Další informace najdete v tématu Aspekty produkce strukturovaného streamování.

Produkční strukturované streamování s využitím poznámkového bloku Azure Event Hubs

Získat poznámkový blok