Uw eerste structured streaming-workload uitvoeren

Dit artikel bevat codevoorbeelden en uitleg van basisconcepten die nodig zijn om uw eerste Structured Streaming-query's uit te voeren op Azure Databricks. U kunt Structured Streaming gebruiken voor werkbelastingen in bijna realtime en incrementele verwerking.

Structured Streaming is een van de verschillende technologieën die stroomstreamingtabellen in Delta Live Tables mogelijk maken. Databricks raadt het gebruik van Delta Live Tables aan voor alle nieuwe ETL-, opname- en Structured Streaming-workloads. Zie Wat is Delta Live Tables?

Notitie

Hoewel Delta Live Tables een enigszins gewijzigde syntaxis biedt voor het declareren van streamingtabellen, is de algemene syntaxis voor het configureren van streaming-lees- en transformaties van toepassing op alle streaming-use cases in Azure Databricks. Delta Live Tables vereenvoudigt het streamen ook door statusinformatie, metagegevens en talloze configuraties te beheren.

Lezen uit een gegevensstroom

U kunt Structured Streaming gebruiken om incrementeel gegevens op te nemen uit ondersteunde gegevensbronnen. Enkele van de meest voorkomende gegevensbronnen die worden gebruikt in Azure Databricks Structured Streaming-workloads zijn onder andere:

  • Gegevensbestanden in cloudobjectopslag
  • Berichtenbussen en wachtrijen
  • Delta Lake

Databricks raadt aan autolaadprogramma's te gebruiken voor streamingopname van cloudobjectopslag. Auto Loader ondersteunt de meeste bestandsindelingen die worden ondersteund door Structured Streaming. Zie Wat is automatisch laadprogramma?

Elke gegevensbron biedt een aantal opties om op te geven hoe batches met gegevens moeten worden geladen. Tijdens de configuratie van de lezer moeten de belangrijkste opties die u mogelijk moet instellen, worden onderverdeeld in de volgende categorieën:

  • Opties waarmee de gegevensbron of -indeling wordt opgegeven (bijvoorbeeld bestandstype, scheidingstekens en schema).
  • Opties voor het configureren van toegang tot bronsystemen (bijvoorbeeld poortinstellingen en referenties).
  • Opties die aangeven waar u in een stream moet beginnen (bijvoorbeeld Kafka-offsets of het lezen van alle bestaande bestanden).
  • Opties waarmee wordt bepaald hoeveel gegevens in elke batch worden verwerkt (bijvoorbeeld maximale offsets, bestanden of bytes per batch).

Automatisch laden gebruiken om streaminggegevens uit objectopslag te lezen

In het volgende voorbeeld ziet u hoe u JSON-gegevens laadt met Auto Loader, die wordt gebruikt cloudFiles om indeling en opties aan te geven. Met de schemaLocation optie kunt u schemadeductie en evolutie inschakelen. Plak de volgende code in een Databricks-notebookcel en voer de cel uit om een streaming DataFrame met de naam raw_dfte maken:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Net als bij andere leesbewerkingen in Azure Databricks worden bij het configureren van een streaming-leesbewerking geen gegevens geladen. U moet een actie activeren voor de gegevens voordat de stream begint.

Notitie

Als u een streaming DataFrame aanroept display() , wordt een streamingtaak gestart. Voor de meeste gebruiksscenario's voor gestructureerd streamen moet de actie die een stroom activeert gegevens naar een sink schrijven. Zie Uw gestructureerde streamingcode voorbereiden voor productie.

Een streamingtransformatie uitvoeren

Structured Streaming ondersteunt de meeste transformaties die beschikbaar zijn in Azure Databricks en Spark SQL. U kunt zelfs MLflow-modellen als UDF's laden en streamingvoorspellingen doen als transformatie.

In het volgende codevoorbeeld wordt een eenvoudige transformatie voltooid om de opgenomen JSON-gegevens te verrijken met aanvullende informatie met behulp van Spark SQL-functies:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Het resulterende transformed_df bevat query-instructies voor het laden en transformeren van elke record wanneer deze in de gegevensbron binnenkomt.

Notitie

Structured Streaming behandelt gegevensbronnen als niet-gebonden of oneindige gegevenssets. Daarom worden sommige transformaties niet ondersteund in Structured Streaming-workloads, omdat hiervoor een oneindig aantal items moet worden gesorteerd.

Voor de meeste aggregaties en veel joins is het beheren van statusgegevens met watermerken, vensters en uitvoermodus vereist. Zie Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren.

Schrijven naar een gegevenssink

Een gegevenssink is het doel van een streaming-schrijfbewerking. Veelvoorkomende sinks die worden gebruikt in streamingworkloads van Azure Databricks zijn onder andere:

  • Delta Lake
  • Berichtenbussen en wachtrijen
  • Sleutel-waardedatabases

