Melhores práticas: Delta Lake

Este artigo descreve as melhores práticas ao usar o Delta Lake.

O Databricks recomenda usar a otimização preditiva. Consulte a Otimização preditiva para o Delta Lake.

Ao excluir e recriar uma tabela no mesmo local, você sempre deve usar uma instrução CREATE OR REPLACE TABLE. Consulte Soltar ou substituir uma tabela Delta.

Usar clustering líquido para ignorar dados otimizados

O Databricks recomenda o uso de clustering líquido em vez de particionamento, ordem Z ou outras estratégias de organização de dados para otimizar o layout de dados para ignorar dados. Confira Usar clustering líquido para tabelas Delta.

Arquivos compactos

A otimização preditiva executa automaticamente comandos OPTIMIZE e VACUUM em tabelas gerenciadas do Catálogo do Unity. Consulte a Otimização preditiva para o Delta Lake.

O Databricks recomenda a execução frequente do comando OPTIMIZE para compactar arquivos pequenos.

Observação

Essa operação não remove os arquivos antigos. Para removê-los, execute o comando VACUUM.

Substituir o conteúdo ou esquema de uma tabela

Às vezes, você pode querer substituir uma tabela do Delta. Por exemplo:

  • Você descobre que os dados na tabela estão incorretos e deseja substituir o conteúdo.
  • Você deseja regravar a tabela inteira para fazer alterações de esquema incompatíveis (como alterar os tipos de coluna).

Embora você possa excluir todo o diretório de uma tabela do Delta e criar uma nova tabela no mesmo caminho, não é recomendável porque:

  • Excluir um diretório não é eficiente. Um diretório que contém arquivos muito grandes pode levar horas ou até mesmo dias para ser excluído.
  • Você perde todo o conteúdo dos arquivos excluídos. Será difícil recuperar se você excluir a tabela errada.
  • A exclusão de diretório não é atômica. Enquanto você estiver excluindo a tabela, uma consulta simultânea que lê a tabela poderá falhar ou ver uma tabela parcial.

Se você não precisar alterar o esquema de tabela, poderá excluir dados de uma tabela do Delta e inserir seus novos dados ou atualizar a tabela para corrigir os valores incorretos.

Se você quiser alterar o esquema de tabela, poderá substituir a tabela inteira atomicamente. Por exemplo:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Há vários benefícios dessa abordagem:

  • A substituição de uma tabela é muito mais rápida porque ela não precisa listar o diretório recursivamente ou excluir arquivos.
  • A versão antiga da tabela ainda existe. Se você excluir a tabela errada, poderá recuperar facilmente os dados antigos usando a viagem no tempo. Confira Trabalhar com o histórico de tabelas do Delta Lake.
  • É uma operação atômica. Consultas simultâneas ainda podem ler a tabela enquanto você está a exclui.
  • Devido às garantias de transação ACID do Delta Lake, se a substituição da tabela falhar, a tabela permanecerá em seu estado anterior.

Além disso, se você quiser excluir arquivos antigos para economizar custo de armazenamento após a substituição da tabela, poderá usar o VACUUM para excluí-los. Ele é otimizado para exclusão de arquivos e geralmente mais rápidaodo que excluir todo o diretório.

Cache do Spark

O Databricks não recomenda usar o cache do Spark pelos seguintes motivos:

  • Você perde todos os dados ignorados que podem vir de filtros adicionais adicionados sobre o armazenado em cache DataFrame.
  • Os dados armazenados em cache podem não ser atualizados se a tabela for acessada usando um identificador diferente.

Diferenças entre Delta Lake e Parquet no Apache Spark

