Gyakori adatbetöltési minták

Az Automatikus betöltő számos gyakori adatbetöltési feladatot egyszerűsít. Ez a rövid útmutató számos népszerű mintára mutat be példákat.

Könyvtárak vagy fájlok szűrése glob mintákkal

A Glob-minták a címtárak és fájlok szűréséhez használhatók, ha az elérési úton meg van adva.

Minta Leírás
? Egyetlen karakternek felel meg
* Nulla vagy több karakter egyezése
[abc] Egyetlen karakternek felel meg a(z) {a,b,c} karakterkészletből.
[a-z] Egyetlen karaktert egyezik a(z) {a... karaktertartományból z}.
[^a] Egyetlen karaktert egyezik meg, amely nem a(z) {a} karakterkészletből vagy tartományból származik. Vegye figyelembe, hogy a ^ karakternek közvetlenül a nyitó zárójel jobb oldalán kell lennie.
{ab,cd} Megfelel a(z) {ab, cd} sztringkészlet egyik sztringjének.
{ab,c{de, fh}} Megfelel a(z) {ab, cde, cfh} sztringhalmazból származó sztringnek.

Használja az path előtagmintákat, például:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Fontos

Az utótagminták explicit megadására szolgáló lehetőséget pathGlobFilter kell használnia. Az path egyetlen előtagszűrő.

Ha például csak png olyan fájlokat szeretne elemezni egy könyvtárban, amely különböző utótagokkal rendelkező fájlokat tartalmaz, a következőket teheti:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Megjegyzés:

Az automatikus betöltő alapértelmezett zúgási viselkedése eltér a többi Spark-fájlforrás alapértelmezett viselkedésétől. Adja hozzá .option("cloudFiles.useStrictGlobber", "true") az olvasáshoz az alapértelmezett Spark-viselkedésnek megfelelő globbingot a fájlforrásokhoz. A globbingról az alábbi táblázatban talál további információt:

Minta File path Alapértelmezett globber Szigorú globber
/a/b /a/b/c/file.txt Igen Igen
/a/b /a/b_dir/c/file.txt Nem Nem
/a/b /a/b.txt Nem Nem
/a/b/ /a/b.txt Nem Nem
/a/*/c/ /a/b/c/file.txt Igen Igen
/a/*/c/ /a/b/c/d/file.txt Igen Igen
/a/*/c/ /a/b/x/y/c/file.txt Igen Nem
/a/*/c /a/b/c_file.txt Igen Nem
/a/*/c/ /a/b/c_file.txt Igen Nem
/a/*/c/ /a/*/cookie/file.txt Igen Nem
/a/b* /a/b.txt Igen Igen
/a/b* /a/b/file.txt Igen Igen
/a/{0.txt,1.txt} /a/0.txt Igen Igen
/a/*/{0.txt,1.txt} /a/0.txt Nem Nem
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Igen Igen

Egyszerű ETL engedélyezése

Az adatok a Delta Lake-be való beolvasásának egyszerű módja az alábbi minta használata és a sémakövetkezés engedélyezése az Automatikus betöltővel. A Databricks azt javasolja, hogy futtassa az alábbi kódot egy Azure Databricks-feladatban, hogy automatikusan újraindítsa a streamet, amikor a forrásadatok sémája megváltozik. Alapértelmezés szerint a séma sztringtípusokként van kikövetkeztetve, minden elemzési hiba (ha minden sztringként marad) a rendszer el fog indulni _rescued_data, és minden új oszlop meghibásodik a streamben, és fejleszti a sémát.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Adatvesztés megakadályozása jól strukturált adatokban

Ha ismeri a sémát, de szeretné tudni, hogy mikor kap váratlan adatokat, a Databricks a .rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Ha azt szeretné, hogy a stream leálljon a feldolgozással, ha olyan új mező van beállítva, amely nem felel meg a sémának, hozzáadhatja a következőt:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Rugalmas, félig strukturált adatfolyamok engedélyezése

Ha olyan szállítótól kap adatokat, amelyek új oszlopokat vezetnek be az általuk megadott információkhoz, előfordulhat, hogy nem tudja pontosan, hogy mikor teszik meg, vagy előfordulhat, hogy nem rendelkezik az adatfolyam frissítéséhez szükséges sávszélességgel. Most már használhatja a sémafejlődést a stream újraindításához, és engedélyezheti, hogy az Automatikus betöltő automatikusan frissítse a következtetett sémát. A szállító által megadott "séma nélküli" mezők némelyikét is használhatja schemaHints .

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Beágyazott JSON-adatok átalakítása

Mivel az Automatikus betöltő sztringként a legfelső szintű JSON-oszlopokat keresi, a további átalakításokat igénylő beágyazott JSON-objektumokkal is maradhat. A félig strukturált adatelérési API-k segítségével tovább alakíthatja az összetett JSON-tartalmakat.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Beágyazott JSON-adatok következtetése

Ha beágyazott adatokkal rendelkezik, azzal a cloudFiles.inferColumnTypes lehetőséggel következtethet az adatok és más oszloptípusok beágyazott szerkezetére.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

CSV-fájlok betöltése fejlécek nélkül

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Séma kényszerítése fejlécekkel rendelkező CSV-fájlokon

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Kép- vagy bináris adatok betöltése a Delta Lake for ML-be

Miután az adatokat a Delta Lake-ben tárolta, elosztott következtetést futtathat az adatokon. Lásd: Elosztott következtetés végrehajtása pandas UDF használatával.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

A DLT automatikus betöltő szintaxisa

A Delta Live Tables kissé módosított Python-szintaxist biztosít az Automatikus betöltőhöz, és sql-támogatást ad hozzá az automatikus betöltőhöz.

Az alábbi példák az Automatikus betöltő használatával hoznak létre adatkészleteket CSV- és JSON-fájlokból:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Az Automatikus betöltővel támogatott formátumbeállításokat használhat. A map() függvény használatával megadhat beállításokat a cloud_files() metódusnak. A beállítások kulcs-érték párok, ahol a kulcsok és az értékek sztringek. Az alábbiak az automatikus betöltő SQL-ben való használatának szintaxisát ismertetik:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Az alábbi példa tabulátorral tagolt CSV-fájlokból olvas be adatokat fejléccel:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

A formátumot manuálisan is megadhatjaschema. Meg kell adnia azokat a formátumokat, amelyek nem támogatják a schema sémakövetődést:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Megjegyzés:

A Delta Live Tables automatikusan konfigurálja és kezeli a sémát és az ellenőrzőpont-könyvtárakat az Automatikus betöltő használatával a fájlok olvasásához. Ha azonban manuálisan konfigurálja valamelyik könyvtárat, a teljes frissítés végrehajtása nem befolyásolja a konfigurált könyvtárak tartalmát. A Databricks az automatikusan konfigurált címtárak használatát javasolja a váratlan mellékhatások elkerülése érdekében a feldolgozás során.