Partilhar via


Aplicar marcas d'água para controlar os limites de processamento de dados

Este artigo apresenta os conceitos básicos de marca d'água e fornece recomendações para o uso de marcas d'água em operações comuns de streaming com monitoração de estado. Você deve aplicar marcas d'água a operações de streaming com monitoração de estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que poderia introduzir problemas de memória e aumentar as latências de processamento durante operações de streaming de longa duração.

O que é uma marca d'água?

O Streaming Estruturado usa marcas d'água para controlar o limite por quanto tempo continuar processando atualizações para uma determinada entidade de estado. Exemplos comuns de entidades estatais incluem:

  • Agregações ao longo de uma janela de tempo.
  • Chaves únicas em uma junção entre dois fluxos.

Ao declarar uma marca d'água, você especifica um campo de carimbo de data/hora e um limite de marca d'água em um DataFrame de streaming. À medida que novos dados chegam, o gerente de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registros dentro do limite de atraso.

O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janela:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Neste exemplo:

  • A event_time coluna é usada para definir uma marca d'água de 10 minutos e uma janela de queda de 5 minutos.
  • Uma contagem é coletada para cada id janela de 5 minutos não sobreposta.
  • As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado event_time.

Importante

Os limites de marca d'água garantem que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da consulta definida. Os registros que chegam tardiamente fora do limite especificado ainda podem ser processados usando métricas de consulta, mas isso não é garantido.

Como as marcas d'água afetam o tempo de processamento e o rendimento?

As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz de marcas d'água é essencial para uma taxa de transferência de streaming com monitoração de estado eficiente.

Nota

Nem todos os modos de saída são suportados para todas as operações com monitoração de estado.

Marcas d'água e modo de saída para agregações em janela

A tabela a seguir detalha o processamento de consultas com agregação em um carimbo de data/hora com uma marca d'água definida:

Modo de saída Comportamento
Acrescentar As linhas são gravadas na tabela de destino assim que o limite da marca d'água é ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado assim que o limite é ultrapassado.
Atualizar As linhas são gravadas na tabela de destino à medida que os resultados são calculados e podem ser atualizadas e substituídas à medida que novos dados chegam. O estado de agregação antigo é descartado assim que o limite é ultrapassado.
Concluído O estado de agregação não é descartado. A tabela de destino é reescrita a cada gatilho.

Marcas d'água e saída para junções stream-stream

As junções entre vários fluxos suportam apenas o modo de acréscimo, e os registros correspondentes são gravados em cada lote em que são descobertos. Para junções internas, o Databricks recomenda definir um limite de marca d'água em cada fonte de dados de streaming. Isso permite que as informações de estado sejam descartadas para registros antigos. Sem marcas d'água, o Streaming Estruturado tenta unir todas as teclas de ambos os lados da junção a cada gatilho.

O Streaming Estruturado tem semântica especial para suportar junções externas. A marca d'água é obrigatória para junções externas, pois indica quando uma chave deve ser escrita com um valor nulo depois de ficar incomparável. Observe que, embora as junções externas possam ser úteis para registrar registros que nunca são correspondidos durante o processamento de dados, porque as junções só gravam em tabelas como operações de acréscimo, esses dados ausentes não são registrados até que o limite de atraso tenha passado.

Controle o limite de dados atrasados com várias políticas de marca d'água no Streaming estruturado

Ao trabalhar com várias entradas de Streaming Estruturado, você pode definir várias marcas d'água para controlar os limites de tolerância para dados que chegam tarde. A configuração de marcas d'água permite controlar as informações de estado e a latência de impactos.

Uma consulta de streaming pode ter vários fluxos de entrada que são unidos ou unidos. Cada um dos fluxos de entrada pode ter um limite diferente de dados atrasados que precisa ser tolerado para operações com monitoração de estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada um dos fluxos de entrada. Segue-se um exemplo de consulta com junções stream-stream.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Ao executar a consulta, o Streaming Estruturado rastreia individualmente o tempo máximo de evento visto em cada fluxo de entrada, calcula marcas d'água com base no atraso correspondente e escolhe uma única marca d'água global com elas para ser usada para operações com monitoração de estado. Por padrão, o mínimo é escolhido como marca d'água global porque garante que nenhum dado seja acidentalmente descartado como tarde demais se um dos fluxos ficar atrás dos outros (por exemplo, um dos fluxos parar de receber dados devido a falhas upstream). Em outras palavras, a marca d'água global se move com segurança no ritmo do fluxo mais lento e a saída da consulta é atrasada de acordo.

Se quiser obter resultados mais rápidos, você pode definir a política de marca d'água múltipla para escolher o valor máximo como marca d'água global definindo a configuração spark.sql.streaming.multipleWatermarkPolicy SQL como max (o padrão é min). Isso permite que a marca d'água global se mova no ritmo do fluxo mais rápido. No entanto, essa configuração descarta dados dos fluxos mais lentos. Por isso, a Databricks recomenda que você use essa configuração criteriosamente.

Soltar duplicatas dentro da marca d'água

No Databricks Runtime 13.3 LTS e superior, você pode desduplicar registros dentro de um limite de marca d'água usando um identificador exclusivo.

O Streaming Estruturado fornece garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros de fontes de dados. Você pode usar dropDuplicatesWithinWatermark para desduplicar registros em qualquer campo especificado, permitindo que você remova duplicados de um fluxo, mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada).

É garantido que os registros duplicados que chegam dentro da marca d'água especificada serão descartados. Essa garantia é rigorosa em apenas uma direção, e registros duplicados que chegam fora do limite especificado também podem ser descartados. Você deve definir o limite de atraso da marca d'água maior que as diferenças máximas de carimbo de data/hora entre eventos duplicados para remover todas as duplicatas.

Você deve especificar uma marca d'água para usar o dropDuplicatesWithinWatermark método, como no exemplo a seguir:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")