Share via


Referência da linguagem Python de Delta Live Tables

Este artigo fornece detalhes para a interface de programação Delta Live Tables Python.

Para saber mais sobre a API do SQL, confira a Referência da linguagem SQL de Delta Live Tables.

Para obter detalhes específicos sobre a configuração do carregador automático, consulte O que é o carregador automático?.

Limitações

A interface do Python do Delta Live Tables tem as seguintes limitações:

  • As funções table e view de Python devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Como as transformações de DataFrame são executadas depois que o grafo de fluxo de dados completo é resolvido, o uso dessas operações pode ter efeitos colaterais não intencionais. Essas operações incluem funções como collect(), count(), toPandas(), save() e saveAsTable(). No entanto, você pode incluir essas funções fora das definições de função table ou view porque esse código é executado uma vez durante a fase de inicialização do grafo.
  • Não há suporte para a função pivot(). A operação pivot no Spark requer o carregamento adiantado de dados de entrada para calcular o esquema da saída. Não há suporte para essa funcionalidade no Delta Live Tables.

Importar o módulo Python dlt

As funções de Delta Live Tables do Python são definidas no módulo dlt. Seus pipelines implementados com a API do Python devem importar esse módulo:

import dlt

Criar uma exibição materializada Delta Live Tables ou tabela de streaming

No Python, Delta Live Tables determina se um conjunto de dados deve ser atualizado como uma exibição materializada ou uma tabela de streaming com base na consulta definidora. O decorador @table é usado para definir tanto as vistas materializadas quanto as tabelas de streaming.

Para definir uma exibição materializada em Python, aplique @table a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de streaming, aplique @table a uma consulta que executa uma leitura de streaming em uma fonte de dados. Ambos os tipos de conjunto de dados têm a mesma especificação de sintaxe da seguinte maneira:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar um modo de exibição Delta Live Tables

Para definir uma exibição em Python, aplique o decorador @view. Como o decorador @table, você pode usar exibições em Delta Live Tables para conjuntos de dados estáticos ou de streaming. A seguir está a sintaxe para definir modos de exibição com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: definir tabelas e exibições

Para definir uma tabela ou exibição em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode usar o nome da função ou o parâmetro name para atribuir o nome da tabela ou da exibição. O exemplo a seguir define dois conjuntos de dados diferentes: uma exibição chamada taxi_raw que usa um arquivo JSON como a fonte de entrada e uma tabela chamada filtered_data que usa a exibição taxi_raw como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exemplo: acessar um conjunto de dados definido no mesmo pipeline

Além da leitura de fontes de dados externas, você pode acessar conjuntos de dados definidos no mesmo pipeline com a função Delta Live Tables read(). O exemplo a seguir demonstra a criação de um conjunto de dados customers_filtered usando a função read():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Você também pode usar a função spark.table() para acessar um conjunto de dados definido no mesmo pipeline. Ao usar a função spark.table() para acessar um conjunto de dados definido no pipeline, inclua no início do argumento da função a palavra-chave LIVE ao nome do conjunto de dados:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exemplo: ler de uma tabela registrada em um metastore

Para ler dados de uma tabela registrada no metastore do Hive, no argumento da função omita a palavra-chave LIVE e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela do Catálogo Unity, consulte Ingerir dados em um pipeline do Catálogo Unity.

Exemplo: acessar um conjunto de dados usando spark.sql

Também é possível retornar um conjunto de dados usando uma expressão spark.sql em uma função de consulta. Para ler um conjunto de um conjunto de dados interno, inclua LIVE. no início do nome do conjunto de dados:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Crie uma tabela para usar como destino de operações de streaming

Use a função create_streaming_table() para criar uma tabela de destino para a saída de registros por operações de streaming, incluindo registros de saída apply_changes() e @append_flow.

Observação

As funções create_target_table() e create_streaming_live_table() foram preteridas. O Databricks recomenda atualizar o código existente para usar a função create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumentos
name

Digite: str

O nome da tabela.

Este parâmetro é obrigatório.
comment

Digite: str

Uma descrição opcional para a tabela.
spark_conf

Digite: dict

Uma lista opcional de configurações do Spark para a execução dessa consulta.
table_properties

Digite: dict

Uma lista opcional de propriedades da tabela para a tabela.
partition_cols

Digite: array

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
path

Digite: str

Um local de armazenamento opcional para os dados da tabela. Se ele não estiver definido, o sistema usará como padrão o local de armazenamento do pipeline.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL de SQL ou com um Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Digite: dict

Restrições opcionais de qualidade de dados para a tabela. Confira Várias expectativas.

Controlar como as tabelas são materializadas

As tabelas também oferecem controle adicional da materialização:

Observação

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização de dados. A menos que espere que sua tabela cresça além de um terabyte, geralmente não é necessário especificar colunas de partição.

Exemplo: especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema de tabela usando um Python StructType ou uma cadeia de caracteres DDL de SQL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

Os exemplos a seguir criam uma tabela chamada sales com um esquema especificado usando um StructType do Python:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema de uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Por padrão, Delta Live Tables inferem o esquema da definição table, se você não especificar um esquema.

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option(). Você não pode usar esse sinalizador em uma função dlt.read_stream().
  • Não é possível usar o sinalizador skipChangeCommits quando a tabela de streaming de origem está definida como destino de uma função apply_changes().

