Zelfstudie voor gestructureerd streamen

Sensoren, IoT-apparaten, sociale netwerken en onlinetransacties genereren allemaal gegevens die voortdurend moeten worden bewaakt en waarop snel actie moet worden onder ondernemen. Als gevolg hiervan is de behoefte aan grootschalige, realtime stroomverwerking duidelijker dan ooit tevoren. In deze zelfstudiemodule maakt u kennis met Structured Streaming, het belangrijkste model voor het verwerken van streaming-gegevenssets in Apache Spark. In Structured Streaming wordt een gegevensstroom behandeld als een tabel die continu wordt toegevoegd. Dit leidt tot een stroomverwerkingsmodel dat vergelijkbaar is met een batchverwerkingsmodel. U drukt uw streamingberekening uit als een standaard batch-achtige query zoals in een statische tabel, maar Spark voert deze uit als een incrementele query op de niet-gebonden invoertabel.

Structured Streaming-werkstroom

U kunt de invoergegevensstroom beschouwen als de invoertabel. Elk gegevensitem dat op de stroom binnenkomt, is als een nieuwe rij die wordt toegevoegd aan de invoertabel.

Gestructureerd streamingmodel

Een query op de invoer genereert een resultaattabel. Bij elk triggerinterval (bijvoorbeeld elke 1 seconde) worden nieuwe rijen toegevoegd aan de invoertabel, waardoor de resultaattabel uiteindelijk wordt bijgewerkt. Wanneer de resultaattabel wordt bijgewerkt, worden de gewijzigde resultaatrijen naar een externe sink geschreven. De uitvoer wordt gedefinieerd als wat naar de externe opslag wordt geschreven. De uitvoer kan in verschillende modi worden geconfigureerd:

  • Volledige modus:de volledige bijgewerkte resultaattabel wordt naar de externe opslag geschreven. Het is aan de opslagconnector om te bepalen hoe het schrijven van de hele tabel moet worden verwerkt.
  • Modus toevoegen: alleennieuwe rijen die zijn toegevoegd aan de resultaattabel sinds de laatste trigger, worden naar de externe opslag geschreven. Dit is alleen van toepassing op de query's waarbij de bestaande rijen in de resultaattabel naar verwachting niet zullen veranderen.
  • Updatemodus:alleen de rijen die zijn bijgewerkt in de resultaattabel sinds de laatste trigger, worden naar externe opslag geschreven. Dit wijs af van de modus Volledig in die updatemodus die alleen de rijen uitvoert die sinds de laatste trigger zijn gewijzigd. Als de query geen aggregaties bevat, is deze gelijk aan de modus Toevoegen.

In deze zelfstudiemodule leert u het volgende:

We bieden ook een voorbeeldnotenoteboek dat u kunt importeren om toegang te krijgen tot alle codevoorbeelden die in de module zijn opgenomen en uit te voeren.

Voorbeeldgegevens laden

De eenvoudigste manier om aan de slag te gaan met Structured Streaming is het gebruik van een voorbeeld van Azure Databricks gegevensset die beschikbaar is in de map die toegankelijk is in /databricks-datasets Azure Databricks werkruimte. Azure Databricks voorbeeldgebeurtenisgegevens als bestanden in om te gebruiken /databricks-datasets/structured-streaming/events/ voor het bouwen van een structured streaming-toepassing. Laten we eens kijken naar de inhoud van deze map.

Gebeurtenisset

Elke regel in het bestand bevat een JSON-record met twee velden: time en action .

{"time":1469501675,"action":"Open"}
{"time":1469501678,"action":"Close"}
{"time":1469501680,"action":"Open"}
{"time":1469501685,"action":"Open"}
{"time":1469501686,"action":"Open"}
{"time":1469501689,"action":"Open"}
{"time":1469501691,"action":"Open"}
{"time":1469501694,"action":"Open"}
{"time":1469501696,"action":"Close"}
{"time":1469501702,"action":"Open"}
{"time":1469501703,"action":"Open"}
{"time":1469501704,"action":"Open"}

De stream initialiseren

Omdat de voorbeeldgegevens slechts een statische set bestanden zijn, kunt u er een stroom van emuleren door één bestand per keer te lezen, in chronologische volgorde waarin ze zijn gemaakt.

from pyspark.sql.types import *
from pyspark.sql.functions import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)

De streaming-taak starten

U start een streamingberekening door een sink te definiëren en te starten. Als u in ons geval interactief query's wilt uitvoeren op de tellingen, stelt u de volledige set van 1 uur counts in op een in-memory tabel.

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

query is een greep naar de streamingquery met counts de naam die op de achtergrond wordt uitgevoerd. Met deze query worden continu bestanden opgehaald en worden de tellingen in vensters bijgewerkt.

In het opdrachtvenster wordt de status van de stream weergegeven:

Streamstatus

Wanneer u uitbreidt, krijgt u een dashboard met het aantal verwerkte records, batchstatistieken en de counts status van de aggregatie:

Stream-dashboard

Interactief query's uitvoeren op de stroom

We kunnen periodiek een query uitvoeren op counts de aggregatie:

%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

Zoals u in deze reeks schermafbeeldingen kunt zien, verandert de query telkens wanneer u deze uitvoert, zodat het aantal acties wordt weergegeven op basis van de invoerstroom van gegevens.

Stream-update 1

Stream-update 2

Stream-update 3

Notebook

Importeer het volgende notebook om toegang te krijgen tot deze codevoorbeelden en meer. Zie Structured Streaming voor meer structured streaming-voorbeelden.

Apache Spark Structured Streaming Python-notebook

Notebook downloaden