Processamento de fluxo com Apache Kafka e Azure Databricks

Este artigo descreve como você pode usar o Apache Kafka como uma fonte ou um coletor ao executar cargas de trabalho de Streaming Estruturado no Azure Databricks.

Para mais informações sobre Kafka, consulte a documentação de Kafka.

Ler dados de Kafka

A seguir está um exemplo para uma leitura de streaming de Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

O Azure Databricks também dá suporte à semântica de leitura em lote para fontes de dados Kafka, conforme mostrado no exemplo a seguir:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Para carregamento incremental em lote, o Databricks recomenda o uso do Kafka com Trigger.AvailableNowo . Consulte Configurando o processamento incremental em lote.

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks fornece uma função SQL para ler dados Kafka. O streaming com SQL é suportado apenas em Delta Live Tables ou com tabelas de streaming em Databricks SQL. Consulte read_kafka função com valor de tabela.

Configurar o leitor Kafka Structured Streaming

O Azure Databricks fornece a kafka palavra-chave como um formato de dados para configurar conexões com o Kafka 0.10+.

A seguir estão as configurações mais comuns para Kafka:

Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer apenas um destes parâmetros:

Opção valor Description
subscrever Uma lista de tópicos separados por vírgula. A lista de tópicos para se inscrever.
subscribePattern Cadeia de caracteres regex Java. O padrão usado para se inscrever no(s) tópico(s).
atribuir Cadeia de caracteres {"topicA":[0,1],"topic":[2,4]}JSON . Tópico específicoPartições a consumir.

Outras configurações notáveis:

Opção Value Valor Predefinido Description
kafka.bootstrap.servidores Lista separada por vírgulas de host:port. empty [Obrigatório] A configuração de Kafka bootstrap.servers . Se você achar que não há dados de Kafka, verifique a lista de endereços do corretor primeiro. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Isso ocorre porque o cliente Kafka assume que os corretores ficarão disponíveis eventualmente e, no caso de erros de rede, tente novamente para sempre.
failOnDataLoss true ou false. true [Opcional] Se a consulta deve falhar quando é possível que os dados tenham sido perdidos. As consultas podem falhar permanentemente ao ler dados de Kafka devido a muitos cenários, como tópicos excluídos, truncamento de tópico antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar falsos alarmes. Defina essa opção como false se ela não funcionar conforme o esperado ou se você quiser que a consulta continue processando apesar da perda de dados.
minPartições Inteiro >= 0, 0 = desativado. 0 (desativado) [Opcional] Número mínimo de partições para ler a partir de Kafka. Você pode configurar o Spark para usar um mínimo arbitrário de partições para ler do Kafka usando a minPartitions opção. Normalmente, o Spark tem um mapeamento 1-1 de Kafka topicPartitions para partições Spark consumindo de Kafka. Se você definir a opção para um valor maior do que o seu tópico KafkaPartitions, o minPartitions Spark dividirá partições Kafka grandes em partes menores. Essa opção pode ser definida em momentos de pico de cargas, distorção de dados e à medida que seu fluxo está ficando para trás para aumentar a taxa de processamento. Ele tem um custo de inicialização de consumidores Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL ao se conectar ao Kafka.
kafka.group.id Um ID de grupo de consumidores Kafka. não definido [Opcional] ID de grupo para usar durante a leitura de Kafka. Utilize isto com precaução. Por padrão, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos inscritos. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizadas específicas para ler dados. Opcionalmente, você pode definir o ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado.

* Consultas em execução simultânea (ambos, lote e streaming) com o mesmo ID de grupo provavelmente interferem umas com as outras, fazendo com que cada consulta leia apenas parte dos dados.
* Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração session.timeout.ms do consumidor Kafka como muito pequena.
iniciandoOffsets mais cedo , mais recente mais recente [Opcional] O ponto inicial quando uma consulta é iniciada, seja "mais cedo", que é dos primeiros deslocamentos, ou uma cadeia de caracteres json especificando um deslocamento inicial para cada TopicPartition. No json, -2 como um deslocamento pode ser usado para se referir ao mais antigo, -1 ao mais recente. Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em json) não é permitido. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada, e essa retomada sempre será retomada de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão no mínimo.

Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.

Esquema para registros Kafka

O esquema dos registros de Kafka é:

Column Tipo
key binário
valor binário
topic string
partição número inteiro
offset long
carimbo de data/hora long
timestampType número inteiro

O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Use operações DataFrame (como cast("string")) para desserializar explicitamente as chaves e valores.

Gravar dados em Kafka

Segue-se um exemplo de uma gravação de streaming para Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

O Azure Databricks também dá suporte à semântica de gravação em lote para coletores de dados Kafka, conforme mostrado no exemplo a seguir:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurar o gravador de Streaming Estruturado Kafka

Importante

O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients biblioteca que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitadas, a gravação falhará com a mensagem org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error statede erro .

Resolva esse erro atualizando para Kafka versão 2.8.0 ou superior, ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de Streaming Estruturado.

O esquema fornecido ao DataStreamWriter interage com o coletor Kafka. Você pode usar os seguintes campos:

Nome da coluna Obrigatório ou opcional Type
key opcional STRING ou BINARY
value obrigatório STRING ou BINARY
headers opcional ARRAY
topic opcional (ignorado se topic estiver definido como opção de gravador) STRING
partition opcional INT

A seguir estão as opções comuns definidas ao escrever para Kafka:

