Tutoriel sur Structured Streaming

Les capteurs, les appareils IoT, les réseaux sociaux et les transactions en ligne génèrent tous des données qui ont besoin d’être supervisées en permanence et traitées rapidement. Par conséquent, le besoin d’un traitement des flux en temps réel à grande échelle n’est jamais apparu aussi évident que maintenant. Ce module de tutoriel présente Structured Streaming, le principal modèle pour gérer des jeux de données de streaming dans Apache Spark. Dans Structured Streaming, un flux de données est traité comme une table faisant l’objet d’ajouts constants. Cela conduit à un modèle de traitement de flux très similaire à un modèle de traitement par lots. Vous exprimez votre calcul de streaming sous forme de requête standard de type traitement par lots comme sur une table statique, mais Spark l’exécute en tant que requête incrémentielle sur la table d’entrée non liée.

Structured Streaming workflow

Considérez le flux de données d’entrée comme la table d’entrée. Chaque élément de données qui arrive sur le flux est semblable à une nouvelle ligne ajoutée à la table d’entrée.

Structured Streaming model

Une requête sur l’entrée génère une table de résultats. À chaque intervalle de déclencheur (par exemple, toutes les secondes), de nouvelles lignes sont ajoutées à la table d’entrée, ce qui finit par mettre à jour la table de résultats. Chaque fois que la table de résultats est mise à jour, les lignes de résultats modifiées sont écrites dans un récepteur externe. La sortie se définit comme ce qui est écrit dans le stockage externe. La sortie peut être configurée dans différents modes :

  • Mode complet : La totalité de la table de résultats mise à jour est écrite dans le stockage externe. Il appartient au connecteur de stockage de déterminer comment gérer l’écriture de la table entière.
  • Mode ajout : Seules les nouvelles lignes ajoutées à la table de résultats depuis le dernier déclencheur sont écrites dans le stockage externe. Ceci s’applique uniquement aux requêtes dans lesquelles les lignes existantes dans la table de résultats ne sont pas censées changer.
  • Mode mise à jour : Seules les lignes qui ont été mises à jour dans la table de résultats depuis le dernier déclencheur sont écrites dans le stockage externe. Cela est différent du mode complet dans ce mode de mise à jour qui génère uniquement les lignes qui ont été modifiées depuis le dernier déclencheur. Si la requête ne contient pas d’agrégations, elle équivaut au mode ajout.

Dans ce module de tutoriel, vous allez apprendre à :

Nous fournissons aussi un exemple de notebook que vous pouvez importer pour accéder à tous les exemples de code du module en vue de les exécuter.

Charger des exemples de données

Le moyen le plus simple de démarrer avec Structured Streaming consiste à utiliser un exemple de jeu de données Azure Databricks disponible dans le dossier /databricks-datasets accessible dans l’espace de travail Azure Databricks. Azure Databricks contient des exemples de données d’événement, sous forme de fichiers dans /databricks-datasets/structured-streaming/events/, à utiliser pour générer une application Structured Streaming. Intéressons-nous au contenu de ce répertoire.

Event dataset

Chaque ligne du fichier contient un enregistrement JSON avec deux champs : time et action.

{"time":1469501675,"action":"Open"}
{"time":1469501678,"action":"Close"}
{"time":1469501680,"action":"Open"}
{"time":1469501685,"action":"Open"}
{"time":1469501686,"action":"Open"}
{"time":1469501689,"action":"Open"}
{"time":1469501691,"action":"Open"}
{"time":1469501694,"action":"Open"}
{"time":1469501696,"action":"Close"}
{"time":1469501702,"action":"Open"}
{"time":1469501703,"action":"Open"}
{"time":1469501704,"action":"Open"}

Initialiser le flux

Étant donné que les exemples de données sont juste un ensemble statique de fichiers, vous pouvez émuler un flux à partir d’eux en lisant un fichier à la fois, dans l’ordre chronologique de leur création.

from pyspark.sql.types import *
from pyspark.sql.functions import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)

Démarrer le travail de streaming

Pour démarrer un calcul de streaming, vous définissez un récepteur et vous le démarrez. Dans notre cas, pour interroger les compteurs de manière interactive, définissez le jeu complet de compteurs d’1 heure dans une table en mémoire.

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

query est un handle de la requête de streaming nommée counts qui s’exécute en arrière-plan. Cette requête sélectionne des fichiers en continu et met à jour les compteurs fenêtrés.

La fenêtre de commande indique l’état du flux :

Stream status

Quand vous développez counts, vous recevez un tableau de bord du nombre d’enregistrements traités, des statistiques de traitement par lot et de l’état de l’agrégation :

Stream dashboard

Interroger le flux de manière interactive

Nous pouvons interroger régulièrement l’agrégation counts :

%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

Comme vous pouvez le voir dans cette série de captures d’écran, la requête change chaque fois que vous l’exécutez pour refléter le nombre d’actions en fonction du flux d’entrée des données.

Stream update 1

Stream update 2

Stream update 3

Notebook

Pour accéder à ces exemples de code et à d’autres, importez le notebook suivant. Pour obtenir d’autres exemples de Structured Streaming, consultez Structured Streaming.

Notebook Python Structured Streaming Apache Spark

Obtenir le notebook