Share via


Visão geral do Linux Foundation Delta Lake

Este artigo foi adaptado para mais clareza com base no equivalente original publicado aqui. Ele ajudará você a explorar rapidamente os principais recursos do Delta Lake. O artigo fornece snippets de código que mostram como fazer leituras e gravações em tabelas do Delta Lake com base em consultas interativas, em lote e de streaming. Os snippets de código também estão disponíveis em um conjunto de notebooks PySpark aqui, Scala aqui e C# aqui

Veja o que abordaremos:

  • Criar uma tabela
  • Ler dados
  • Atualizar os dados da tabela
  • Substituir os dados da tabela
  • Atualização condicional sem substituição
  • Ler versões mais antigas dos dados usando a viagem no tempo
  • Gravar um fluxo de dados em uma tabela
  • Ler um fluxo de alterações de uma tabela
  • Suporte a SQL

Configuração

Lembre-se de modificar o trecho abaixo conforme apropriado para o seu ambiente.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Resulta em:

'/delta/delta-table-335323'

Criar uma tabela

Para criar uma tabela do Delta Lake, grave um DataFrame fora do DataFrame no formato delta. Você pode alterar o formato de Parquet, CSV, JSON etc. para delta.

O código a seguir mostra como criar uma tabela do Delta Lake usando o esquema inferido do DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Resulta em:

ID
0
1
2
3
4

Ler dados

Leia os dados da tabela do Delta Lake especificando o caminho para os arquivos e o formato delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Resulta em:

ID
1
3
4
0
2

A ordem dos resultados é diferente da mostrada acima, pois não havia nenhuma ordem explicitamente especificada antes da geração dos resultados.

Atualizar os dados da tabela

O Delta Lake oferece suporte a várias operações para modificar tabelas usando APIs DataFrame padrão. Essas operações são um dos aprimoramentos que o formato delta adiciona. O exemplo a seguir executa um trabalho em lotes para substituir os dados da tabela.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Resulta em:

ID
7
8
5
9
6

Aqui você pode ver que todos os cinco registros foram atualizados para conter novos valores.

Salvar os dados como tabelas de catálogo

O Delta Lake pode fazer gravações em tabelas de catálogos gerenciadas ou externas.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Resulta em:

Banco de Dados tableName isTemporary
default externaldeltatable false
default manageddeltatable false

Com esse código, você criou uma tabela no catálogo de um dataframe existente, chamada de tabela gerenciada. Em seguida, você definiu uma nova tabela externa no catálogo que usa uma localização existente, chamada de tabela externa. Na saída, você pode ver que ambas as tabelas, independentemente de como foram criadas, estão listadas no catálogo.

Agora você pode examinar as propriedades estendidas das duas tabelas

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Resulta em:

col_name data_type comentário
id BIGINT null
Informações detalhadas da tabela
Banco de dados default
Tabela manageddeltatable
Proprietário trusted-service-user
Hora de criação Sáb 25 de abr 00h35min34s UTC 2020
Último acesso Qui 1 de jan 00h00min00s UTC 1970
Criado por Spark 2.4.4.2.6.99.201-11401300
Tipo GERENCIADO
Provedor delta
Propriedades da tabela [transient_lastDdlTime=1587774934]
Estatísticas 2\.407 bytes
Localização abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Biblioteca Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriedades de armazenamento [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Resulta em:

col_name data_type comentário
id BIGINT null
Informações detalhadas da tabela
Banco de dados default
Tabela externaldeltatable
Proprietário trusted-service-user
Hora de criação Sáb 25 de abr 00h35min38s UTC 2020
Último acesso Qui 1 de jan 00h00min00s UTC 1970
Criado por Spark 2.4.4.2.6.99.201-11401300
Tipo EXTERNAL
Provedor DELTA
Propriedades da tabela [transient_lastDdlTime=1587774938]
Localização abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Biblioteca Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriedades de armazenamento [serialization.format=1]

Atualização condicional sem substituição

O Delta Lake fornece APIs programáticas para atualização condicional, exclusão e mesclagem (esse comando é comumente chamado de upsert) de dados em tabelas.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Resulta em:

ID
106
108
5
7
9

Aqui, você acabou de adicionar 100 a cada ID par.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Resulta em:

ID
5
7
9

Observe que todas as linhas pares foram excluídas.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Resulta em:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Aqui você tem uma combinação dos dados existentes. Os dados existentes receberam o valor -1 no caminho do código update(WhenMatched). Os dados que foram criados na parte superior do snippet e foram adicionados por meio do caminho do código de inserção (WhenNotMatched) também foram adicionados.

Histórico

O Delta Lake tem a capacidade de permitir a análise do histórico de uma tabela. Ou seja, as alterações que foram feitas na tabela delta subjacente. A célula abaixo mostra como é simples inspecionar o histórico.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Resulta em:

version timestamp userId userName operation operationParameters trabalho notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 null null MESCLAR [predicado - > (oldData. ID = newData. ID )] nulo nulo null 3 null false
3 2020-04-25 00:36:08 null null Delete (excluir) [predicado -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] nulo nulo null 2 nulo false
2 2020-04-25 00:35:51 null null UPDATE [predicado -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] nulo nulo nulo 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrite, partitionBy -> []] nulo nulo nulo 0 nulo false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfExists, partitionBy -> []] nulo nulo nulo nulo null true