O Delta Lake lida com as operações a seguir automaticamente. Você nunca deve executar essas operações manualmente:

  • REFRESH TABLE: as tabelas Delta sempre retornam as informações mais atualizadas. Portanto, não há necessidade de chamar REFRESH TABLE manualmente após as alterações.
  • Adicionar e remover partições: o Delta Lake acompanha automaticamente o conjunto de partições presente em uma tabela e atualiza a lista à medida que os dados são adicionados ou removidos. Como resultado, não há necessidade de executar ALTER TABLE [ADD|DROP] PARTITION nem MSCK.
  • Carregar uma única partição: não é necessário ler partições diretamente. Por exemplo, você não precisa executar spark.read.format("parquet").load("/data/date=2017-01-01"). Em vez disso, use uma cláusula WHERE para ignorar dados, como spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Não altere manualmente os arquivos de dados: o Delta Lake usa o log de transações para confirmar alterações na tabela atomicamente. Não modifique, adicione ou exclua arquivos de dados do Parquet diretamente em uma tabela Delta, porque isso pode levar à perda de dados ou à corrupção da tabela.

Melhorar o desempenho da mesclagem do Delta Lake

Você pode reduzir o tempo necessário para mesclar usando as seguintes abordagens:

  • Reduzir o espaço de pesquisa para correspondências: por padrão, a operação merge pesquisa a tabela Delta inteira para localizar correspondências na tabela de origem. Uma maneira de acelerar merge é reduzir o espaço de pesquisa adicionando restrições conhecidas na condição de correspondência. Por exemplo, suponha que você tenha uma tabela particionada por country e date que deseja usar merge para atualizar as informações do último dia e de um país específico. Adicionar a seguinte condição torna a consulta mais rápida, pois ela procura correspondências apenas nas partições relevantes:

    events.date = current_date() AND events.country = 'USA'
    

    Além disso, essa consulta também reduz as chances de conflitos com outras operações simultâneas. Confira Níveis de isolamento e conflitos de gravação no Azure Databricks para obter mais detalhes.

  • Arquivos compactados: se os dados são armazenados em muitos arquivos pequenos, a leitura dos dados para pesquisar correspondências pode se tornar lenta. Você pode compactar arquivos pequenos em arquivos maiores para melhorar a taxa de transferência de leitura. Confira Compactar arquivos de dados com otimização no Delta Lake para obter detalhes.

  • Controlar as partições de ordem aleatória para gravações: a operação merge embaralha os dados várias vezes para computar e gravar os dados atualizados. O número de tarefas usadas para embaralhar é controlado pela configuração spark.sql.shuffle.partitions da sessão do Spark. Definir esse parâmetro não apenas controla o paralelismo, mas também determina o número de arquivos de saída. Aumentar o valor aumenta o paralelismo, mas também gera um número maior de arquivos de dados menores.

  • Habilitar gravações otimizadas: para tabelas particionadas, o merge pode produzir um número muito maior de arquivos pequenos do que o número de partições em ordem aleatória. Isso ocorre porque cada tarefa em ordem aleatória pode gravar vários arquivos em várias partições e pode se tornar um gargalo de desempenho. Você pode reduzir o número de arquivos habilitando gravações otimizadas. Consulte Gravações otimizadas para o Delta Lake no Azure Databricks.

  • Ajustar tamanhos de arquivo na tabela: o Azure Databricks pode detectar automaticamente se uma tabela Delta tem operações frequentes merge que reescrevem arquivos e pode optar por reduzir o tamanho dos arquivos reescritos em antecipação a novas reescritas de arquivo no futuro. Consulte a seção sobre como ajustar os tamanhos dos arquivos para obter detalhes.

  • Mesclagem em ordem aleatória baixa: A Mesclagem em ordem aleatória baixa fornece uma implementação otimizada do MERGE que fornece melhor desempenho para cargas de trabalho mais comuns. Além disso, ele preserva otimizações de layout de dados existentes, como ordenação Z em dados não modificados.

Gerenciar recência de dados

No início de cada consulta, as tabelas Delta são atualizadas automaticamente para a versão mais recente da tabela. Esse processo pode ser observado em notebooks quando o status do comando informa: Updating the Delta table's state. No entanto, ao executar a análise histórica em uma tabela, talvez você não precise necessariamente de dados recentes até o último minuto, especialmente em tabelas em que os dados de streaming estão sendo ingeridos com frequência. Nesses casos, as consultas podem ser executadas em instantâneos obsoletos da sua tabela Delta. Essa abordagem pode reduzir a latência na obtenção de resultados de consultas.

