Azure Event Hubs

Azure Event Hubs is een telemetrieopnameservice op hyperschaal die miljoenen gebeurtenissen verzamelt, transformeert en opslaat. Omdat het een gedistribueerd streamingplatform is, biedt het u een lage latentie en configureerbare retentietijd, waardoor u enorme hoeveelheden telemetriegegevens in de cloud kunt opnemen en u de gegevens kunt lezen vanuit diverse toepassingen met behulp van publicatie-abonnementsemantiek.

In dit artikel wordt uitgelegd hoe u Structured Streaming gebruikt met Azure Event Hubs- en Azure Databricks-clusters.

Notitie

Azure Event Hubs biedt een eindpunt dat compatibel is met Apache Kafka die u kunt gebruiken met de Structured Streaming Kafka-connector, beschikbaar in Databricks Runtime, om berichten van Azure Event Hubs te verwerken. Databricks raadt aan om de Structured Streaming Kafka-connector te gebruiken om berichten van Azure Event Hubs te verwerken.

Vereisten

Zie 'Nieuwste releases' in het Leesmij-bestand van het Azure Event Hubs Spark-Verbinding maken or-project voor de huidige release.

  1. Maak een bibliotheek in uw Azure Databricks-werkruimte met behulp van de Maven-coördinaat com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Notitie

    Deze connector wordt regelmatig bijgewerkt en er is mogelijk een recentere versie beschikbaar: we raden u aan de meest recente connector op te halen uit de Maven-opslagplaats

  2. Installeer de gemaakte bibliotheek in uw cluster.

Schema

Het schema van de records is:

Column Type
body binair
partition tekenreeks
offset tekenreeks
sequenceNumber long
enqueuedTime timestamp
publisher tekenreeks
partitionKey tekenreeks
properties map[string,json]

De body waarde wordt altijd geleverd als een bytematrix. Gebruik cast("string") dit om de kolom expliciet deserialiseren body .

Snel starten

Laten we beginnen met een snel voorbeeld: WordCount. Het volgende notebook is alles wat nodig is om WordCount uit te voeren met structured streaming met Azure Event Hubs.

Azure Event Hubs WordCount met gestructureerd streaming-notebook

Notebook downloaden

Configuratie

In deze sectie worden de configuratie-instellingen besproken die u nodig hebt om te werken met Event Hubs.

Zie de integratiehandleiding voor gestructureerde streaming en Azure Event Hubs die door Microsoft zijn ontwikkeld voor gedetailleerde richtlijnen voor het configureren van Structured Streaming met Azure Event Hubs.

Zie Streaming op Azure Databricks voor gedetailleerde richtlijnen over het gebruik van Structured Streaming.

Connection string

Een Event Hubs-verbindingsreeks is vereist om verbinding te maken met de Event Hubs-service. U kunt de verbindingsreeks voor uw Event Hubs-exemplaar ophalen vanuit Azure Portal of met behulp van de ConnectionStringBuilder bibliotheek.

Azure Portal

Wanneer u de verbindingsreeks uit Azure Portal krijgt, heeft deze mogelijk of niet de EntityPath sleutel. Overweeg het volgende:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Als u verbinding wilt maken met uw EventHubs, moet er een EntityPath aanwezig zijn. Als uw verbindingsreeks geen verbindingsreeks heeft, maakt u zich geen zorgen. Dit zorgt ervoor:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Verbinding maken ionStringBuilder

U kunt ook de ConnectionStringBuilder verbindingsreeks maken.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Alle configuraties met betrekking tot Event Hubs vindt plaats in uw EventHubsConf. Als u een EventHubsConfwilt maken, moet u een verbindingsreeks doorgeven:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Zie Verbinding maken ion String voor meer informatie over het verkrijgen van een geldige verbindingsreeks.

Zie EventHubsConf voor een volledige lijst met configuraties. Hier volgt een subset van configuraties om u op weg te helpen:

Optie Weergegeven als Standaard Querytype Beschrijving
consumerGroup String "$Default" Streaming en batch Een consumentengroep is een weergave van een hele Event Hub. Consumergroepen maken het mogelijk dat meerdere consumerende toepassingen beschikken over een afzonderlijke weergave van de gebeurtenisstroom. De toepassingen kunnen de stroom onafhankelijk, in hun eigen tempo en met hun eigen offsets lezen. Meer informatie vindt u in de Microsoft-documentatie.
startingPosition EventPosition Begin van de stream Streaming en batch De beginpositie voor uw Structured Streaming-taak. Zie startingPositions voor informatie over de volgorde waarin de opties worden gelezen.
maxEventsPerTrigger long partitionCount

* 1000
Streamingquery Frequentielimiet voor het maximum aantal gebeurtenissen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal gebeurtenissen wordt proportioneel verdeeld over partities van verschillende volumes.

Voor elke optie bestaat er een bijbehorende instelling in EventHubsConf. Voorbeeld:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf staat gebruikers toe om beginposities (en eindposities) met de EventPosition klasse op te geven. EventPosition definieert de positie van een gebeurtenis in een Event Hub-partitie. De positie kan een enqueued tijd, offset, volgnummer, het begin van de stream of het einde van de stream zijn.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Als u wilt beginnen (of eindigen) op een specifieke positie, maakt u gewoon de juiste EventPosition en stelt u deze in uw EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Productie gestructureerd streamen met Azure Event Hubs

Wanneer u streamingquery's uitvoert in productie, wilt u waarschijnlijk meer robuustheid en uptimegaranties dan wanneer u gewoon een notebook aan een cluster koppelt en uw streamingquery's interactief uitvoert. Importeer en voer het volgende notebook uit voor een demonstratie van het configureren en uitvoeren van Structured Streaming in productie met Azure Event Hubs en Azure Databricks.

Zie Productieoverwegingen voor Gestructureerd streamen voor meer informatie.

Gestructureerd streamen van productie met Azure Event Hubs-notebook

Notebook downloaden