Share via


Transformer des données avec Delta Live Tables

Cet article explique comment vous pouvez utiliser Delta Live Tables pour déclarer des transformations sur des jeux de données, et spécifier la façon dont les enregistrements sont traités via la logique de requête. Il contient également quelques exemples de modèles de transformation courants qui peuvent être utiles pour créer des pipelines Delta Live Tables.

Vous pouvez définir un jeu de données pour n’importe quelle requête retournant un DataFrame. Vous pouvez utiliser des opérations intégrées Apache Spark, des fonctions définies par l’utilisateur, une logique personnalisée et les modèles MLflow en tant que transformations dans votre pipeline Delta Live Tables. Une fois les données ingérées dans votre pipeline Delta Live Tables, vous pouvez définir de nouveaux jeux de données sur des sources amont pour créer des tables de streaming, des vues matérialisées et des vues.

Pour savoir comment effectuer efficacement un traitement avec état avec Delta Live Tables, consultez Optimiser le traitement avec état dans Delta Live Tables avec des filigranes.

Quand utiliser les vues, les vues matérialisées et les tables de streaming

Pour garantir l’efficacité et la maintenance de vos pipelines, choisissez le meilleur type de jeu de données quand vous implémentez vos requêtes de pipeline.

Envisagez d’utiliser une vue dans les cas suivants :

  • Vous avez une requête volumineuse ou complexe que vous souhaitez décomposer en requêtes plus faciles à gérer.
  • Vous souhaitez valider les résultats intermédiaires à l’aide des attentes.
  • Vous souhaitez réduire les coûts de stockage et de calcul sans nécessiter la matérialisation des résultats de la requête. Étant donné que les tables sont matérialisées, elles nécessitent des ressources de calcul et de stockage supplémentaires.

Utilisez une vue matérialisée dans les cas suivants :

  • Plusieurs requêtes en aval consomment la table. Étant donné que les vues sont calculées à la demande, la vue est calculée à nouveau chaque fois que la vue est interrogée.
  • D’autres pipelines, travaux ou requêtes consomment la table. Étant donné que les vues ne sont pas matérialisées, vous ne pouvez les utiliser que dans le même pipeline.
  • Vous souhaitez afficher les résultats d’une requête pendant le développement. Étant donné que les tables sont matérialisées et peuvent être consultées et interrogées en dehors du pipeline, l’utilisation de tables pendant le développement peut vous aider à valider l’exactitude des calculs. Après la validation, convertissez les requêtes qui ne nécessitent pas de matérialisation en vues.

Utilisez une table de streaming dans les cas suivants :

  • Une requête est définie sur une source de données qui augmente en continu ou de manière incrémentielle.
  • Les résultats de requête doivent être calculés de manière incrémentielle.
  • Un débit élevé et une faible latence sont souhaités pour le pipeline.

Remarque

Les tables de streaming sont toujours définies sur des sources de streaming. Vous pouvez également utiliser des sources de streaming avec APPLY CHANGES INTO pour appliquer des mises à jour à partir de flux CDC. Consultez API APPLY CHANGES : Simplifier la capture des changements de données dans Delta Live Tables.

Combiner des tables de streaming et des vues matérialisées dans un même pipeline

Les tables de streaming héritent des garanties de traitement d’Apache Spark Structured Streaming, et sont configurées pour traiter les requêtes provenant de sources de données avec ajout uniquement, où les nouvelles lignes sont toujours insérées dans la table source au lieu d’être modifiées.

Remarque

Bien que, par défaut, les tables de diffusion en continu nécessitent des sources de données en ajout uniquement, quand une source de diffusion en continu est une autre table de diffusion en continu qui nécessite des mises à jour ou des suppressions, vous pouvez remplacer ce comportement en utilisant l’indicateur skipChangeCommits.

Un modèle de streaming courant comprend l’ingestion de données sources pour créer les jeux de données initiaux dans un pipeline. Ces jeux de données initiaux sont couramment appelés tables bronze et effectuent souvent des transformations simples.

En revanche, les tables finales d’un pipeline, couramment appelées tables or, nécessitent souvent des agrégations complexes, ou la lecture de sources qui représentent les cibles d’une opération APPLY CHANGES INTO. Dans la mesure où ces opérations créent par nature des mises à jour plutôt que des ajouts, elles ne sont pas prises en charge en tant qu’entrées dans les tables de streaming. Ces transformations sont plus adaptées aux vues matérialisées.

En combinant les tables de streaming et les vues matérialisées dans un seul pipeline, vous pouvez simplifier votre pipeline, éviter une réingestion ou un retraitement coûteux des données brutes, et disposer de toute la puissance du SQL pour calculer des agrégations complexes sur un jeu de données codé et filtré efficacement. L’exemple suivant illustre ce type de traitement mixte :

Remarque

Ces exemples utilisent Auto Loader pour charger des fichiers à partir du stockage cloud. Si vous souhaitez charger des fichiers avec Auto Loader dans un pipeline pour lequel Unity Catalog est activé, vous devez utiliser des emplacements externes. Pour en savoir plus sur l’utilisation de Unity Catalog avec Delta Live Tables, consultez l’article Utiliser Unity Catalog avec vos pipelines Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

En savoir plus sur l’utilisation d’Auto Loader pour lire efficacement des fichiers JSON à partir du stockage Azure pour un traitement incrémentiel.

Jointures statiques de flux