Por padrão, as tabelas de streaming exigem fontes somente acréscimo. Quando uma tabela de streaming usa outra tabela de streaming como origem, e a tabela de streaming de origem exige atualizações ou exclusões, por exemplo, o processamento do "direito de ser esquecido" do GDPR, o sinalizador skipChangeCommits pode ser definido durante a leitura da tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Propriedades do Python Delta Live Tables

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com o Delta Live Tables:

@table ou @view
name

Digite: str

Um nome opcional para a tabela ou exibição. Se não estiver definido, o nome da função será usado como o nome da tabela ou da exibição.
comment

Digite: str

Uma descrição opcional para a tabela.
spark_conf

Digite: dict

Uma lista opcional de configurações do Spark para a execução dessa consulta.
table_properties

Digite: dict

Uma lista opcional de propriedades da tabela para a tabela.
path

Digite: str

Um local de armazenamento opcional para os dados da tabela. Se ele não estiver definido, o sistema usará como padrão o local de armazenamento do pipeline.
partition_cols

Digite: a collection of str

Uma coleção opcional, por exemplo, um list, de uma ou mais colunas a serem usadas para particionar a tabela.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL de SQL ou com um Python
StructType.
temporary

Digite: bool

Criar uma tabela, mas não publicar os metadados da tabela. A palavra-chave temporary instrui as Tabelas Dinâmicas Delta a criar uma tabela que fique disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária permanece por toda a duração do pipeline que a cria e não apenas para uma única atualização.

O padrão é "False".
Definição de tabela ou exibição
def <function-name>()

Uma função Python que define o conjunto de dados. Se o parâmetro name não for definido, <function-name> será usado como o nome do conjunto de dados de destino.
query

Uma instrução SQL do Spark que retorna um conjunto de dados Spark ou um DataFrame do Koalas.

Use dlt.read() ou spark.table() para executar uma leitura completa de um conjunto de dados definido no mesmo pipeline. Ao usar a função spark.table() para ler de um conjunto de dados definido no mesmo pipeline, inclua no início do argumento da função a palavra-chave LIVE ao nome do conjunto de dados. Por exemplo, para ler de um conjunto de dados chamado customers:

spark.table("LIVE.customers")

Você também pode usar a função spark.table() para ler de uma tabela registrada no metastore omitindo a palavra-chave LIVE e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:

spark.table("sales.customers")

Use dlt.read_stream() para executar uma leitura de streaming de um conjunto de dados definido no mesmo pipeline.

Use a função spark.sql para definir uma consulta SQL para criar o conjunto de dados de retorno.

Use a sintaxe PySpark para definir consultas de Delta Live Tables com Python.
Expectativas
@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, inclua a linha no conjunto de dados de destino.
@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, exclua a linha do conjunto de dados de destino.
@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, pare imediatamente a execução.
@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no conjunto de dados de destino.
@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar alguma expectativa, exclua a linha do conjunto de dados de destino.
@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar alguma expectativa, pare imediatamente a execução.

Alterar a captura de dados com Python em tabelas dinâmicas Delta

Use a função apply_changes() na API do Python para usar a funcionalidade de CDC do Delta Live Tables. A interface Delta Live Tables Python também fornece a função create_streaming_table(). Você pode usar essa função para criar a tabela de destino exigida pela função apply_changes().

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

O comportamento padrão para eventos INSERT eUPDATE é upsert eventos de CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam às chaves especificadas ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE pode ser especificada com a condição APPLY AS DELETE WHEN.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela apply_changes de destino, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Confira API APPLY CHANGES: Simplifique a captura de dados de alterações nas Tabelas Dinâmicas Delta.

Argumentos
target

Digite: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é necessário.
source

Digite: str

A fonte de dados que contém registros de CDC.

Este parâmetro é obrigatório.
keys

Digite: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

* Uma lista de cadeias de caracteres: ["userId", "orderId"]
* Uma lista de funções SQL col() do Spark: [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. As tabelas Delta ao vivo usam esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

Você pode especificar:

* Uma cadeia de caracteres: "sequenceNum"
* Uma função de SQL col() do Spark: col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
ignore_null_updates

Digite: bool

Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates for True, as colunas com um null irão reter seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates for False, os valores existentes serão substituídos por valores null.

Esse parâmetro é opcional.

O padrão é False.
apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um Upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca para exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSecondspropriedade de tabela.

Você pode especificar:

* Uma cadeia de caracteres: "Operation = 'DELETE'"
* Uma função de SQL expr() do Spark: expr("Operation = 'DELETE'")

Esse parâmetro é opcional.
apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATE completa. Como essa cláusula dispara um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O parâmetro apply_as_truncates tem suporte apenas para SCD tipo 1. O SCD tipo 2 não dá suporte a truncado.

Você pode especificar:

* Uma cadeia de caracteres: "Operation = 'TRUNCATE'"
* Uma função de SQL expr() do Spark: expr("Operation = 'TRUNCATE'")

Esse parâmetro é opcional.
column_list

except_column_list

Digite: list

Um subconjunto de colunas a ser incluído na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. É possível declarar um valor como uma lista de cadeias de caracteres ou como funções de SQL col() do Spark:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Esse parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum column_list ou argumento except_column_list for passado para a função.
stored_as_scd_type

Tipo: str ou int

Se é necessário armazenar registros como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para o SCD tipo 1 ou como 2 para o SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Digite: list

Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Uso
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. É possível declarar um valor como uma lista de cadeias de caracteres ou como funções de SQL col() do Spark: – track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Esse parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando os argumentos track_history_column_list
e track_history_except_column_list não são passados para a função.