Azure Event Hubs

Azure Event Hubs är en telemetriinmatningstjänst i hyperskala som samlar in, transformerar och lagrar miljontals händelser. Som en distribuerad strömningsplattform tillhandahåller tjänsten korta svarstider och konfigurerbar kvarhållning så att du kan föra in enorma mängder telemetridata i molnet och läsa data från flera program med hjälp av ”publicera-prenumerera”-semantik.

Den här artikeln beskriver hur du använder Strukturerad direktuppspelning med Azure Event Hubs- och Azure Databricks-kluster.

Kommentar

Azure Event Hubs tillhandahåller en slutpunkt som är kompatibel med Apache Kafka som du kan använda med Kafka-anslutningsappen för strukturerad direktuppspelning, tillgänglig i Databricks Runtime, för att bearbeta meddelanden från Azure Event Hubs. Databricks rekommenderar att du använder Kafka-anslutningsappen för strukturerad direktuppspelning för att bearbeta meddelanden från Azure Event Hubs.

Behov

Aktuellt versionsstöd finns i "Senaste versioner" i läsfilen för Azure Event Hubs Spark Anslut eller projekt.

  1. Skapa ett bibliotek på din Azure Databricks-arbetsyta med hjälp av Maven-koordinaten com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Kommentar

    Den här anslutningsappen uppdateras regelbundet och en senare version kan vara tillgänglig: vi rekommenderar att du hämtar den senaste anslutningsappen från Maven-lagringsplatsen

  2. Installera det skapade biblioteket i klustret.

Schema

Schemat för posterna är:

Column Typ
body binary
partition sträng
offset sträng
sequenceNumber lång
enqueuedTime timestamp
publisher sträng
partitionKey sträng
properties map[string,json]

body tillhandahålls alltid som en bytematris. Använd cast("string") för att explicit deserialisera body kolumnen.

Snabbstart

Vi börjar med ett snabbt exempel: WordCount. Följande notebook-fil är allt som krävs för att köra WordCount med structured streaming med Azure Event Hubs.

Azure Event Hubs WordCount med notebook-fil för strukturerad direktuppspelning

Hämta notebook-fil

Konfiguration

I det här avsnittet beskrivs de konfigurationsinställningar som du behöver för att arbeta med Event Hubs.

Detaljerad vägledning om hur du konfigurerar strukturerad direktuppspelning med Azure Event Hubs finns i integrationsguiden för strukturerad direktuppspelning och Azure Event Hubs som utvecklats av Microsoft.

Detaljerad vägledning om hur du använder strukturerad direktuppspelning finns i Direktuppspelning på Azure Databricks.

Connection string

En Event Hubs-anslutningssträng krävs för att ansluta till Event Hubs-tjänsten. Du kan hämta anslutningssträng för din Event Hubs-instans från Azure-portalen eller med hjälp ConnectionStringBuilder av i biblioteket.

Azure Portal

När du får anslutningssträng från Azure-portalen kanske den har nyckeln eller kanske inte har EntityPath nyckeln. Tänk på att:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

För att ansluta till dina EventHubs måste en EntityPath finnas. Om din anslutningssträng inte har en, oroa dig inte. Detta tar hand om det:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Anslut ionStringBuilder

Du kan också använda ConnectionStringBuilder för att göra dina anslutningssträng.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

All konfiguration som rör Event Hubs sker i din EventHubsConf. Om du vill skapa en EventHubsConfmåste du skicka en anslutningssträng:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Mer information om hur du hämtar en giltig anslutningssträng finns i Anslut ion String.

En fullständig lista över konfigurationer finns i EventHubsConf. Här är en delmängd konfigurationer för att komma igång:

Alternativ Värde Default Frågetyp beskrivning
consumerGroup String "$Default" Direktuppspelning och batch En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera användningsprogram att vart och ett ha en separat vy över händelseströmmen och att oberoende läsa strömmen i egen takt och med sina egna offset. Mer information finns i Microsoft-dokumentationen.
startingPosition EventPosition Start av dataström Direktuppspelning och batch Startpositionen för ditt strukturerade direktuppspelningsjobb. Se startingPositions för information om i vilken ordning alternativen läse.
maxEventsPerTrigger lång partitionCount

* 1000
Direktuppspelningsfråga Hastighetsgräns för maximalt antal händelser som bearbetas per utlösarintervall. Det angivna totala antalet händelser delas proportionellt mellan partitioner av olika volymer.

För varje alternativ finns det en motsvarande inställning i EventHubsConf. Till exempel:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf tillåter användare att ange inledande (och avslutande) positioner med EventPosition klassen. EventPosition definierar positionen för en händelse i en Event Hub-partition. Positionen kan vara en köad tid, förskjutning, sekvensnummer, strömmens start eller strömmens slut.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Om du vill starta (eller avsluta) vid en viss position skapar du helt enkelt rätt EventPosition och ställer in den i :EventHubsConf

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Produktionsstrukturerad direktuppspelning med Azure Event Hubs

När du kör strömmande frågor i produktion vill du förmodligen ha mer robusthet och drifttidsgarantier än du skulle ha när du bara kopplar en notebook-fil till ett kluster och kör dina strömmande frågor interaktivt. Importera och kör följande notebook-fil för en demonstration av hur du konfigurerar och kör Strukturerad direktuppspelning i produktion med Azure Event Hubs och Azure Databricks.

Mer information finns i Produktionsöverväganden för strukturerad direktuppspelning.

Produktionsstrukturerad direktuppspelning med Azure Event Hubs Notebook

Hämta notebook-fil