O streaming de tabela delta lê e grava

O Delta Lake está profundamente integrado com o Spark Structured Streaming através readStream e writeStream. O Delta Lake supera muitas das limitações tipicamente associadas a sistemas de transmissão em fluxo e ficheiros, incluindo:

  • Coalescência de pequenos arquivos produzidos por ingestão de baixa latência.
  • Manter o processamento "exatamente uma vez" com mais de um fluxo (ou trabalhos em lote simultâneos).
  • Descobrir com eficiência quais arquivos são novos ao usar arquivos como fonte para um fluxo.

Nota

Este artigo descreve o uso de tabelas Delta Lake como fontes e coletores de streaming. Para saber como carregar dados usando tabelas de streaming no Databricks SQL, consulte Carregar dados usando tabelas de streaming no Databricks SQL.

Tabela delta como fonte

O Streaming estruturado lê incrementalmente tabelas Delta. Enquanto uma consulta de streaming está ativa em relação a uma tabela Delta, novos registros são processados idempotentemente à medida que novas versões de tabela são confirmadas na tabela de origem.

Os exemplos de código a seguir mostram a configuração de uma leitura de streaming usando o nome da tabela ou o caminho do arquivo.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Importante

Se o esquema de uma tabela Delta for alterado após uma leitura de transmissão em fluxo começar na tabela, a consulta falhará. Para a maioria das alterações de esquema, pode reiniciar o fluxo para resolver o erro de correspondência do esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e inferior, não é possível transmitir de uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução de esquema não aditiva, como renomear ou soltar colunas. Para obter detalhes, veja Transmissão em fluxo com mapeamento de colunas e alterações de esquema.

Limitar a taxa de entrada

As seguintes opções estão disponíveis para controlar microlotes:

  • maxFilesPerTrigger: Quantos novos arquivos devem ser considerados em cada microlote. A predefinição é 1000.
  • maxBytesPerTrigger: Quantidade de dados processados em cada microlote. Esta opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de streaming avançar nos casos em que a menor unidade de entrada é maior do que esse limite. Isso não é definido por padrão.

Se você usar maxBytesPerTrigger em conjunto com maxFilesPerTriggero , o microlote processará dados até que o maxFilesPerTrigger limite ou maxBytesPerTrigger seja atingido.

Nota

Nos casos em que as transações da tabela de origem são limpas devido à logRetentionDurationconfiguração e a consulta de streaming tenta processar essas versões, por padrão, a consulta não consegue evitar a perda de dados. Você pode definir a opção failOnDataLoss para false ignorar dados perdidos e continuar o processamento.

Transmitir um feed de captura de dados de alteração do Lago Delta (CDC)

O feed de dados de alteração do Delta Lake registra as alterações em uma tabela Delta, incluindo atualizações e exclusões. Quando habilitado, você pode transmitir de um feed de dados de alteração e escrever lógica para processar inserções, atualizações e exclusões em tabelas downstream. Embora a saída de dados do feed de dados de alteração difira ligeiramente da tabela Delta descrita, isso fornece uma solução para propagar alterações incrementais em tabelas downstream em uma arquitetura medalhão.

Importante

No Databricks Runtime 12.2 LTS e inferior, não é possível transmitir do feed de dados de alteração para uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução de esquema não aditiva, como renomear ou soltar colunas. Consulte Transmissão em fluxo com mapeamento de colunas e alterações de esquema.

Ignorar atualizações e exclusões

O Streaming Estruturado não manipula entradas que não sejam um acréscimo e lança uma exceção se ocorrerem modificações na tabela que está sendo usada como fonte. Existem duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente a jusante:

  • Você pode excluir a saída e o ponto de verificação e reiniciar o fluxo desde o início.
  • Você pode definir uma destas duas opções:
    • ignoreDeletes: ignore transações que excluem dados nos limites da partição.
    • skipChangeCommits: ignore transações que excluem ou modificam registros existentes. skipChangeCommits subsumes ignoreDeletes.

Nota

No Databricks Runtime 12.2 LTS e superior, skipChangeCommits substitui a configuração ignoreChangesanterior. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges é a única opção suportada.

A semântica para ignoreChanges difere muito de skipChangeCommits. Com ignoreChanges os ficheiros de dados reescritos ativados na tabela de origem, são novamente emitidos após uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Muitas vezes, as linhas inalteradas são emitidas juntamente com novas linhas, pelo que os consumidores a jusante têm de conseguir lidar com duplicados. As eliminações não são propagadas a jusante. ignoreChanges subsumes ignoreDeletes.

skipChangeCommits ignora totalmente as operações de alteração de ficheiros. Os ficheiros de dados que são reescritos na tabela de origem devido a uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETEe OVERWRITE são ignorados na totalidade. Para refletir as alterações nas tabelas de origem a montante, tem de implementar lógicas separadas para propagar estas alterações.

As cargas de trabalho configuradas com ignoreChanges continuam a operar usando semântica conhecida, mas o Databricks recomenda o uso skipChangeCommits para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges para skipChangeCommits requer lógica de refatoração.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events com date, user_emaile action colunas particionada por date. Você sai da user_events tabela e precisa excluir dados dela devido ao GDPR.

