Algemene patronen voor het laden van gegevens

Automatisch laden vereenvoudigt een aantal algemene gegevensopnametaken. Deze snelzoekgids bevat voorbeelden voor verschillende populaire patronen.

Mappen of bestanden filteren met globpatronen

Glob-patronen kunnen worden gebruikt voor het filteren van mappen en bestanden wanneer deze in het pad worden opgegeven.

Patroon Omschrijving
? Komt overeen met één teken
* Komt overeen met nul of meer tekens
[abc] Komt overeen met één teken uit de tekenset {a,b,c}.
[a-z] Komt overeen met één teken uit het tekenbereik {a... z}.
[^a] Komt overeen met één teken dat niet afkomstig is uit de tekenset of het bereik {a}. Houd er rekening mee dat het ^ teken direct rechts van de haak openen moet plaatsvinden.
{ab,cd} Komt overeen met een tekenreeks uit de reeks {ab, cd}.
{ab,c{de, fh}} Komt overeen met een tekenreeks uit de reeks {ab, cde, cfh}.

Gebruik het path voorvoegselpatronen, bijvoorbeeld:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Belangrijk

U moet de optie pathGlobFilter gebruiken om expliciet achtervoegselpatronen op te geven. Het path enige filter bevat een voorvoegselfilter.

Als u bijvoorbeeld alleen png bestanden wilt parseren in een map die bestanden met verschillende achtervoegsels bevat, kunt u het volgende doen:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Notitie

Het standaardgedrag van automatisch laden verschilt van het standaardgedrag van andere Spark-bestandsbronnen. Voeg .option("cloudFiles.useStrictGlobber", "true") toe aan uw leesbewerking om globbing te gebruiken die overeenkomt met het standaardgedrag van Spark voor bestandsbronnen. Zie de volgende tabel voor meer informatie over globbing:

Patroon Bestandspad Standaard globber Strikte globber
/a/b /a/b/c/file.txt Ja Ja
/a/b /a/b_dir/c/file.txt Nee Nee
/a/b /a/b.txt Nee Nee
/a/b/ /a/b.txt Nee Nee
/a/*/c/ /a/b/c/file.txt Ja Ja
/a/*/c/ /a/b/c/d/file.txt Ja Ja
/a/*/c/ /a/b/x/y/c/file.txt Ja Nee
/a/*/c /a/b/c_file.txt Ja Nee
/a/*/c/ /a/b/c_file.txt Ja Nee
/a/*/c/ /a/*/cookie/file.txt Ja Nee
/a/b* /a/b.txt Ja Ja
/a/b* /a/b/file.txt Ja Ja
/a/{0.txt,1.txt} /a/0.txt Ja Ja
/a/*/{0.txt,1.txt} /a/0.txt Nee Nee
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Ja Ja

Eenvoudige ETL inschakelen

Een eenvoudige manier om uw gegevens in Delta Lake te krijgen zonder gegevens te verliezen, is door het volgende patroon te gebruiken en schemadeductie in te schakelen met Auto Loader. Databricks raadt aan de volgende code uit te voeren in een Azure Databricks-taak om uw stream automatisch opnieuw op te starten wanneer het schema van uw brongegevens wordt gewijzigd. Het schema wordt standaard afgeleid als tekenreekstypen, eventuele parseringsfouten (er moeten geen fouten zijn als alles als een tekenreeks blijft) en _rescued_dataeventuele nieuwe kolommen mislukken de stroom en ontwikkelen het schema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Gegevensverlies in goed gestructureerde gegevens voorkomen

Wanneer u uw schema kent, maar wilt weten wanneer u onverwachte gegevens ontvangt, raadt Databricks het gebruik aan rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Als u wilt dat uw stream stopt met verwerken als er een nieuw veld wordt geïntroduceerd dat niet overeenkomt met uw schema, kunt u het volgende toevoegen:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Flexibele semi-gestructureerde gegevenspijplijnen inschakelen

Wanneer u gegevens ontvangt van een leverancier die nieuwe kolommen introduceert voor de informatie die ze verstrekken, bent u mogelijk niet precies op de hoogte wanneer ze dit doen, of hebt u mogelijk niet de bandbreedte om uw gegevenspijplijn bij te werken. U kunt nu gebruikmaken van de ontwikkeling van schema's om de stroom opnieuw op te starten en automatisch het uitgestelde schema bij te werken. U kunt ook gebruikmaken schemaHints van sommige velden zonder schema die de leverancier mogelijk levert.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Geneste JSON-gegevens transformeren

Omdat automatisch laden de JSON-kolommen op het hoogste niveau afbakent als tekenreeksen, kunt u overblijven met geneste JSON-objecten waarvoor verdere transformaties nodig zijn. U kunt de semi-gestructureerde API's voor gegevenstoegang gebruiken om complexe JSON-inhoud verder te transformeren.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Geneste JSON-gegevens afleiden

Wanneer u geneste gegevens hebt, kunt u de cloudFiles.inferColumnTypes optie gebruiken om de geneste structuur van uw gegevens en andere kolomtypen af te stellen.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

CSV-bestanden laden zonder kopteksten

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Een schema afdwingen voor CSV-bestanden met headers

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Afbeelding of binaire gegevens opnemen in Delta Lake voor ML

Zodra de gegevens zijn opgeslagen in Delta Lake, kunt u gedistribueerde deductie uitvoeren op de gegevens. Zie Gedistribueerde deductie uitvoeren met pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Syntaxis van automatisch laadprogramma voor DLT

Delta Live Tables biedt enigszins gewijzigde Python-syntaxis voor Automatisch laden voegt SQL-ondersteuning toe voor Auto Loader.

In de volgende voorbeelden wordt Auto Loader gebruikt om gegevenssets te maken op basis van CSV- en JSON-bestanden:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

U kunt ondersteunde indelingsopties gebruiken met autolaadprogramma's. Met behulp van de map() functie kunt u opties doorgeven aan de cloud_files() methode. Opties zijn sleutel-waardeparen, waarbij de sleutels en waarden tekenreeksen zijn. Hier volgt een beschrijving van de syntaxis voor het werken met automatisch laden in SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

In het volgende voorbeeld worden gegevens uit door tabs gescheiden CSV-bestanden met een koptekst gelezen:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

U kunt de schema indeling handmatig opgeven. U moet de schema indeling opgeven voor indelingen die geen schemadeductie ondersteunen:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Notitie

Delta Live Tables configureert en beheert automatisch het schema en de controlepuntmappen wanneer u automatisch laden gebruikt om bestanden te lezen. Als u een van deze mappen echter handmatig configureert, heeft het uitvoeren van een volledige vernieuwing geen invloed op de inhoud van de geconfigureerde mappen. Databricks raadt aan de automatisch geconfigureerde directory's te gebruiken om onverwachte bijwerkingen tijdens de verwerking te voorkomen.