Você pode configurar a tolerância para dados obsoletos definindo a configuração spark.databricks.delta.stalenessLimit de sessão do Spark com um valor de cadeia de caracteres de tempo, como 1h ou 15m (por 1 hora ou 15 minutos, respectivamente). Essa configuração é específica da sessão e não afeta outros clientes que acessam a tabela. Se o estado da tabela tiver sido atualizado dentro do limite de desatualização, uma consulta em relação à tabela retornará os resultados sem aguardar a atualização da tabela mais recente. Essa configuração nunca impede a atualização da tabela e, quando dados obsoletos são retornados, a atualização é processada em segundo plano. Se a última atualização da tabela for mais antiga que o limite de desatualização, a consulta não retornará os resultados até que a atualização de estado da tabela seja concluída.

Pontos de verificação aprimorados para consultas de baixa latência

O Delta Lake grava pontos de verificação como um estado de agregação de uma tabela Delta com uma frequência otimizada. Esses pontos de verificação servem como ponto de partida para computar o estado mais recente da tabela. Sem pontos de verificação, o Delta Lake teria que ler uma grande coleção de arquivos JSON (arquivos “delta”) que representam confirmações no log de transação para calcular o estado de uma tabela. Além disso, as estatísticas no nível de coluna que o Delta Lake usa para ignorar dados são armazenadas no ponto de verificação.

Importante

Os pontos de verificação do Delta Lake são diferentes dos pontos de verificação de Streaming Estruturado.

As estatísticas em nível de coluna são armazenadas como um struct e um JSON (para compatibilidade com versões anteriores). O formato de struct faz o Delta Lake ler muito mais rapidamente, pois:

  • O Delta Lake não faz análise JSON cara para obter estatísticas no nível da coluna.
  • Os recursos de remoção de coluna do Parquet reduzem significativamente as E/S necessárias para ler as estatísticas de uma coluna.

O formato de struct habilita uma coleção de otimizações que reduzem a sobrecarga das operações de leitura do Delta Lake de segundos a dezenas de milissegundos, o que diminui significativamente a latência de consultas curtas.

Gerenciar estatísticas em nível de coluna nos pontos de verificação

Você gerencia como as estatísticas são gravadas nos pontos de verificação usando as propriedades de tabela delta.checkpoint.writeStatsAsJson e delta.checkpoint.writeStatsAsStruct. Se ambas as propriedades de tabela forem false, o Delta Lake não poderá ignorar os dados.

  • As gravações em lote gravam estatísticas no formato JSON e struct. delta.checkpoint.writeStatsAsJson é true.
  • delta.checkpoint.writeStatsAsStruct é indefinido por padrão.
  • Os leitores usam a coluna struct quando disponível; do contrário, retornam ao uso da coluna JSON.

Importante

Os pontos de verificação aprimorados não atrapalham a compatibilidade com leitores do Delta Lake de código aberto. No entanto, a configuração de delta.checkpoint.writeStatsAsJson como false pode ter implicações em leitores do Delta Lake proprietários. Entre em contato com seus fornecedores para saber mais sobre as implicações no desempenho.

Habilitar pontos de verificação avançados para consultas de Streaming Estruturado

Se as suas cargas de trabalho de Fluxo Estruturado não tiverem requisitos de baixa latência (latências com menos de um minuto), você poderá habilitar pontos de verificação aprimorados com a execução do seguinte comando SQL:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Você também pode melhorar a latência de gravação do ponto de verificação definindo as seguintes propriedades da tabela:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Se os dados ignorados não forem úteis no seu aplicativo, você poderá definir ambas as propriedades como false. Então, nenhuma estatística será coletada ou gravada. O Databricks não recomenda essa configuração.