Hubs de eventos do Azure

Os Hubs de Eventos do Azure são um serviço de ingestão de telemetria de hiperescala que coleta, transforma e armazena milhões de eventos. Como uma plataforma de streaming distribuída, ele oferece baixa latência e retenção de tempo configurável, permitindo que você ingresse grandes quantidades de telemetria na nuvem e leia os dados de diversos aplicativos usando semântica de publicação/assinatura.

Este artigo explica como usar o Fluxo Estruturado com clusters dos Hubs de Eventos do Azure e dados do Azure Databricks.

Observação

Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o Conector Kafka de streaming estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. O Databricks recomenda usar o conector Kafka de fluxo estruturado para processar mensagens de Hubs de Eventos do Azure.

Requisitos

Para suporte à versão atual, consulte "Versões mais recentes" no arquivo leia-me do projeto Hubs de Eventos do Azure Spark Connector.

  1. Crie uma biblioteca em seu workspace no Azure Databricks usando a coordenada Maven com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Observação

    Esse conector é atualizado regularmente, e uma versão mais recente pode estar disponível: recomendamos que você busque o conector mais recente no repositório do Maven

  2. Instale a biblioteca criada em seu cluster.

Esquema

O esquema dos registros é:

Coluna Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

O body é sempre fornecido como uma matriz de byte. Use cast("string") para desserializar explicitamente a coluna body.

Início rápido

Vamos começar com um exemplo rápido: WordCount. O notebook a seguir é tudo o que é necessário para executar o WordCount usando o Fluxo Estruturado com Hubs de Eventos do Azure.

Hubs de Eventos do Azure WordCount com notebook de Fluxo Estruturado

Obter notebook

Configuração

Esta seção discute as definições de configuração que você precisa para trabalhar com os Hubs de Eventos.

Para obter diretrizes detalhadas sobre como configurar o Fluxo Estruturado com Hubs de Eventos do Azure, consulte o Guia de Integração dos Hubs de Eventos do Azure e Fluxo Estruturado desenvolvido pela Microsoft.

Para obter diretrizes detalhadas sobre como usar o Fluxo Estruturado, confira Fluxo Estruturado no Azure Databricks.

Cadeia de conexão

Uma cadeia de conexão dos Hubs de Eventos é necessária para se conectar ao serviço hubs de eventos. Você pode obter a cadeia de conexão para sua instância dos Hubs de Eventos do portal do Azure ou usando o ConnectionStringBuilder na biblioteca.

Portal do Azure

Quando você obter a cadeia de conexão do portal do Azure, ela poderá ou não ter a chave EntityPath. Considerar:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Para se conectar aos seus EventHubs, um EntityPath deve estar presente. Se a cadeia de conexão não tiver uma, não se preocupe. Isso tomará conta dele:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Como alternativa, você pode usar o ConnectionStringBuilder para fazer sua cadeia de conexão.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Todas as configurações relacionadas aos Hubs de Eventos ocorrem em seu EventHubsConf. Para criar um EventHubsConf, você deve passar uma cadeia de conexão:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Consulte Cadeia de conexão para obter mais informações sobre como obter uma cadeia de conexão válida.

Para ver uma lista completa de configurações, consulte EventHubsConf. Aqui está um subconjunto de configurações para você começar:

Opção Valor Padrão Tipo de consulta Descrição
consumerGroup String “$Default” Streaming e lote Um grupo de consumidores é uma exibição de um hub de eventos inteiro. Os grupos de consumidores permitem que vários aplicativos de consumo tenham um modo de exibição separado do fluxo de eventos e leiam o fluxo de forma independente em seu próprio ritmo e com seus próprios deslocamentos. Mais informações estão disponíveis na documentação da Microsoft.
startingPosition EventPosition Início da transmissão Streaming e lote A posição inicial para seu trabalho de Fluxo Estruturado. Consulte startingPositions para obter informações sobre a ordem em que as opções são lidas.
maxEventsPerTrigger long partitionCount

* 1000
Consulta de streaming Limite de taxa no número máximo de eventos processados por intervalo de gatilho. O número total de eventos especificado será dividido proporcionalmente entre partições de volume diferente.

Para cada opção, existe uma configuração correspondente em EventHubsConf. Por exemplo:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf permite que os usuários especifiquem posições inicial (e final) com a classe EventPosition. EventPosition define a posição de um evento em uma partição do Hub de Eventos. A posição pode ser um tempo, deslocamento, número de sequência, o início do fluxo ou o final do fluxo.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Se você quiser iniciar (ou terminar) em uma posição específica, basta criar o EventPosition correto e defini-lo em seu EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Fluxo estruturado de produção com Hubs de Eventos do Azure

Quando você executar consultas de streaming em produção, provavelmente deseja mais robustez e garantias de tempo de atividade do que teria quando simplesmente anexar um notebook a um cluster e executar suas consultas de streaming interativamente. Importe e execute o notebook a seguir para uma demonstração de como configurar e executar o Fluxo Estruturado em produção com Hubs de Eventos do Azure e Azure Databricks.

Para mais informações, confira Considerações de produção para Fluxo estruturado.

Fluxo estruturado de produção com notebook dos Hubs de Eventos do Azure

Obter notebook