Opção Value Valor predefinido Description
kafka.boostrap.servers Uma lista separada por vírgulas de <host:port> nenhum [Obrigatório] A configuração de Kafka bootstrap.servers .
topic STRING não definido [Opcional] Define o tópico para todas as linhas a serem escritas. Esta opção substitui qualquer coluna de tópico que exista nos dados.
includeHeaders BOOLEAN false [Opcional] Se os cabeçalhos de Kafka devem ser incluídos na linha.

Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.

Recuperar métricas de Kafka

Você pode obter a média, o mínimo e o máximo do número de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos com o avgOffsetsBehindLatest, maxOffsetsBehindLateste minOffsetsBehindLatest métricas. Consulte Leitura de métricas interativamente.

Nota

Disponível no Databricks Runtime 9.1 e superior.

Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest. Esta estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo a opção bytesEstimateWindowLength para um valor diferente. Por exemplo, para defini-lo como 10 minutos:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando o fluxo em um bloco de anotações, poderá ver essas métricas na guia Dados brutos no painel de progresso da consulta de streaming:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Usar SSL para conectar o Azure Databricks ao Kafka

Para habilitar conexões SSL com Kafka, siga as instruções na documentação Confluent Encryption and Authentication with SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka., como opções. Por exemplo, você especifica o local de armazenamento confiável na propriedade kafka.ssl.truststore.location.

A Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Conectar o Kafka no HDInsight ao Azure Databricks

  1. Crie um cluster HDInsight Kafka.

    Consulte Conectar-se ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para obter instruções.

  2. Configure os corretores Kafka para anunciar o endereço correto.

    Siga as instruções em Configurar Kafka para publicidade IP. Se você mesmo gerencia o Kafka nas Máquinas Virtuais do Azure, verifique se a advertised.listeners configuração dos brokers está definida como o IP interno dos hosts.

  3. Crie um cluster do Azure Databricks.

  4. Emparelhe o cluster Kafka ao cluster do Azure Databricks.

    Siga as instruções em Redes virtuais de mesmo nível.

Autenticação da entidade de serviço com o Microsoft Entra ID (anteriormente Azure Ative Directory) e Hubs de Eventos do Azure

O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita via OAuth com o Microsoft Entra ID (anteriormente Azure Ative Directory).

Diagrama de autenticação do AAD

O Azure Databricks dá suporte à autenticação de ID do Microsoft Entra com uma ID de cliente e segredo nos seguintes ambientes de computação:

  • Databricks Runtime 12.2 LTS e superior em computação configurada com modo de acesso de usuário único.
  • Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso compartilhado.
  • Pipelines Delta Live Tables configurados sem o Unity Catalog.

O Azure Databricks não oferece suporte à autenticação de ID do Microsoft Entra com um certificado em qualquer ambiente de computação ou em pipelines Delta Live Tables configurados com o Unity Catalog.

Essa autenticação não funciona em clusters compartilhados ou em Unity Catalog Delta Live Tables.

Configurando o conector Kafka de streaming estruturado

Para executar a autenticação com o Microsoft Entra ID, você precisará dos seguintes valores:

  • Um ID de locatário. Você pode encontrar isso na guia Serviços do Microsoft Entra ID .

  • Um clientID (também conhecido como ID do aplicativo).

  • Um segredo do cliente. Depois de ter isso, você deve adicioná-lo como um segredo ao seu espaço de trabalho Databricks. Para adicionar esse segredo, consulte Gerenciamento secreto.

  • Um tópico do EventHubs. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos na seção Entidades em uma página específica de Namespace de Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função do IAM no nível dos Hubs de Eventos.

  • Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico de Hubs de Eventos:

    Espaço de nomes dos Event Hubs

Além disso, para usar o Entra ID, precisamos dizer a Kafka para usar o mecanismo SASL OAuth (SASL é um protocolo genérico, e OAuth é um tipo de "mecanismo" SASL):

  • kafka.security.protocol deve ser SASL_SSL
  • kafka.sasl.mechanism deve ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor de para o manipulador de retorno de chamada de kafkashaded login de nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

Exemplo

Em seguida, vejamos um exemplo em execução:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Tratamento de possíveis erros

  • As opções de streaming não são suportadas.

    Se você tentar usar esse mecanismo de autenticação em um pipeline Delta Live Tables configurado com o Unity Catalog, poderá receber o seguinte erro:

    Erro de streaming não suportado

    Para resolver esse erro, use uma configuração de computação suportada. Consulte Autenticação da entidade de serviço com o Microsoft Entra ID (anteriormente Azure Ative Directory) e Hubs de Eventos do Azure.

  • Falha ao criar um novo KafkaAdminClientarquivo .

    Este é um erro interno que Kafka lança se qualquer uma das seguintes opções de autenticação estiver incorreta:

    • ID do cliente (também conhecido como ID do aplicativo)
    • ID de Inquilino do
    • Servidor EventHubs

    Para resolver o erro, verifique se os valores estão corretos para essas opções.

    Além disso, poderá ver este erro se modificar as opções de configuração fornecidas por predefinição no exemplo (que lhe foi pedido para não modificar), como kafka.security.protocol.

  • Não há registros sendo devolvidos

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:

    • Você especificou o tópico EventHubs errado.
    • A opção de configuração padrão do Kafka para startingOffsets é latest, e você ainda não está recebendo nenhum dado através do tópico. Você pode configurar startingOffsetstoearliest para começar a ler dados a partir dos primeiros deslocamentos de Kafka.