일반적인 데이터 로딩 패턴

자동 로더는 여러 일반적인 데이터 수집 작업을 간소화합니다. 이 빠른 참조는 몇 가지 인기 있는 패턴에 대한 예제를 제공합니다.

GLOB 패턴을 사용하여 디렉터리 또는 파일 필터링

경로에 제공되는 경우 GLOB 패턴을 사용하여 디렉터리 및 파일을 필터링할 수 있습니다.

패턴 설명
? 임의의 단일 문자와 일치합니다.
* 0개 이상의 문자와 일치합니다.
[abc] 문자 집합 {a,b,c}의 단일 문자와 일치합니다.
[a-z] 문자 범위 {a…z}의 단일 문자와 일치합니다.
[^a] 문자 집합 또는 범위 {a}에 없는 단일 문자와 일치합니다. ^ 문자는 왼쪽 괄호 바로 오른쪽에 있어야 합니다.
{ab,cd} 문자열 집합 {ab, cd}의 문자열과 일치합니다.
{ab,c{de, fh}} 문자열 집합 {ab, cde, cfh}의 문자열과 일치합니다.

접두사 패턴을 제공하는 데 path를 사용합니다. 예를 들면 다음과 같습니다.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Important

접미사 패턴을 명시적으로 제공하려면 pathGlobFilter 옵션을 사용해야 합니다. path는 접두사 필터만 제공합니다.

예를 들어, 접미사가 다른 파일이 포함된 디렉터리의 png 파일만 구문 분석하려면 다음을 수행할 수 있습니다.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

참고 항목

자동 로더의 기본 globbing 동작은 다른 Spark 파일 원본의 기본 동작과 다릅니다. 파일 원본에 대한 기본 Spark 동작과 일치하는 글로빙을 사용하려면 읽기에 추가 .option("cloudFiles.useStrictGlobber", "true") 합니다. globbing에 대한 자세한 내용은 다음 표를 참조하세요.

패턴 파일 경로 기본 글로버 엄격한 글로버
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt 아니요 아니요
/a/b /a/b.txt 아니요 아니요
/a/b/ /a/b.txt 아니요 아니요
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt 아니요
/a/*/c /a/b/c_file.txt 아니요
/a/*/c/ /a/b/c_file.txt 아니요
/a/*/c/ /a/*/cookie/file.txt 아니요
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt 아니요 아니요
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

쉬운 ETL 사용

데이터를 손실하지 않고 Delta Lake로 가져오는 쉬운 방법은 다음 패턴을 사용하고 자동 로더에서 스키마 유추를 사용하도록 설정하는 것입니다. 원본 데이터의 스키마가 변경될 때 스트림을 자동으로 다시 시작하기 위해 Databricks는 다음과 같은 Azure Databricks 작업 코드를 실행하도록 권장합니다. 기본적으로 스키마는 문자열 형식으로 유추되고, 모든 구문 분석 오류(모든 항목이 문자열로 남아 있는 경우 아무 것도 없어야 함)는 _rescued_data로 이동하며, 모든 새 열은 스트림에 실패하고 스키마를 진화합니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

효율적인 정형 데이터의 데이터 손실 방지

스키마를 알고 있지만 예기치 않은 데이터를 받을 때마다 알려고 하는 경우 Databricks는 rescuedDataColumn을 사용하도록 권장합니다.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

스키마와 일치하지 않는 새 필드가 도입되는 경우 스트림이 처리를 중지하도록 하려면 다음을 추가할 수 있습니다.

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

유연한 반정형 데이터 파이프라인 사용

새 열을 공급업체에서 제공하는 정보에 도입하는 데이터를 받을 때 공급업체에서 제공하는 시기를 정확히 알지 못하거나 데이터 파이프라인을 업데이트할 대역폭이 없을 수 있습니다. 이제 스키마 진화를 활용하여 스트림을 다시 시작하고 자동 로더에서 유추된 스키마를 자동으로 업데이트하도록 할 수 있습니다. 또한 공급업체에서 제공할 수 있는 "스키마 없는" 필드 중 일부에 대해 schemaHints를 활용할 수 있습니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

중첩된 JSON 데이터 변환

자동 로더는 최상위 JSON 열을 문자열로 유추하므로 추가 변환이 필요한 중첩된 JSON 개체가 남을 수 있습니다. 반구조화된 데이터 액세스 API를 사용하여 복잡한 JSON 콘텐츠를 추가로 변환할 수 있습니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

중첩된 JSON 데이터 추론

중첩된 데이터가 있는 경우 cloudFiles.inferColumnTypes 옵션을 사용하여 데이터 및 기타 열 형식의 중첩된 구조를 유추할 수 있습니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

헤더 없이 CSV 파일 로드

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

헤더를 사용하여 CSV 파일에 스키마 적용

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

ML용 Delta Lake에 이미지 또는 이진 데이터 수집

데이터가 Delta Lake에 저장되면 데이터에 대해 분산 유추를 실행할 수 있습니다. pandas UDF를 사용하여 분산 유추 수행을 참조하세요.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT에 대한 자동 로더 구문

Delta Live Tables는 자동 로더에 대해 약간 수정된 Python 구문을 제공하여 자동 로더에 대한 SQL 지원을 추가합니다.

다음 예제에서는 자동 로더를 사용하여 CSV 및 JSON 파일에서 데이터 세트를 만듭니다.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

자동 로더에서 지원되는 형식 옵션을 사용할 수 있습니다. 함수를 map() 사용하여 메서드에 cloud_files() 옵션을 전달할 수 있습니다. 옵션은 키-값 쌍이며, 키와 값은 문자열입니다. 다음은 SQL에서 자동 로더를 사용하기 위한 구문에 대해 설명합니다.

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

다음 예제는 헤더가 포함된 탭으로 구분된 CSV 파일에서 데이터를 읽습니다.

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

schema를 사용하여 형식을 수동으로 지정할 수 있습니다. 스키마 유추를 지원하지 않는 형식의 경우 schema를 지정해야 합니다.

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

참고 항목

Delta Live Tables는 자동 로더를 사용하여 파일을 읽을 때 스키마와 검사점 디렉터리를 자동으로 구성하고 관리합니다. 그러나 이러한 디렉터리 중 하나를 수동으로 구성하는 경우 전체 새로 고침을 수행해도 구성된 디렉터리의 내용에 영향을 주지 않습니다. Databricks는 처리 중 예기치 않은 부작용을 피하기 위해 자동으로 구성된 디렉터리를 사용할 것을 권장합니다.