Azure Event Hubs

Azure Event Hubs est un service d’ingestion de données de télémétrie à très grande échelle qui collecte, transforme et stocke des millions d’événements. En tant que plateforme de streaming distribuée, ce service propose une faible latence et une durée de conservation configurable, ce qui vous permet d’ingérer des quantités massives de données de télémétrie dans le cloud et de lire les données de plusieurs applications en utilisant une sémantique publication-abonnement.

Cet article explique comment utiliser une diffusion en continu structurée avec des clusters Azure Event Hubs et Azure Databricks.

Remarque

Azure Event Hubs fournit un point de terminaison compatible avec Apache Kafka que vous pouvez utiliser avec le connecteur flux structuré Kafka, disponible dans Databricks Runtime, pour traiter les messages provenant de Azure Event Hubs. Databricks recommande d’utiliser le connecteur Kafka Structured Streaming pour traiter les messages provenant de Azure Event Hubs.

Spécifications

Pour la prise en charge de la version actuelle, consultez « Latest Releases » dans le fichier readme du projet de connecteur Spark pour Azure Event Hubs.

  1. Créez une bibliothèque dans votre espace de travail Azure Databricks à l’aide de la coordonnée Maven com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Notes

    Ce connecteur étant mis à jour régulièrement, il se peut qu’une version plus récente soit disponible. Nous vous recommandons d’extraire le dernier connecteur du référentiel Maven

  2. Installez la bibliothèque créée dans votre cluster.

schéma

Le schéma des enregistrements est le suivant :

Colonne Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

Le body est toujours fourni sous le forme d’un tableau d’octets. Utilisez cast("string") pour désérialiser explicitement la colonne body.

Quick Start

Commençons par un exemple rapide : WordCount. Le notebook suivant est tout ce qu’il faut pour exécuter WordCount à l’aide d’une diffusion en continu structurée avec Azure Event Hubs.

WordCount pour Azure Event Hubs avec un notebook de diffusion en continu structurée

Obtenir le notebook

Configuration

Cette section décrit les paramètres de configuration que vous devez utiliser avec Event Hubs.

Pour obtenir des instructions détaillées sur la configuration de la diffusion en continu structurée avec Azure Event Hubs, consultez le Guide d’intégration de la diffusion en continu structurée et d’Azure Event Hubs élaboré par Microsoft.

Pour obtenir des instructions détaillées sur l’utilisation de Structured Streaming, consultez Diffusion sur Azure Databricks.

Chaîne de connexion

Une chaîne de connexion Event Hubs est requise pour se connecter au service Event Hubs. Vous pouvez obtenir la chaîne de connexion pour votre instance Event Hubs à partir du portail Azure ou en utilisant le ConnectionStringBuilder dans la bibliothèque.

Portail Azure

Lorsque vous récupérez la chaîne de connexion à partir du portail Azure, elle peut avoir ou non la clé EntityPath. Vous devez :

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Pour que vous puissiez vous connecter à votre Event Hubs, un EntityPath doit être présent. Si votre chaîne de connexion n’en a pas, ne vous inquiétez pas. La ressource suivante s’en occupera :

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Vous pouvez également utiliser le ConnectionStringBuilder pour créer votre chaîne de connexion.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Toute la configuration relative à Event Hubs se produit dans votre EventHubsConf. Pour créer un EventHubsConf, vous devez passer une chaîne de connexion :

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Pour plus d’informations sur l’obtention d’une chaîne de connexion valide, consultez Chaîne de connexion.

Pour obtenir la liste complète des configurations, consultez EventHubsConf. Voici un sous-ensemble de configurations pour commencer :

Option Valeur Default Type de requête Description
consumerGroup String “$Default” Diffusion en continu et traitement par lots Un groupe de consommateurs est un affichage d’un hub d’événements entier. Les groupes de consommateurs permettent à plusieurs applications consommatrices d'avoir chacune une vue distincte du flux d'événements et de lire le flux indépendamment à leur propre rythme et avec leurs propres décalages. Pour plus d’informations, consultez la documentation Microsoft.
startingPosition EventPosition Début du flux Diffusion en continu et traitement par lots Position de départ de votre travail de diffusion en continu structurée. Pour plus d’informations sur l’ordre dans lequel les options sont lues, consultez startingPositions.
maxEventsPerTrigger long partitionCount

* 1000
Requête de diffusion en continu Limite de débit sur le nombre maximal d’événements traités par intervalle de déclencheur. Le nombre total d’événements spécifié sera réparti de manière proportionnelle entre des partitions de volume différent.

Pour chaque option, il existe un paramètre correspondant dans EventHubsConf. Par exemple :

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf permet aux utilisateurs de spécifier des positions de début (et de fin) avec la classe EventPosition. EventPosition définit la position d’un événement dans une partition de hub d’événements. La position peut être une heure mise en file d’attente, un décalage, un numéro de séquence, le début du flux ou la fin du flux.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Si vous souhaitez commencer (ou finir) à un emplacement spécifique, créez simplement la EventPosition correcte et définissez-la dans votre EventHubsConf :

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Diffusion en continu structurée en production avec Azure Event Hubs

Lorsque vous exécutez des requêtes de diffusion en continu en production, vous souhaitez probablement plus de garanties en terme de robustesse et de durée de bon fonctionnement que vous n’en auriez en attachant simplement un notebook à un cluster et en exécutant vos requêtes de diffusion en continu de manière interactive. Importez et exécutez le notebook suivant pour une démonstration de la manière de configurer et d’exécuter une diffusion en continu structurée en production avec Azure Event Hubs et Azure Databricks.

Pour plus d’informations, consultez Considérations relatives à la production pour les applications Structured Streaming.

Diffusion en continu structurée en production avec un notebook Azure Event Hubs

Obtenir le notebook