Aqui você pode ver todas as modificações feitas nos snippets de código acima.

Ler versões mais antigas dos dados usando a viagem no tempo

É possível consultar instantâneos anteriores da tabela do Delta Lake usando um recurso chamado viagem no tempo. Se você quiser acessar os dados substituídos, consulte um instantâneo da tabela antes da substituição do primeiro conjunto de dados usando a opção versionAsOf.

Depois de executar a célula abaixo, você verá o primeiro conjunto de dados anterior à substituição. Viagem no tempo é um recurso poderoso que aproveita o poder do log de transações do Delta Lake para acessar dados que não estão mais na tabela. A remoção da opção da versão 0 (ou a especificação da versão 1) permitirá que você veja os dados mais recentes novamente. Para obter mais informações, confira Consultar um instantâneo mais antigo de uma tabela.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Resulta em:

ID
0
1
4
3
2

Aqui você pode ver que voltou para a versão mais antiga dos dados.

Gravar um fluxo de dados em uma tabela

Você também pode fazer gravações em uma tabela do Delta Lake usando o Streaming Estruturado do Spark. O log de transações do Delta Lake garante o processamento exatamente uma vez, mesmo quando há outros fluxos ou consultas em lote em execução simultaneamente na tabela. Por padrão, os fluxos são executados no modo de acréscimo, que adiciona novos registros à tabela.

Para obter mais informações sobre a integração do Delta Lake ao Streaming Estruturado, confira Leituras e gravações em streaming de tabela.

Nas células abaixo, veja o que estamos fazendo:

  • A célula 30 mostra os dados recém-acrescentados
  • A célula 31 inspeciona o histórico
  • A célula 32 interrompe o trabalho de streaming estruturado
  • A célula 33 inspeciona o histórico <–Você observará que os acréscimos foram interrompidos

Primeiro, você configurará um trabalho simples do Spark Streaming para gerar uma sequência e fazer o trabalho gravar em sua Tabela Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Ler um fluxo de alterações de uma tabela

Enquanto o fluxo está sendo gravado na tabela do Delta Lake, você também pode fazer uma leitura dessa tabela como uma fonte de streaming. Por exemplo, você pode iniciar outra consulta de streaming que imprime todas as alterações feitas na tabela do Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Resulta em:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Resulta em:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 ATUALIZAÇÃO DE STREAMING [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MESCLAR [predicado - > (oldData. id = newData. id )] 3
3 2020-04-25 00:36:08 Delete (excluir) [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] nulo

Aqui você está removendo algumas das colunas menos interessantes para simplificar a experiência de exibição do histórico.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Resulta em:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 ATUALIZAÇÃO DE STREAMING [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MESCLAR [predicado - > (oldData. id = newData. id )] 3
3 2020-04-25 00:36:08 Delete (excluir) [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Converter Parquet em Delta

Faça uma conversão in-loco do formato Parquet em Delta.

Aqui você testará se a tabela existente está ou não no formato delta.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Resulta em:

Falso

Agora você vai converter os dados para o formato delta e verificar se funcionou.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulta em:

Verdadeiro

Suporte a SQL

O Delta dá suporte a comandos do utilitário de tabela por meio do SQL. Você pode usar o SQL para:

  • Obter o histórico da DeltaTable
  • Executar vácuo em uma DeltaTable
  • Converter um arquivo Parquet em Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Resulta em:

version timestamp userId userName operation operationParameters trabalho notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 null null ATUALIZAÇÃO DE STREAMING [outputMode -> Ap... nulo nulo null 4 null true
4 2020-04-25 00:36:27 null null MESCLAR [predicado -> (ol... nulo nulo null 3 null false
3 2020-04-25 00:36:08 null null Delete (excluir) [predicado -> ["(... nulo null null 2 null false
2 2020-04-25 00:35:51 null null UPDATE [predicado -> ((i... nulo null nulo 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrit... nulo nulo nulo 0 nulo false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfE... nulo nulo nulo nulo null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Resulta em:

caminho
abfss://data@arca...

Agora, você vai verificar se uma tabela não é uma tabela de formato delta. Em seguida, você converterá a tabela em formato delta usando o SQL Spark e confirmará que ela foi convertida corretamente.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulta em:

Verdadeiro

Para obter a documentação completa, confira a página de documentação do Delta Lake

Para obter mais informações, confira Projeto do Delta Lake.

Próximas etapas