Informations de référence sur le langage Python dans Delta Live Tables

Cet article fournit des informations détaillées et des exemples pour l’interface de programmation Python dans Delta Live Tables. Pour obtenir la spécification complète de l’API, consultez Spécification de l’API Python.

Pour plus d’informations sur l’API SQL, consultez Informations de référence sur le langage SQL dans Delta Live Tables.

Jeux de données Python

L’API Python est définie dans le module dlt. Vous devez importer le module dlt dans vos pipelines Delta Live Tables implémentés avec l’API Python. Appliquez l'élément décoratif @dlt.view ou @dlt.table à une fonction pour définir une vue ou une table dans Python. Vous pouvez utiliser le nom de la fonction ou le paramètre name pour attribuer le nom de la table ou de la vue. L’exemple suivant définit deux jeux de données différents : une vue appelée taxi_raw qui prend un fichier JSON comme source d’entrée, et une table appelée filtered_data qui prend la vue taxi_raw comme entrée :

@dlt.view
def taxi_raw():
  return spark.read.json("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Les fonctions de vue et de table doivent retourner un DataFrame Spark ou un DataFrame Koalas. Un DataFrame Koalas retourné par une fonction est converti en jeu de données Spark par le runtime Delta Live Tables.

En plus de lire à partir de sources de données externes, vous pouvez accéder aux jeux de données définis dans le même pipeline avec la fonction read() de Delta Live Tables. L’exemple suivant illustre la création d’un jeu de données customers_filtered à l’aide de la fonction read() :

@dlt.table
def customers_raw():
  return spark.read.csv("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Vous pouvez également utiliser la fonction spark.table() pour accéder à un jeu de données défini dans le même pipeline ou dans une table inscrite dans le metastore. Lorsque vous utilisez la fonction spark.table() pour accéder à un jeu de données défini dans le pipeline, dans l’argument de fonction, ajoutez le mot clé LIVE au début du nom du jeu de données :

@dlt.table
def customers_raw():
  return spark.read.csv("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Pour lire des données dans une table inscrite dans le metastore, dans l’argument de fonction, omettez le mot clé LIVE et qualifiez éventuellement le nom de la table avec le nom de la base de données :

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Delta Live Tables garantit que le pipeline capture automatiquement la dépendance entre les jeux de données. Ces informations de dépendance sont utilisées pour déterminer l’ordre d’exécution lors d’une mise à jour et de l’enregistrement des informations de traçabilité dans le journal des événements d’un pipeline.

Vous pouvez également retourner un jeu de données à l’aide d’une expression spark.sql dans une fonction de requête. Pour lire dans un jeu de données interne, ajoutez LIVE. au début le nom du jeu de données :

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Les vues et les tables peuvent avoir les propriétés facultatives suivantes :

  • comment : description explicite de ce jeu de données.
  • spark_conf : dictionnaire Python contenant des configurations Spark pour l’exécution de cette requête uniquement.
  • Contraintes de qualité des données appliquées avec les attentes.

Les tables offrent également un contrôle supplémentaire de leur matérialisation :

  • Avec la propriété partition_cols, indiquez de quelle manière les tables sont partitionnées. Le partitionnement vous permet d’accélérer les requêtes.

  • Vous pouvez définir des propriétés de table lorsque vous définissez une vue ou une table. Pour plus d’informations, consultez Propriétés des tables.

  • Définissez un emplacement de stockage pour les données de table à l’aide du paramètre path. Par défaut, les données des tables sont stockées dans l’emplacement de stockage du pipeline si path n’est pas défini.

  • Vous pouvez éventuellement spécifier un schéma de table à l’aide d’une chaîne StructType Python ou SQL DDL. Les exemples suivants créent une table appelée sales avec un schéma explicitement spécifié :

    sales_schema = StructType([
      StructField("customer_id", StringType(), True),
      StructField("number_of_line_items", StringType(), True),
      StructField("order_datetime", StringType(), True),
      StructField("order_number", LongType(), True)]
    )
    
    @dlt.table(
      comment="Raw data on sales",
      schema=sales_schema)
    def sales():
      return ("...")
    
    @dlt.table(
      comment="Raw data on sales",
      schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG")
    def sales():
      return ("...")
    

    Par défaut, Delta Live Tables déduit le schéma de la définition table si vous ne spécifiez pas de schéma.

Bibliothèques Python

Pour spécifier des bibliothèques Python externes, utilisez la commande magic %pip install. Lorsqu’une mise à jour démarre, Delta Live Tables exécute toutes les cellules contenant une commande %pip install avant d’exécuter des définitions de table. Chaque notebook Python inclus dans le pipeline a accès à toutes les bibliothèques installées. L’exemple suivant installe un package appelé logger et le rend accessible globalement à tout notebook Python dans le pipeline :

%pip install logger

from logger import log_info

@dlt.table
def dataset():
    log_info(...)
    return dlt.read(..)

Spécification de l’API Python

Module Python

Les fonctions Python Delta Live Tables sont définies dans le module dlt. Vos pipelines implémentés avec l’API Python doivent importer ce module :

import dlt

Créer une table

Pour définir une table dans Python, appliquez l'élément décoratif @table. L'élément décoratif @table est un alias pour l'élément décoratif @create_table.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Créer une vue

Pour définir une vue dans Python, appliquez l'élément décoratif @view. L'élément décoratif @view est un alias pour l'élément décoratif @create_view.

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Propriétés Python

@table ou @view
name

Entrez : str

Nom facultatif pour la table ou la vue. S’il n’est pas défini, le nom de la fonction est utilisé comme nom de la table ou de la vue.
comment

Entrez : str

Description facultative de la table.
spark_conf

Entrez : dict

Liste facultative de configurations Spark pour l’exécution de cette requête.
table_properties

Entrez : dict

Liste facultative des propriétés de table disponibles pour la table.
path

Entrez : str

Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
partition_cols

Entrez : array

Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table.
schema

Type : str ou StructType

Définition de schéma facultative pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec Python
StructType.
temporaire

Entrez : bool

Crée une table temporaire. Aucune métadonnée n’est conservée pour cette table.

La valeur par défaut est False.
Définition de la table ou de la vue
def ()

Fonction Python qui définit le jeu de données. Si le paramètre name n’est pas défini, <function-name> est utilisé comme nom du jeu de données cible.
query

Instruction SQL Spark qui retourne un jeu de données Spark ou un DataFrame Koalas.

Utilisez dlt.read() ou spark.table() pour effectuer une lecture complète à partir d’un jeu de données défini dans le même pipeline. Lorsque vous utilisez la fonction spark.table() pour lire dans un jeu de données défini dans le même pipeline, ajoutez le mot clé LIVE au début du nom du jeu de données dans l’argument de fonction. Par exemple, pour lire dans un jeu de données nommé customers :

spark.table("LIVE.customers")

Vous pouvez également utiliser la fonction spark.table() pour lire dans une table inscrite dans le metastore en omettant le mot clé LIVE et en qualifiant éventuellement le nom de la table avec le nom de la base de données :

spark.table("sales.customers")

Utilisez dlt.read_stream() pour effectuer une lecture en streaming à partir d’un jeu de données défini dans le même pipeline.

Utilisez la fonction spark.sql pour définir une requête SQL afin de créer le jeu de données de retour.

Utilisez la syntaxe PySpark pour définir des requêtes Delta Live Tables avec Python.
Attentes
@expect(“description”, “constraint”)

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, incluez la ligne dans le jeu de données cible.
@expect_or_drop(“description”, “constraint”)

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, supprimez la ligne du jeu de données cible.
@expect_or_fail(“description”, “constraint”)

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, arrêtez immédiatement l’exécution.
@expect_all(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, incluez la ligne dans le jeu de données cible.
@expect_all_or_drop(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, supprimez la ligne du jeu de données cible.
@expect_all_or_fail(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, arrêtez immédiatement l’exécution.

Propriétés des tables

Outre les propriétés de table prises en charge par Delta Lake, vous pouvez définir les propriétés de table suivantes.

Propriétés des tables
pipelines.autoOptimize.managed

Valeur par défaut : true

Active ou désactive l’optimisation planifiée automatique de cette table.
pipelines.autoOptimize.zOrderCols

Valeur par défaut : None

Liste facultative de noms de colonnes séparés par des virgules qui détermine l’ordre de plan de cette table.
pipelines.reset.allowed

Valeur par défaut : true

Détermine si l’actualisation complète de cette table est autorisée.
pipelines.trigger.interval

Valeur par défaut : dépend du type du flux

Contrôle l’intervalle de déclenchement d’un flux de mise à jour de cette table. Valeurs par défaut :

* Quatre secondes pour les requêtes de streaming.
* Une minute pour les requêtes complètes lorsque toutes les données d’entrée proviennent de sources Delta.
* Dix minutes pour les requêtes complètes lorsque certaines données d’entrée peuvent provenir de sources non Delta. Consultez Tables et vues dans les pipelines continus.

La valeur est un nombre exprimé dans l’unité de temps choisie. Les unités de temps valides sont les suivantes :

* second, seconds
* minute, minutes
* hour, hours
* day, days

Vous pouvez utiliser l’unité au singulier ou au pluriel lorsque vous définissez la valeur. Par exemple :

* {"pipelines.trigger.interval" : "1 hour"}
* {"pipelines.trigger.interval" : "10 seconds"}
* {"pipelines.trigger.interval" : "30 second"}
* {"pipelines.trigger.interval" : "1 minute"}
* {"pipelines.trigger.interval" : "10 minutes"}
* {"pipelines.trigger.interval" : "10 minute"}