Azure Event Hubs
Azure Event Hubs är en telemetriinmatningstjänst i hyperskala som samlar in, transformerar och lagrar miljontals händelser. Som en distribuerad strömningsplattform tillhandahåller tjänsten korta svarstider och konfigurerbar kvarhållning så att du kan föra in enorma mängder telemetridata i molnet och läsa data från flera program med hjälp av ”publicera-prenumerera”-semantik.
Den här artikeln beskriver hur du använder Strukturerad direktuppspelning med Azure Event Hubs- och Azure Databricks-kluster.
Kommentar
Azure Event Hubs tillhandahåller en slutpunkt som är kompatibel med Apache Kafka som du kan använda med Kafka-anslutningsappen för strukturerad direktuppspelning, tillgänglig i Databricks Runtime, för att bearbeta meddelanden från Azure Event Hubs. Databricks rekommenderar att du använder Kafka-anslutningsappen för strukturerad direktuppspelning för att bearbeta meddelanden från Azure Event Hubs.
Behov
Aktuellt versionsstöd finns i "Senaste versioner" i läsfilen för Azure Event Hubs Spark Anslut eller projekt.
Skapa ett bibliotek på din Azure Databricks-arbetsyta med hjälp av Maven-koordinaten
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Kommentar
Den här anslutningsappen uppdateras regelbundet och en senare version kan vara tillgänglig: vi rekommenderar att du hämtar den senaste anslutningsappen från Maven-lagringsplatsen
Installera det skapade biblioteket i klustret.
Schema
Schemat för posterna är:
Column | Typ |
---|---|
body |
binary |
partition |
sträng |
offset |
sträng |
sequenceNumber |
lång |
enqueuedTime |
timestamp |
publisher |
sträng |
partitionKey |
sträng |
properties |
map[string,json] |
body
tillhandahålls alltid som en bytematris. Använd cast("string")
för att explicit deserialisera body
kolumnen.
Snabbstart
Vi börjar med ett snabbt exempel: WordCount. Följande notebook-fil är allt som krävs för att köra WordCount med structured streaming med Azure Event Hubs.
Azure Event Hubs WordCount med notebook-fil för strukturerad direktuppspelning
Konfiguration
I det här avsnittet beskrivs de konfigurationsinställningar som du behöver för att arbeta med Event Hubs.
Detaljerad vägledning om hur du konfigurerar strukturerad direktuppspelning med Azure Event Hubs finns i integrationsguiden för strukturerad direktuppspelning och Azure Event Hubs som utvecklats av Microsoft.
Detaljerad vägledning om hur du använder strukturerad direktuppspelning finns i Direktuppspelning på Azure Databricks.
Connection string
En Event Hubs-anslutningssträng krävs för att ansluta till Event Hubs-tjänsten. Du kan hämta anslutningssträng för din Event Hubs-instans från Azure-portalen eller med hjälp ConnectionStringBuilder
av i biblioteket.
Azure Portal
När du får anslutningssträng från Azure-portalen kanske den har nyckeln eller kanske inte har EntityPath
nyckeln. Tänk på att:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
För att ansluta till dina EventHubs måste en EntityPath
finnas. Om din anslutningssträng inte har en, oroa dig inte.
Detta tar hand om det:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
Anslut ionStringBuilder
Du kan också använda ConnectionStringBuilder
för att göra dina anslutningssträng.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
All konfiguration som rör Event Hubs sker i din EventHubsConf
. Om du vill skapa en EventHubsConf
måste du skicka en anslutningssträng:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Mer information om hur du hämtar en giltig anslutningssträng finns i Anslut ion String.
En fullständig lista över konfigurationer finns i EventHubsConf. Här är en delmängd konfigurationer för att komma igång:
Alternativ | Värde | Default | Frågetyp | beskrivning |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Direktuppspelning och batch | En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera användningsprogram att vart och ett ha en separat vy över händelseströmmen och att oberoende läsa strömmen i egen takt och med sina egna offset. Mer information finns i Microsoft-dokumentationen. |
startingPosition |
EventPosition | Start av dataström | Direktuppspelning och batch | Startpositionen för ditt strukturerade direktuppspelningsjobb. Se startingPositions för information om i vilken ordning alternativen läse. |
maxEventsPerTrigger |
lång | partitionCount * 1000 |
Direktuppspelningsfråga | Hastighetsgräns för maximalt antal händelser som bearbetas per utlösarintervall. Det angivna totala antalet händelser delas proportionellt mellan partitioner av olika volymer. |
För varje alternativ finns det en motsvarande inställning i EventHubsConf
. Till exempel:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
tillåter användare att ange inledande (och avslutande) positioner med EventPosition
klassen. EventPosition
definierar positionen för en händelse i en Event Hub-partition. Positionen kan vara en köad tid, förskjutning, sekvensnummer, strömmens start eller strömmens slut.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Om du vill starta (eller avsluta) vid en viss position skapar du helt enkelt rätt EventPosition
och ställer in den i :EventHubsConf
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
Produktionsstrukturerad direktuppspelning med Azure Event Hubs
När du kör strömmande frågor i produktion vill du förmodligen ha mer robusthet och drifttidsgarantier än du skulle ha när du bara kopplar en notebook-fil till ett kluster och kör dina strömmande frågor interaktivt. Importera och kör följande notebook-fil för en demonstration av hur du konfigurerar och kör Strukturerad direktuppspelning i produktion med Azure Event Hubs och Azure Databricks.
Mer information finns i Produktionsöverväganden för strukturerad direktuppspelning.