Utiliser Delta Lake pour le streaming des données

Effectué

Toutes les données que nous avons explorées à ce stade étaient des données statiques dans des fichiers. Toutefois, de nombreux scénarios d’analytique des données impliquent des données de diffusion en continu qui doivent être traitées en temps quasi réel. Par exemple, vous devrez peut-être capturer les lectures émises par les appareils IoT (Internet des objets) et les stocker dans une table à mesure qu’elles se produisent.

Spark Structured Streaming

Une solution de traitement de flux classique implique la lecture constante d’un flux de données à partir d’une source, le traitement facultatif pour sélectionner des champs spécifiques, l’agrégation et le regroupement de valeurs, ou la manipulation des données, et l’écriture des résultats dans un récepteur.

Spark inclut la prise en charge native des données de diffusion en continu via Spark Structured Streaming, une API basée sur un dataframe sans limite dans lequel les données de diffusion en continu sont capturées pour traitement. Un dataframe Spark Structured Streaming peut lire des données à partir de nombreux types de sources de diffusion en continu, notamment les ports réseau, les services de répartiteur de messages en temps réel comme Azure Event Hubs ou Kafka, ou les emplacements du système de fichiers.

Conseil

Pour plus d’informations sur Spark Structured Streaming, consultez le Guide de programmation de streaming structuré de la documentation Spark.

Diffusion en continu avec des tables Delta Lake

Vous pouvez utiliser une table Delta Lake comme source ou récepteur pour Spark Structured Streaming. Par exemple, vous pouvez capturer un flux de données en temps réel à partir d’un appareil IoT et écrire le flux directement dans une table Delta Lake en tant que récepteur, ce qui vous permet d’interroger la table pour afficher les données diffusées les plus récentes. Vous pouvez également lire une table delta en tant que source de diffusion en continu, ce qui vous permet de signaler constamment les nouvelles données à mesure qu’elles sont ajoutées à la table.

Utilisation d’une table Delta Lake comme source de diffusion en continu

Dans l’exemple PySpark suivant, une table Delta Lake est utilisée pour stocker les détails des commandes commerciales sur Internet. Un flux est créé et lit les données du dossier de la table Delta Lake à mesure que de nouvelles données sont ajoutées.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.show()

Notes

Lorsque vous utilisez une table Delta Lake comme source de diffusion en continu, seules des opérations d’ajout peuvent être incluses dans le flux. Les modifications de données provoquent une erreur, sauf si vous spécifiez l’option ignoreChanges ou ignoreDeletes.

Après avoir lu les données de la table Delta Lake dans un dataframe de diffusion en continu, vous pouvez utiliser l’API Spark Structured Streaming pour les traiter. Dans l’exemple ci-dessus, le dataframe est simplement affiché ; mais vous pouvez utiliser Spark Structured Streaming pour agréger les données sur des fenêtres temporelles (par exemple, pour compter le nombre de commandes passées toutes les minutes) et envoyer les résultats agrégés à un processus en aval pour une visualisation en temps quasi réel.

Utilisation d’une table Delta Lake comme récepteur de diffusion en continu

Dans l’exemple PySpark suivant, un flux de données est lu à partir de fichiers JSON dans un dossier. Les données JSON de chaque fichier contiennent l’état d’un appareil IoT au format {"device":"Dev1","status":"ok"}. Les nouvelles données ajoutées au flux chaque fois qu’un fichier est ajouté au dossier. Le flux d’entrée est un dataframe sans limite, qui est ensuite écrit au format delta dans un emplacement de dossier pour une table Delta Lake.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
streamFolder = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Notes

L’option checkpointLocation est utilisée pour écrire un fichier de point de contrôle qui suit l’état du traitement de flux. Ce fichier vous permet de récupérer à partir d’une défaillance au point où le traitement de flux a été arrêté.

Une fois le processus de diffusion en continu démarré, vous pouvez interroger la table Delta Lake dans laquelle la sortie de diffusion en continu est écrite pour afficher les données les plus récentes. Par exemple, le code suivant crée une table de catalogue pour le dossier de table Delta Lake et l’interroge :

%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Pour arrêter le flux de données en cours d’écriture dans la table Delta Lake, vous pouvez utiliser la méthode stop de la requête de diffusion en continu :

delta_stream.stop()

Conseil

Pour plus d’informations sur l’utilisation des tables Delta Lake pour la diffusion en continu des données, consultez Lectures et écritures de diffusion en continu de tables dans la documentation Delta Lake.