Les jointures statiques de flux sont un bon choix pour la dénormalisation d’un flux continu de données avec ajout uniquement sur une table de dimension principalement statique.

À chaque mise à jour du pipeline, les nouveaux enregistrements du flux sont joints à la capture instantanée la plus récente de la table statique. Si des enregistrements sont ajoutés ou mis à jour dans la table statique après le traitement des données correspondantes de la table de streaming, les enregistrements résultants ne sont pas recalculés, sauf si une actualisation complète est effectuée.

Dans les pipelines configurés pour l’exécution déclenchée, la table statique retourne les résultats au moment du démarrage de la mise à jour. Dans les pipelines configurés pour une exécution continue, chaque fois que la table traite une mise à jour, la version la plus récente de la table statique est interrogée.

Voici un exemple de jointure statique de flux :

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Calculer efficacement les agrégats

Vous pouvez utiliser des tables de streaming pour calculer de manière incrémentielle des agrégats distributifs simples, par exemple le nombre, la valeur minimale, la valeur maximale ou la somme, et des agrégats algébriques tels que la moyenne ou l’écart type. Databricks recommande une agrégation incrémentielle pour les requêtes comportant un nombre limité de groupes, par exemple, une requête avec une clause GROUP BY country. Seules les nouvelles données d’entrée sont lues à chaque mise à jour.

Pour en savoir plus sur l’écriture de requêtes Delta Live Tables qui effectuent des agrégations incrémentielles, consultez Effectuer des agrégations fenêtrées avec des filigranes.

Utiliser des modèles MLflow dans un pipeline de tables dynamiques Delta

Remarque

Pour utiliser les modèles MLflow dans un pipeline compatible avec Unity Catalog, celui-ci doit être configuré pour utiliser le canal preview. Pour utiliser le canal current, votre pipeline doit être configuré pour publier dans le Metastore Hive.

Vous pouvez utiliser des modèles entraînés par MLflow dans des pipelines Delta Live Tables. Les modèles MLflow sont traités comme des transformations dans Azure Databricks, ce qui signifie qu’ils agissent sur une entrée de DataFrame Spark, et qu’ils retournent des résultats sous la forme d’un DataFrame Spark. Dans la mesure où Delta Live Tables définit des jeux de données par rapport à des DataFrames, vous pouvez convertir les charges de travail Apache Spark qui tirent parti de MLflow en Delta Live Tables en quelques lignes de code. Consultez Gestion de cycle de vie ML en utilisant MLflow pour obtenir plus d’informations sur MLflow.

Si vous disposez déjà d’un notebook Python appelant un modèle MLflow, vous pouvez adapter ce code à Delta Live Tables en utilisant l’élément décoratif @dlt.table et en vérifiant que les fonctions sont définies pour retourner les résultats de la transformation. Delta Live Tables n’installe pas MLflow par défaut. Veillez donc à exécuter %pip install mlflow et à importer mlflow et dlt en haut de votre notebook. Pour découvrir une présentation de la syntaxe Delta Live Tables, consultez Exemple : ingérer et traiter des bases de données de noms de bébés à New York.

Pour utiliser des modèles MLflow dans Delta Live Tables, suivez les étapes ci-dessous :

  1. Obtenez l’ID d’exécution et le nom de modèle du modèle MLflow. L’ID d’exécution et le nom de modèle sont utilisés pour construire l’URI du modèle MLflow.
  2. Utilisez l’URI pour définir une FDU Spark afin de charger le modèle MLflow.
  3. Appelez l’UDF dans vos définitions de table pour utiliser le modèle MLflow.

L’exemple suivant illustre la syntaxe de base de ce modèle :

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Voici un exemple complet où le code suivant comporte une fonction Spark définie par l’utilisateur, nommée loaded_model_udf, qui charge un modèle MLflow entraîné sur des données relatives à des risques de prêt. Les colonnes de données utilisées pour effectuer la prédiction sont passées en tant qu’argument à la fonction définie par l’utilisateur. La table loan_risk_predictions calcule les prédictions pour chaque ligne de loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Conserver les suppressions ou mises à jour manuelles

Delta Live Tables vous permet de supprimer ou de mettre à jour manuellement les enregistrements d’une table, et d’effectuer une opération d’actualisation pour recalculer les tables en aval.

Par défaut, Delta Live Tables recalcule les résultats des tables en fonction des données d’entrée chaque fois qu’un pipeline est mis à jour. Vous devez donc vérifier que l’enregistrement supprimé n’est pas rechargé à partir des données sources. L’affectation de la valeur false à la propriété de table pipelines.reset.allowed empêche les actualisations d’une table, mais elle n’empêche pas les écritures incrémentielles dans les tables ni la circulation de nouvelles données dans la table.

Le diagramme suivant illustre un exemple utilisant deux tables de streaming :

  • raw_user_table ingère des données utilisateur brutes à partir d’une source.
  • bmi_table calcule de manière incrémentielle les scores de IMC à l’aide du poids et de la hauteur de raw_user_table .

Vous souhaitez supprimer ou mettre à jour manuellement les enregistrements utilisateur de raw_user_table, et recalculer le bmi_table.

Diagramme de conservation des données

Le code suivant illustre l’affectation de la valeur false à la propriété de table pipelines.reset.allowed pour désactiver l’actualisation complète de raw_user_table. Ainsi, les changements prévus sont conservés au fil du temps, mais les tables en aval sont recalculées quand une mise à jour du pipeline est exécutée :

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);