Informazioni di riferimento sul linguaggio Python per tabelle live Delta
Questo articolo fornisce informazioni dettagliate sull'interfaccia di programmazione Python di Tabelle live Delta.
Per informazioni sull'API SQL, vedere le informazioni di riferimento sul linguaggio SQL per le tabelle live delta.
Per informazioni dettagliate sulla configurazione del caricatore automatico, vedere Che cos'è il caricatore automatico?.
Limitazioni
L'interfaccia Python delle tabelle live Delta presenta le limitazioni seguenti:
- Le funzioni e
view
Pythontable
devono restituire un dataframe. Alcune funzioni che operano su dataframe non restituiscono dataframe e non devono essere usate. Poiché le trasformazioni del dataframe vengono eseguite dopo la risoluzione del grafico completo del flusso di dati, l'uso di tali operazioni potrebbe avere effetti collaterali imprevisti. Queste operazioni includono funzioni comecollect()
,count()
,toPandas()
,save()
esaveAsTable()
. Tuttavia, è possibile includere queste funzioni al di fuori delle definizioni ditable
funzione oview
perché questo codice viene eseguito una sola volta durante la fase di inizializzazione del grafo. - La
pivot()
funzione non è supportata. L'operazionepivot
in Spark richiede il caricamento eager dei dati di input per calcolare lo schema dell'output. Questa funzionalità non è supportata nelle tabelle Live Delta.
Importare il dlt
modulo Python
Le funzioni Python delle tabelle live Delta sono definite nel dlt
modulo. Le pipeline implementate con l'API Python devono importare questo modulo:
import dlt
Creare una vista materializzata o una tabella di streaming di tabelle live Delta
In Python, Le tabelle Live Delta determinano se aggiornare un set di dati come vista materializzata o tabella di streaming in base alla query di definizione. L'elemento @table
Decorator viene usato per definire sia le viste materializzate che le tabelle di streaming.
Per definire una vista materializzata in Python, applicare @table
a una query che esegue una lettura statica su un'origine dati. Per definire una tabella di streaming, applicare @table
a una query che esegue un flusso letto su un'origine dati. Entrambi i tipi di set di dati hanno la stessa specifica di sintassi come indicato di seguito:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Creare una vista Tabelle live Delta
Per definire una visualizzazione in Python, applicare l'elemento @view
Decorator. Analogamente all'elemento @table
Decorator, è possibile usare le viste nelle tabelle live Delta per set di dati statici o di streaming. Di seguito è riportata la sintassi per la definizione delle viste con Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Esempio: Definire tabelle e viste
Per definire una tabella o una vista in Python, applicare l'elemento @dlt.view
Decorator o @dlt.table
a una funzione. È possibile usare il nome della funzione o il name
parametro per assegnare il nome della tabella o della vista. L'esempio seguente definisce due set di dati diversi: una vista denominata taxi_raw
che accetta un file JSON come origine di input e una tabella denominata filtered_data
che accetta la taxi_raw
vista come input:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Esempio: Accedere a un set di dati definito nella stessa pipeline
Oltre a leggere da origini dati esterne, è possibile accedere ai set di dati definiti nella stessa pipeline con la funzione Tabelle read()
live Delta. L'esempio seguente illustra la creazione di un customers_filtered
set di dati usando la read()
funzione :
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
È anche possibile usare la spark.table()
funzione per accedere a un set di dati definito nella stessa pipeline. Quando si usa la spark.table()
funzione per accedere a un set di dati definito nella pipeline, nell'argomento della funzione viene anteponeta la LIVE
parola chiave al nome del set di dati:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Esempio: Leggere da una tabella registrata in un metastore
Per leggere i dati da una tabella registrata nel metastore Hive, nell'argomento della funzione omettere la LIVE
parola chiave e, facoltativamente, qualificare il nome della tabella con il nome del database:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Per un esempio di lettura da una tabella di Catalogo Unity, vedere Inserire dati in una pipeline di Catalogo Unity.
Esempio: Accedere a un set di dati usando spark.sql
È anche possibile restituire un set di dati usando un'espressione spark.sql
in una funzione di query. Per leggere da un set di dati interno, anteporre LIVE.
al nome del set di dati:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Creare una tabella da usare come destinazione delle operazioni di streaming
Usare la create_streaming_table()
funzione per creare una tabella di destinazione per i record restituiti dalle operazioni di streaming, inclusi apply_changes() e @append_flow record di output.
Nota
Le create_target_table()
funzioni e create_streaming_live_table()
sono deprecate. Databricks consiglia di aggiornare il codice esistente per usare la create_streaming_table()
funzione .
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argomenti |
---|
name Tipo: str Il nome della tabella. Questo parametro è obbligatorio. |
comment Tipo: str Descrizione facoltativa per la tabella. |
spark_conf Tipo: dict Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query. |
table_properties Tipo: dict Elenco facoltativo di proprietà della tabella per la tabella. |
partition_cols Tipo: array Elenco facoltativo di una o più colonne da utilizzare per il partizionamento della tabella. |
path Tipo: str Percorso di archiviazione facoltativo per i dati della tabella. Se non è impostato, per impostazione predefinita il sistema verrà impostato sul percorso di archiviazione della pipeline. |
schema Tipo: str o StructType Definizione dello schema facoltativa per la tabella. Gli schemi possono essere definiti come stringa DDL SQL o con python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Tipo: dict Vincoli di qualità dei dati facoltativi per la tabella. Vedere più aspettative. |
Controllare la modalità di materializzazione delle tabelle
Le tabelle offrono anche un controllo aggiuntivo della materializzazione:
- Specificare la modalità di partizionamento delle tabelle tramite
partition_cols
. È possibile usare il partizionamento per velocizzare le query. - È possibile impostare le proprietà della tabella quando si definisce una vista o una tabella. Vedere Proprietà della tabella Tabelle live Delta.
- Impostare un percorso di archiviazione per i dati della tabella usando l'impostazione
path
. Per impostazione predefinita, i dati della tabella vengono archiviati nel percorso di archiviazione della pipeline, sepath
non è impostato. - È possibile usare colonne generate nella definizione dello schema. Vedere Esempio: Specificare uno schema e colonne di partizione.
Nota
Per le tabelle di dimensioni inferiori a 1 TB, Databricks consiglia di consentire alle tabelle live delta di controllare l'organizzazione dei dati. A meno che la tabella non si cresca oltre un terabyte, in genere non è consigliabile specificare le colonne di partizione.
Esempio: Specificare uno schema e colonne di partizione
Facoltativamente, è possibile specificare uno schema di tabella usando python StructType
o una stringa DDL SQL. Se specificato con una stringa DDL, la definizione può includere colonne generate.
L'esempio seguente crea una tabella denominata sales
con uno schema specificato usando un python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Nell'esempio seguente viene specificato lo schema per una tabella usando una stringa DDL, viene definita una colonna generata e viene definita una colonna di partizione:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Per impostazione predefinita, le tabelle live Delta deducono lo schema dalla table
definizione se non si specifica uno schema.
Configurare una tabella di streaming per ignorare le modifiche in una tabella di streaming di origine
Nota
- Il
skipChangeCommits
flag funziona solo conspark.readStream
l'uso dellaoption()
funzione . Non è possibile usare questo flag in unadlt.read_stream()
funzione. - Non è possibile usare il
skipChangeCommits
flag quando la tabella di streaming di origine è definita come destinazione di una funzione apply_changes().
Per impostazione predefinita, le tabelle di streaming richiedono origini di sola accodamento. Quando una tabella di streaming usa un'altra tabella di streaming come origine e la tabella di streaming di origine richiede aggiornamenti o eliminazioni, ad esempio l'elaborazione del GDPR "diritto all'oblio", il skipChangeCommits
flag può essere impostato durante la lettura della tabella di streaming di origine per ignorare tali modifiche. Per altre informazioni su questo flag, vedere Ignorare gli aggiornamenti e le eliminazioni.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Proprietà delle tabelle live di Python Delta
Le tabelle seguenti descrivono le opzioni e le proprietà che è possibile specificare durante la definizione di tabelle e viste con tabelle live Delta:
@table oppure @view |
---|
name Tipo: str Nome facoltativo per la tabella o la vista. Se non definito, il nome della funzione viene usato come nome della tabella o della vista. |
comment Tipo: str Descrizione facoltativa per la tabella. |
spark_conf Tipo: dict Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query. |
table_properties Tipo: dict Elenco facoltativo di proprietà della tabella per la tabella. |
path Tipo: str Percorso di archiviazione facoltativo per i dati della tabella. Se non è impostato, per impostazione predefinita il sistema verrà impostato sul percorso di archiviazione della pipeline. |
partition_cols Tipo: a collection of str Raccolta facoltativa, ad esempio , list di una o più colonne da utilizzare per il partizionamento della tabella. |
schema Tipo: str o StructType Definizione dello schema facoltativa per la tabella. Gli schemi possono essere definiti come stringa DDL SQL o con python StructType . |
temporary Tipo: bool Creare una tabella ma non pubblicare i metadati per la tabella. La temporary parola chiave indica a Delta Live Tables di creare una tabella disponibile per la pipeline, ma non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, una tabella temporanea viene mantenuta per la durata della pipeline che lo crea e non solo per un singolo aggiornamento.Il valore predefinito è "False". |
Definizione di tabella o vista |
---|
def <function-name>() Funzione Python che definisce il set di dati. Se il name parametro non è impostato, <function-name> viene usato come nome del set di dati di destinazione. |
query Istruzione SPARK SQL che restituisce un set di dati Spark o un dataframe Koalas. Usare dlt.read() o spark.table() per eseguire una lettura completa da un set di dati definito nella stessa pipeline. Quando si usa la spark.table() funzione per leggere da un set di dati definito nella stessa pipeline, anteporre la LIVE parola chiave al nome del set di dati nell'argomento della funzione. Ad esempio, per leggere da un set di dati denominato customers :spark.table("LIVE.customers") È anche possibile usare la spark.table() funzione per leggere da una tabella registrata nel metastore omettendo la LIVE parola chiave e qualificando facoltativamente il nome della tabella con il nome del database:spark.table("sales.customers") Usare dlt.read_stream() per eseguire una lettura di streaming da un set di dati definito nella stessa pipeline.Usare la spark.sql funzione per definire una query SQL per creare il set di dati restituito.Usare la sintassi PySpark per definire query delta live tables con Python. |
Aspettative |
---|
@expect("description", "constraint") Dichiarare un vincolo di qualità dei dati identificato da description . Se una riga viola le aspettative, includere la riga nel set di dati di destinazione. |
@expect_or_drop("description", "constraint") Dichiarare un vincolo di qualità dei dati identificato da description . Se una riga viola le aspettative, eliminare la riga dal set di dati di destinazione. |
@expect_or_fail("description", "constraint") Dichiarare un vincolo di qualità dei dati identificato da description . Se una riga viola le aspettative, arrestare immediatamente l'esecuzione. |
@expect_all(expectations) Dichiarare uno o più vincoli di qualità dei dati. expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, includere la riga nel set di dati di destinazione. |
@expect_all_or_drop(expectations) Dichiarare uno o più vincoli di qualità dei dati. expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, eliminare la riga dal set di dati di destinazione. |
@expect_all_or_fail(expectations) Dichiarare uno o più vincoli di qualità dei dati. expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, arrestare immediatamente l'esecuzione. |
Change Data Capture con Python nelle tabelle live Delta
Usare la apply_changes()
funzione nell'API Python per usare la funzionalità CDC di Delta Live Tables. L'interfaccia Python di Tabelle live Delta fornisce anche la funzione create_streaming_table(). È possibile usare questa funzione per creare la tabella di destinazione richiesta dalla apply_changes()
funzione .
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota
Il comportamento predefinito per INSERT
gli eventi e UPDATE
consiste nell'eseguire l'upsert degli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE
eventi può essere specificata con la APPLY AS DELETE WHEN
condizione .
Importante
È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della apply_changes
tabella di destinazione, è necessario includere anche le __START_AT
colonne e __END_AT
con lo stesso tipo di dati del sequence_by
campo.
Vedere APPLY CHANGES API :Semplificare Change Data Capture in Tabelle Live Delta.
Argomenti |
---|
target Tipo: str Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la apply_changes() funzione.Questo parametro è obbligatorio. |
source Tipo: str Origine dati contenente record CDC. Questo parametro è obbligatorio. |
keys Tipo: list Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione. È possibile specificare uno dei due valori seguenti: * Elenco di stringhe: ["userId", "orderId"] * Elenco di funzioni SPARK SQL col() : [col("userId"), col("orderId"] Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId) .Questo parametro è obbligatorio. |
sequence_by Tipo: str o col() Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. Le tabelle live delta usano questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine. È possibile specificare uno dei due valori seguenti: * Stringa: "sequenceNum" * Una funzione SPARK SQL col() : col("sequenceNum") Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId) .Questo parametro è obbligatorio. |
ignore_null_updates Tipo: bool Consentire l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e ignore_null_updates è True , le colonne con un null manterranno i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con il valore .null Quando ignore_null_updates è False , i valori esistenti verranno sovrascritti con null valori.Il parametro è facoltativo. Il valore predefinito è False . |
apply_as_deletes Tipo: str o expr() Specifica quando un evento CDC deve essere considerato come un DELETE upsert anziché un upsert. Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come rimozione definitiva nella tabella Delta sottostante e viene creata una vista nel metastore che filtra tali tombe. L'intervallo di conservazione può essere configurato conpipelines.cdc.tombstoneGCThresholdInSeconds proprietà table.È possibile specificare uno dei due valori seguenti: * Stringa: "Operation = 'DELETE'" * Una funzione SPARK SQL expr() : expr("Operation = 'DELETE'") Il parametro è facoltativo. |
apply_as_truncates Tipo: str o expr() Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATE completa. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.Il apply_as_truncates parametro è supportato solo per scD di tipo 1. Il tipo SCD 2 non supporta il troncamento.È possibile specificare uno dei due valori seguenti: * Stringa: "Operation = 'TRUNCATE'" * Una funzione SPARK SQL expr() : expr("Operation = 'TRUNCATE'") Il parametro è facoltativo. |
column_list except_column_list Tipo: list Subset di colonne da includere nella tabella di destinazione. Utilizzare column_list per specificare l'elenco completo di colonne da includere. Utilizzare except_column_list per specificare le colonne da escludere. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :* column_list = ["userId", "name", "city"] .* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId) .Il parametro è facoltativo. L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando alla funzione non viene passato alcun column_list argomento o except_column_list . |
stored_as_scd_type Tipo: str o int Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2. Impostare su 1 per scD di tipo 1 o 2 per scD di tipo 2.La clausola è facoltativa. Il valore predefinito è SCD di tipo 1. |
track_history_column_list track_history_except_column_list Tipo: list Subset di colonne di output da tenere traccia della cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne da tenere traccia. Utilizzotrack_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() : - track_history_column_list = ["userId", "name", "city"] . - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId) .Il parametro è facoltativo. L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando no track_history_column_list otrack_history_except_column_list l'argomento viene passato alla funzione . |
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per