Execute sua primeira carga de trabalho de Streaming Estruturado

Este artigo fornece exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Streaming Estruturado no Azure Databricks. Você pode usar o Streaming Estruturado para cargas de trabalho de processamento incrementais e quase em tempo real.

O Streaming Estruturado é uma das várias tecnologias que alimentam as mesas de streaming no Delta Live Tables. O Databricks recomenda o uso do Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Structured Streaming. Consulte O que é Delta Live Tables?.

Nota

Enquanto o Delta Live Tables fornece uma sintaxe ligeiramente modificada para declarar tabelas de streaming, a sintaxe geral para configurar leituras e transformações de streaming se aplica a todos os casos de uso de streaming no Azure Databricks. O Delta Live Tables também simplifica o streaming gerenciando informações de estado, metadados e várias configurações.

Ler a partir de um fluxo de dados

Você pode usar o Streaming Estruturado para ingerir dados incrementalmente de fontes de dados suportadas. Algumas das fontes de dados mais comuns usadas nas cargas de trabalho do Azure Databricks Structured Streaming incluem o seguinte:

  • Arquivos de dados no armazenamento de objetos na nuvem
  • Barramentos de mensagens e filas
  • Delta Lake

A Databricks recomenda o uso do Auto Loader para ingestão de streaming do armazenamento de objetos na nuvem. Auto Loader suporta a maioria dos formatos de arquivo suportados pelo Structured Streaming. Consulte O que é Auto Loader?.

Cada fonte de dados fornece várias opções para especificar como carregar lotes de dados. Durante a configuração do leitor, as principais opções que você pode precisar definir se enquadram nas seguintes categorias:

  • Opções que especificam a fonte de dados ou o formato (por exemplo, tipo de arquivo, delimitadores e esquema).
  • Opções que configuram o acesso aos sistemas de origem (por exemplo, configurações de porta e credenciais).
  • Opções que especificam por onde começar em um fluxo (por exemplo, deslocamentos Kafka ou leitura de todos os arquivos existentes).
  • Opções que controlam a quantidade de dados processados em cada lote (por exemplo, deslocamentos máximos, arquivos ou bytes por lote).

Use o Auto Loader para ler dados de streaming do armazenamento de objetos

O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que é usado cloudFiles para indicar formato e opções. A schemaLocation opção permite a inferência e evolução do esquema. Cole o seguinte código em uma célula de bloco de anotações Databricks e execute a célula para criar um DataFrame de streaming chamado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Como outras operações de leitura no Azure Databricks, configurar uma leitura de streaming não carrega dados. Você deve acionar uma ação nos dados antes que o fluxo comece.

Nota

Chamar display() um DataFrame de streaming inicia um trabalho de streaming. Para a maioria dos casos de uso de Streaming Estruturado, a ação que aciona um fluxo deve ser gravar dados em um coletor. Consulte Preparando seu código de streaming estruturado para produção.

Executar uma transformação de streaming

O Streaming Estruturado dá suporte à maioria das transformações disponíveis no Azure Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de streaming como uma transformação.

O exemplo de código a seguir conclui uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando funções Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

O resultado transformed_df contém instruções de consulta para carregar e transformar cada registro à medida que ele chega na fonte de dados.

Nota

O Streaming Estruturado trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Como tal, algumas transformações não são suportadas em cargas de trabalho de Streaming Estruturado porque exigiriam a classificação de um número infinito de itens.

A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar limites de processamento de dados.

Gravar em um coletor de dados

Um coletor de dados é o destino de uma operação de gravação de streaming. Os coletores comuns usados em cargas de trabalho de streaming do Azure Databricks incluem o seguinte:

  • Delta Lake
  • Barramentos de mensagens e filas
  • Bancos de dados de chave-valor

Assim como acontece com as fontes de dados, a maioria dos coletores de dados fornece várias opções para controlar como os dados são gravados no sistema de destino. Durante a configuração do gravador, as principais opções que você pode precisar definir se enquadram nas seguintes categorias:

  • Modo de saída (acrescentar por padrão).
  • Um local de ponto de verificação (obrigatório para cada gravador).
  • Intervalos de gatilho; consulte Configurar intervalos de gatilho de Streaming Estruturado.
  • Opções que especificam o coletor de dados ou formato (por exemplo, tipo de arquivo, delimitadores e esquema).
  • Opções que configuram o acesso aos sistemas de destino (por exemplo, configurações de porta e credenciais).

Executar uma gravação em lote incremental no Delta Lake

O exemplo a seguir grava no Delta Lake usando um caminho de arquivo especificado e um ponto de verificação.

Importante

Certifique-se sempre de especificar um local de ponto de verificação exclusivo para cada gravador de streaming que você configurar. O ponto de verificação fornece a identidade exclusiva para seu fluxo, rastreando todos os registros processados e informações de estado associadas à sua consulta de streaming.

A availableNow configuração para o gatilho instrui o Streaming Estruturado a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para que você possa executar com segurança o seguinte código sem se preocupar em deixar um fluxo em execução:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto, a execução repetida desse código não ingere novos registros.

Aviso

A execução do Streaming estruturado pode impedir que o encerramento automático desligue os recursos de computação. Para evitar custos inesperados, certifique-se de encerrar as consultas de streaming.

Preparando seu código de Streaming estruturado para produção

O Databricks recomenda o uso de Delta Live Tables para a maioria das cargas de trabalho de Streaming Estruturado. As recomendações a seguir fornecem um ponto de partida para preparar cargas de trabalho de Streaming estruturado para produção:

  • Remova código desnecessário de blocos de anotações que retornariam resultados, como display e count.
  • Não execute cargas de trabalho de Streaming Estruturado em clusters interativos; sempre agende fluxos como trabalhos.
  • Para ajudar a recuperar trabalhos de streaming automaticamente, configure trabalhos com infinitas tentativas.
  • Não use o dimensionamento automático para cargas de trabalho com o Structured Streaming.

Para obter mais recomendações, consulte Considerações de produção para streaming estruturado.

Leia dados do Delta Lake, transforme e grave no Delta Lake

A Delta Lake tem amplo suporte para trabalhar com Streaming Estruturado como fonte e coletor. Consulte Leituras e gravações de streaming de tabela Delta.

O exemplo a seguir mostra sintaxe de exemplo para carregar incrementalmente todos os novos registros de uma tabela Delta, juntá-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local de ponto de verificação especificado. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores.

Nota

Delta Live Tables fornece uma sintaxe totalmente declarativa para criar pipelines Delta Lake e gerencia propriedades como gatilhos e pontos de verificação automaticamente. Consulte O que é Delta Live Tables?.

Leia dados de Kafka, transforme e escreva em Kafka

O Apache Kafka e outros barramentos de mensagens fornecem algumas das menores latências disponíveis para grandes conjuntos de dados. Você pode usar o Azure Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar dados de volta no Kafka.

Nota

A gravação de dados no armazenamento de objetos na nuvem adiciona uma sobrecarga de latência adicional. Se você deseja armazenar dados de um barramento de mensagens no Delta Lake, mas requer a menor latência possível para cargas de trabalho de streaming, o Databricks recomenda configurar trabalhos de streaming separados para ingerir dados para o lakehouse e aplicar transformações quase em tempo real para coletores de barramento de mensagens downstream.

O exemplo de código a seguir demonstra um padrão simples para enriquecer dados de Kafka juntando-os com dados em uma tabela Delta e, em seguida, gravando de volta em Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Você deve ter permissões adequadas configuradas para acessar seu serviço Kafka. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores. Consulte Processamento de fluxo com Apache Kafka e Azure Databricks.