Azure Event Hubs

Azure Event Hubs es un servicio de ingesta de datos de telemetría a hiperescala que recopila, transforma y almacena millones de eventos. Como plataforma de streaming distribuida, ofrece retención de tiempo configurable y baja latencia, lo que permite ingresar grandes cantidades de datos de telemetría en la nube y leer los datos en varias aplicaciones mediante semántica de publicación o suscripción.

En este artículo se explica cómo usar el streaming estructurado con Azure Event Hubs y clústeres de Azure Databricks.

Nota:

Azure Event Hubs proporciona un punto de conexión compatible con Apache Kafka que puede usar con el conector de Kafka de Structured Streaming, disponible en Databricks Runtime, para procesar mensajes de Azure Event Hubs. Databricks recomienda usar el conector de Kafka de Structured Streaming para procesar mensajes de Azure Event Hubs.

Requisitos

Para compatibilidad con la versión actual, consulte "Versiones más recientes" en el archivo léame del proyecto del conector de Spark para Azure Event Hubs.

  1. Cree una biblioteca en el área de trabajo de Azure Databricks mediante la coordenada com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 de Maven.

    Nota:

    El conector se actualiza con regularidad y puede haber disponible una versión más reciente: le recomendamos que extraiga el conector más reciente del repositorio de Maven.

  2. Instale la biblioteca creada en el clúster.

Schema

El esquema de los registros es:

Columna Tipo
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body siempre se proporciona como matriz de bytes. Use cast("string") para deserializar explícitamente la columna body.

Inicio rápido

Comentemos con un ejemplo rápido: WordCount. El siguiente cuaderno es todo lo que se necesita para ejecutar WordCount mediante el streaming estructurado con Azure Event Hubs.

WordCount de Azure Event Hubs con un cuaderno de streaming estructurado.

Obtener el cuaderno

Configuración

En esta sección se explican las opciones de configuración necesarias para trabajar con Event Hubs.

Para instrucciones detalladas sobre cómo configurar el streaming estructurado con Azure Event Hubs, consulte la guía de integración del streaming estructurado y Azure Event Hubs desarrollada por Microsoft.

Para obtener instrucciones detalladas sobre el uso de Structured Streaming, consulte Streaming en Azure Databricks.

Cadena de conexión

Se necesita una cadena de conexión de Event Hubs para la conexión al servicio Event Hubs. Puede obtenerla para su instancia de Event Hubs en Azure Portal o si usa ConnectionStringBuilder en la biblioteca.

Azure portal

Si obtiene la cadena de conexión de Azure Portal, la clave de EntityPath puede que no esté. Considere:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Para conectarse a Event Hubs, debe haber EntityPath. Si la cadena de conexión no lo tiene, no se preocupe. Con esto se consigue:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Como alternativa, puede usar ConnectionStringBuilder para crear la cadena de conexión.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Toda la configuración relativa a Event Hubs se produce en EventHubsConf. Para crear EventHubsConf, debe pasar una cadena de conexión:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Consulte Cadena de conexión para más información sobre cómo obtener una cadena de conexión válida.

Para una lista exhaustiva de las configuraciones, consulte EventHubsConf. Este es un subconjunto de configuraciones para empezar:

Opción Value Valor predeterminado Tipo de consulta Descripción
consumerGroup String “$Default” Streaming y lote Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan varias aplicaciones consumidoras para que cada una tenga una vista separada del flujo de eventos y para que lean el flujo de forma independiente a su propio ritmo y con sus propios desplazamientos. Más información disponible en la documentación de Microsoft.
startingPosition EventPosition Inicio de la secuencia Streaming y lote Posición inicial del trabajo del streaming estructurado. Consulte startingPositions para información sobre el orden de lectura de las opciones.
maxEventsPerTrigger long partitionCount

* 1000
Consulta de streaming Número máximo de eventos que se procesan por intervalo del desencadenador. El número total especificado de eventos se dividirá proporcionalmente entre las particiones de volumen distinto.

Para cada opción, existe una configuración correspondiente en EventHubsConf. Por ejemplo:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf permite a los usuarios especificar posiciones iniciales (y finales) con la clase EventPosition. EventPosition define la posición de un evento en una partición del centro de eventos. La posición puede ser un tiempo de puesta en cola, un desplazamiento, un número de secuencia, o el inicio o el final de la transmisión.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Si desea iniciar (o finalizar) en una posición específica, basta con crear el EventPosition correcto y establecerlo en EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Streaming estructurado en producción con Azure Event Hubs

Al ejecutar consultas de streaming en producción, es probable que desee más garantías de tiempo de actividad y solidez que las que tendría al simplemente adjuntar un cuaderno a un clúster y ejecutar las consultas de streaming de forma interactiva. Importe y ejecute el siguiente cuaderno para una demostración de cómo configurar y ejecutar el streaming estructurado en producción con Azure Event Hubs y Azure Databricks.

Para más información, consulte Consideraciones de producción para Structured Streaming.

El streaming estructurado en producción con un cuaderno de Azure Event Hubs

Obtener el cuaderno