Vanliga datainläsningsmönster

Automatisk inläsning förenklar ett antal vanliga datainmatningsuppgifter. Den här snabbreferensen innehåller exempel på flera populära mönster.

Filtrera kataloger eller filer med hjälp av globmönster

Globmönster kan användas för att filtrera kataloger och filer när de anges i sökvägen.

Mönster Description
? Matchar ett enskilt tecken
* Matchar noll eller fler tecken
[abc] Matchar ett enskilt tecken från teckenuppsättningen {a,b,c}.
[a-z] Matchar ett enskilt tecken från teckenområdet {a... z}.
[^a] Matchar ett enskilt tecken som inte kommer från teckenuppsättningen eller intervallet {a}. Observera att ^ tecknet måste ske omedelbart till höger om den inledande hakparentesen.
{ab,cd} Matchar en sträng från stränguppsättningen {ab, cd}.
{ab,c{de, fh}} Matchar en sträng från stränguppsättningen {ab, cde, cfh}.

path Använd för att tillhandahålla prefixmönster, till exempel:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Viktigt!

Du måste använda alternativet pathGlobFilter för att uttryckligen tillhandahålla suffixmönster. Det path enda ger ett prefixfilter.

Om du till exempel bara png vill parsa filer i en katalog som innehåller filer med olika suffix kan du göra följande:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Kommentar

Standardbeteendet för den automatiska inläsaren skiljer sig från standardbeteendet för andra Spark-filkällor. Lägg till .option("cloudFiles.useStrictGlobber", "true") i läsningen om du vill använda globbning som matchar standardbeteendet för Spark mot filkällor. Mer information om globbning finns i följande tabell:

Mönster Filsökväg Standardglobber Strikt klotber
/a/b /a/b/c/file.txt Ja Ja
/a/b /a/b_dir/c/file.txt Nej Nej
/a/b /a/b.txt Nej Nej
/a/b/ /a/b.txt Nej Nej
/a/*/c/ /a/b/c/file.txt Ja Ja
/a/*/c/ /a/b/c/d/file.txt Ja Ja
/a/*/c/ /a/b/x/y/c/file.txt Ja Nej
/a/*/c /a/b/c_file.txt Ja Nej
/a/*/c/ /a/b/c_file.txt Ja Nej
/a/*/c/ /a/*/cookie/file.txt Ja Nej
/a/b* /a/b.txt Ja Ja
/a/b* /a/b/file.txt Ja Ja
/a/{0.txt,1.txt} /a/0.txt Ja Ja
/a/*/{0.txt,1.txt} /a/0.txt Nej Nej
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Ja Ja

Aktivera enkel ETL

Ett enkelt sätt att hämta dina data till Delta Lake utan att förlora några data är att använda följande mönster och aktivera schemainferens med Auto Loader. Databricks rekommenderar att du kör följande kod i ett Azure Databricks-jobb så att den automatiskt startar om dataströmmen när schemat för dina källdata ändras. Som standard härleds schemat som strängtyper, eventuella parsningsfel (det bör inte finnas något om allt förblir som en sträng) kommer att gå till _rescued_data, och alla nya kolumner kommer att misslyckas strömmen och utveckla schemat.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Förhindra dataförlust i välstrukturerade data

När du känner till ditt schema, men vill veta när du får oväntade data, rekommenderar Databricks att du använder rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Om du vill att dataströmmen ska sluta bearbetas om ett nytt fält introduceras som inte matchar schemat kan du lägga till:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Aktivera flexibla halvstrukturerade datapipelines

När du tar emot data från en leverantör som introducerar nya kolumner i den information de tillhandahåller kanske du inte känner till exakt när de gör det, eller så kanske du inte har bandbredden för att uppdatera din datapipeline. Nu kan du använda schemautveckling för att starta om strömmen och låta Auto Loader uppdatera det härledda schemat automatiskt. Du kan också använda schemaHints för några av de "schemalösa" fält som leverantören kan tillhandahålla.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformera kapslade JSON-data

Eftersom Auto Loader härleder JSON-kolumnerna på den översta nivån som strängar kan du lämnas med kapslade JSON-objekt som kräver ytterligare omvandlingar. Du kan använda api:er för halvstrukturerad dataåtkomst för att ytterligare transformera komplext JSON-innehåll.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Härled kapslade JSON-data

När du har kapslade data kan du använda cloudFiles.inferColumnTypes alternativet för att härleda den kapslade strukturen för dina data och andra kolumntyper.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Läs in CSV-filer utan rubriker

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Framtvinga ett schema på CSV-filer med rubriker

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Mata in bild- eller binärdata till Delta Lake för ML

När data har lagrats i Delta Lake kan du köra distribuerad slutsatsdragning på data. Se Utföra distribuerad slutsatsdragning med pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Syntax för automatisk inläsning för DLT

Delta Live Tables innehåller något ändrad Python-syntax för automatisk inläsning lägger till SQL-stöd för automatisk inläsning.

I följande exempel används Auto Loader för att skapa datauppsättningar från CSV- och JSON-filer:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Du kan använda formatalternativ som stöds med Auto Loader. Med hjälp av map() funktionen kan du skicka alternativ till cloud_files() metoden. Alternativen är nyckel/värde-par, där nycklar och värden är strängar. Följande beskriver syntaxen för att arbeta med automatisk inläsning i SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

I följande exempel läss data från flikavgränsade CSV-filer med en rubrik:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Du kan använda schema för att ange formatet manuellt. Du måste ange schema för format som inte stöder schemainferens:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Kommentar

Delta Live Tables konfigurerar och hanterar automatiskt schema- och kontrollpunktskatalogerna när du använder Auto Loader för att läsa filer. Men om du konfigurerar någon av dessa kataloger manuellt påverkas inte innehållet i de konfigurerade katalogerna om du utför en fullständig uppdatering. Databricks rekommenderar att du använder de automatiskt konfigurerade katalogerna för att undvika oväntade biverkningar under bearbetningen.