Quando você exclui nos limites da partição (ou seja, o WHERE está em uma coluna de partição), os arquivos já estão segmentados por valor, então a exclusão apenas descarta esses arquivos dos metadados. Ao excluir uma partição inteira de dados, você pode usar o seguinte:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Se você excluir dados em várias partições (neste exemplo, filtrando em user_email), use a seguinte sintaxe:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Se você atualizar a user_email com a UPDATE instrução, o arquivo que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

Especificar posição inicial

Você pode usar as opções a seguir para especificar o ponto inicial da fonte de streaming Delta Lake sem processar a tabela inteira.

  • startingVersion: A versão Delta Lake para começar. O Databricks recomenda omitir essa opção para a maioria das cargas de trabalho. Quando não está definido, o fluxo começa a partir da versão mais recente disponível, incluindo um instantâneo completo da tabela naquele momento.

    Se especificado, o fluxo lê todas as alterações na tabela Delta começando com a versão especificada (inclusive). Se a versão especificada não estiver mais disponível, o fluxo não será iniciado. Você pode obter as versões de confirmação na version coluna da saída do comando DESCRIBE HISTORY .

    Para retornar apenas as alterações mais recentes, especifique latest.

  • startingTimestamp: O carimbo de data/hora a partir do qual começar. Todas as alterações de tabela confirmadas no ou após o carimbo de data/hora (inclusive) são lidas pelo leitor de streaming. Se o carimbo de data/hora fornecido preceder todas as confirmações da tabela, a leitura de streaming começará com o carimbo de data/hora mais antigo disponível. Um dos seguintes:

    • Uma cadeia de caracteres de carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma cadeia de caracteres de data. Por exemplo, "2019-01-01".

Não é possível definir as duas opções ao mesmo tempo. Eles entram em vigor somente ao iniciar uma nova consulta de streaming. Se uma consulta de streaming tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas opções serão ignoradas.

Importante

Embora você possa iniciar a fonte de streaming a partir de uma versão especificada ou carimbo de data/hora, o esquema da fonte de streaming é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou carimbo de data/hora. Caso contrário, a fonte de streaming pode retornar resultados incorretos ao ler os dados com um esquema incorreto.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events. Se você quiser ler as alterações desde a versão 5, use:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Se quiser ler as alterações desde 2018-10-18, utilize:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Processar snapshot inicial sem que os dados sejam descartados

Nota

Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior. Esta funcionalidade está em Pré-visualização Pública.

Ao usar uma tabela Delta como fonte de fluxo, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta nesta versão é chamada de instantâneo inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.

Em uma consulta de streaming com monitoração de estado com uma marca d'água definida, o processamento de arquivos por tempo de modificação pode resultar em registros sendo processados na ordem errada. Isso pode levar a que os registos caiam como eventos tardios pela marca de água.

Você pode evitar o problema de queda de dados ativando a seguinte opção:

  • withEventTimeOrder: Se o instantâneo inicial deve ser processado com a ordem de hora do evento.

Com a ordem de tempo do evento habilitada, o intervalo de tempo do evento dos dados iniciais do instantâneo é dividido em intervalos de tempo. Cada microlote processa um bucket filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microlote, mas apenas de forma aproximada devido à natureza do processamento.

O gráfico abaixo mostra esse processo:

Instantâneo inicial

Informações notáveis sobre este recurso:

  • O problema de queda de dados só acontece quando o instantâneo Delta inicial de uma consulta de streaming com monitoração de estado é processado na ordem padrão.
  • Não é possível alterar withEventTimeOrder depois que a consulta de fluxo é iniciada enquanto o instantâneo inicial ainda está sendo processado. Para reiniciar com withEventTimeOrder alterado, você precisa excluir o ponto de verificação.
  • Se você estiver executando uma consulta de fluxo com withEventTimeOrder habilitado, não poderá fazer o downgrade para uma versão DBR que não ofereça suporte a esse recurso até que o processamento inicial do snapshot seja concluído. Se precisar fazer downgrade, aguarde a conclusão do snapshot inicial ou exclua o ponto de verificação e reinicie a consulta.
  • Este recurso não é suportado nos seguintes cenários incomuns:
    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
  • Com a ordem de tempo do evento ativada, o desempenho do processamento inicial do snapshot Delta pode ser mais lento.
  • Cada microlote verifica o instantâneo inicial para filtrar dados dentro do intervalo de tempo de evento correspondente. Para uma ação de filtro mais rápida, é aconselhável usar uma coluna de origem Delta como a hora do evento para que o salto de dados possa ser aplicado (marque Pulo de dados para Delta Lake para saber quando for aplicável). Além disso, o particionamento de tabelas ao longo da coluna de tempo do evento pode acelerar ainda mais o processamento. Você pode verificar a interface do usuário do Spark para ver quantos arquivos delta são verificados para um microlote específico.

Exemplo

Suponha que você tenha uma tabela user_events com uma event_time coluna. Sua consulta de streaming é uma consulta de agregação. Se quiser garantir que não haja queda de dados durante o processamento inicial do snapshot, você pode usar:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Nota