Net als bij gegevensbronnen bieden de meeste gegevenssinks een aantal opties om te bepalen hoe gegevens naar het doelsysteem worden geschreven. Tijdens de configuratie van de schrijver moeten de belangrijkste opties die u mogelijk moet instellen, worden onderverdeeld in de volgende categorieën:

  • Uitvoermodus (standaard toevoegen).
  • Een controlepuntlocatie (vereist voor elke schrijver).
  • Triggerintervallen; zie Triggerintervallen voor gestructureerd streamen configureren.
  • Opties waarmee de gegevenssink of -indeling wordt opgegeven (bijvoorbeeld bestandstype, scheidingstekens en schema).
  • Opties voor het configureren van toegang tot doelsystemen (bijvoorbeeld poortinstellingen en referenties).

Incrementele batch-schrijfbewerkingen uitvoeren naar Delta Lake

In het volgende voorbeeld wordt naar Delta Lake geschreven met behulp van een opgegeven bestandspad en controlepunt.

Belangrijk

Zorg er altijd voor dat u een unieke controlepuntlocatie opgeeft voor elke streamingschrijver die u configureert. Het controlepunt biedt de unieke identiteit voor uw stream, waarbij alle verwerkte records en statusgegevens worden bijgehouden die zijn gekoppeld aan uw streamingquery.

Met availableNow de instelling voor de trigger wordt gestructureerd streamen geïnstrueerd om alle eerder niet-verwerkte records uit de brongegevensset te verwerken en vervolgens af te sluiten, zodat u de volgende code veilig kunt uitvoeren zonder dat u zich zorgen hoeft te maken over het verlaten van een stroom die wordt uitgevoerd:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In dit voorbeeld komen er geen nieuwe records binnen in de gegevensbron. Herhaal de uitvoering van deze code neemt dus geen nieuwe records op.

Waarschuwing

Uitvoering van gestructureerd streamen kan voorkomen dat automatische beëindiging rekenresources afsluit. Als u onverwachte kosten wilt voorkomen, moet u streamingquery's beëindigen.

Uw Structured Streaming-code voorbereiden voor productie

Databricks raadt het gebruik van Delta Live Tables aan voor de meeste structured streaming-workloads. De volgende aanbevelingen bieden een uitgangspunt voor het voorbereiden van Structured Streaming-workloads voor productie:

  • Verwijder overbodige code uit notebooks die resultaten opleveren, zoals display en count.
  • Voer geen Structured Streaming-workloads uit op interactieve clusters; plan altijd streams als taken.
  • Als u wilt helpen bij het automatisch herstellen van streamingtaken, configureert u taken met oneindige nieuwe pogingen.
  • Gebruik automatisch schalen niet voor workloads met Structured Streaming.

Zie Productieoverwegingen voor Gestructureerd streamen voor meer aanbevelingen.

Gegevens lezen uit Delta Lake, transformeren en schrijven naar Delta Lake

Delta Lake biedt uitgebreide ondersteuning voor het werken met Structured Streaming als bron en een sink. Zie lees- en schrijfbewerkingen voor Delta-tabellen.

In het volgende voorbeeld ziet u een voorbeeldsyntaxis om alle nieuwe records uit een Delta-tabel incrementeel te laden, deze samen te voegen met een momentopname van een andere Delta-tabel en deze naar een Delta-tabel te schrijven:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

U moet over de juiste machtigingen beschikken voor het lezen van brontabellen en schrijven naar doeltabellen en de opgegeven controlepuntlocatie. Vul alle parameters in die worden aangeduid met punthaken (<>) met behulp van de relevante waarden voor uw gegevensbronnen en sinks.

Notitie

Delta Live Tables biedt een volledig declaratieve syntaxis voor het maken van Delta Lake-pijplijnen en beheert automatisch eigenschappen zoals triggers en controlepunten. Zie Wat is Delta Live Tables?

Gegevens lezen uit Kafka, transformeren en schrijven naar Kafka

Apache Kafka en andere berichtenbussen bieden een aantal van de laagste latentie die beschikbaar is voor grote gegevenssets. U kunt Azure Databricks gebruiken om transformaties toe te passen op gegevens die zijn opgenomen vanuit Kafka en vervolgens gegevens terug te schrijven naar Kafka.

Notitie

Door gegevens naar cloudobjectopslag te schrijven, wordt extra latentieoverhead toegevoegd. Als u gegevens wilt opslaan uit een berichtenbus in Delta Lake, maar de laagste latentie nodig hebt voor streamingworkloads, raadt Databricks u aan afzonderlijke streamingtaken te configureren om gegevens op te nemen in lakehouse en bijna realtime transformaties toe te passen voor downstream-berichtenbus-sinks.

In het volgende codevoorbeeld ziet u een eenvoudig patroon om gegevens uit Kafka te verrijken door deze samen te voegen met gegevens in een Delta-tabel en vervolgens terug te schrijven naar Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

U moet over de juiste machtigingen beschikken voor toegang tot uw Kafka-service. Vul alle parameters in die worden aangeduid met punthaken (<>) met behulp van de relevante waarden voor uw gegevensbronnen en sinks. Zie Stream-verwerking met Apache Kafka en Azure Databricks.