Você também pode habilitar isso com a configuração do Spark no cluster, que se aplicará a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true

Mesa delta como lavatório

Você também pode gravar dados em uma tabela Delta usando o Structured Streaming. O log de transações permite que o Delta Lake garanta exatamente um processamento, mesmo quando há outros fluxos ou consultas em lote sendo executados simultaneamente na tabela.

Nota

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.

Métricas

Você pode descobrir o número de bytes e o número de arquivos ainda a serem processados em um processo de consulta de streaming como as numBytesOutstanding métricas e numFilesOutstanding . As métricas adicionais incluem:

  • numNewListedFiles: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências para este lote.
    • backlogEndOffset: A versão da tabela usada para calcular a lista de pendências.

Se você estiver executando o fluxo em um bloco de anotações, poderá ver essas métricas na guia Dados brutos no painel de progresso da consulta de streaming:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Modo de acréscimo

Por padrão, os fluxos são executados no modo de acréscimo, que adiciona novos registros à tabela.

Você pode usar o método path:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

ou o toTable método, como se segue:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modo completo

Você também pode usar o Streaming estruturado para substituir a tabela inteira por cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.

Para aplicativos com requisitos de latência mais brandos, você pode economizar recursos de computação com gatilhos únicos. Use-os para atualizar tabelas de agregação resumidas em um determinado cronograma, processando apenas novos dados que chegaram desde a última atualização.

Executando junções estáticas de fluxo

Você pode confiar nas garantias transacionais e no protocolo de versionamento do Delta Lake para executar junções estáticas de fluxo. Uma junção estática de fluxo une a versão válida mais recente de uma tabela Delta (os dados estáticos) a um fluxo de dados usando uma junção sem estado.

Quando o Azure Databricks processa um microlote de dados em uma junção estática de fluxo, a versão válida mais recente dos dados da tabela Delta estática se une aos registros presentes no microlote atual. Como a junção é sem monitoração de estado, você não precisa configurar a marca d'água e pode processar resultados com baixa latência. Os dados na tabela Delta estática usada na junção devem ser lentamente alterados.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Upsert de consultas de streaming usando foreachBatch

Você pode usar uma combinação de merge e foreachBatch para escrever upserts complexos de uma consulta de streaming em uma tabela Delta. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Esse padrão tem muitas aplicações, incluindo as seguintes:

  • Escrever agregados de streaming no Modo de Atualização: Isso é muito mais eficiente do que o Modo Completo.
  • Gravar um fluxo de alterações de banco de dados em uma tabela Delta: a consulta de mesclagem para gravar dados de alteração pode ser usada para foreachBatch aplicar continuamente um fluxo de alterações a uma tabela Delta.
  • Gravar um fluxo de dados na tabela Delta com desduplicação: a consulta de mesclagem somente inserção para desduplicação pode ser usada para gravar dados continuamente (com duplicatas) em foreachBatch uma tabela Delta com desduplicação automática.

Nota

  • Certifique-se de que sua merge instrução dentro foreachBatch é idempotente, pois as reinicializações da consulta de streaming podem aplicar a operação no mesmo lote de dados várias vezes.
  • Quando merge é usado no foreachBatch, a taxa de dados de entrada da consulta de streaming (relatada e StreamingQueryProgress visível no gráfico de taxa do notebook) pode ser relatada como um múltiplo da taxa real na qual os dados são gerados na fonte. Isto acontece porque merge lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se for um estrangulamento, pode colocar o DataFrame em cache antes merge e, em seguida, retirá-lo da cache depois de merge.

O exemplo a seguir demonstra como você pode usar o SQL dentro foreachBatch para realizar essa tarefa:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Você também pode optar por usar as APIs do Delta Lake para executar upserts de streaming, como no exemplo a seguir:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Tabela idempotente escreve em foreachBatch

Nota

O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você deseja atualizar. O uso foreachBatch para gravar em várias tabelas serializa gravações, o que reduz o paralelizaiton e aumenta a latência geral.

As tabelas delta suportam as seguintes DataFrameWriter opções para fazer gravações em várias tabelas dentro foreachBatch do idempotent:

  • txnAppId: Uma cadeia de caracteres exclusiva que você pode passar em cada gravação DataFrame. Por exemplo, você pode usar a ID StreamingQuery como txnAppId.
  • txnVersion: Um número monotonicamente crescente que atua como versão de transação.

Delta Lake usa a combinação de txnAppId e txnVersion para identificar gravações duplicadas e ignorá-las.

Se uma gravação em lote for interrompida com uma falha, a nova execução do lote usará o mesmo aplicativo e ID de lote para ajudar o tempo de execução a identificar corretamente gravações duplicadas e ignorá-las. O ID do aplicativo (txnAppId) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionado ao ID do fluxo. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Aviso

Se você excluir o ponto de verificação de streaming e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppIdarquivo . Novos pontos de verificação começam com um ID de lote de 0. O Delta Lake usa o ID do lote como txnAppId uma chave exclusiva e ignora lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}