자동 로더의 스키마 유추 및 진화

참고

JSON 형식 지원은 Databricks Runtime 8.2 이상에서 사용할 수 있습니다. CSV 형식 지원은 Databricks Runtime 8.3 이상에서 사용할 수 있습니다. 각 형식에 대 한 자세한 내용은 데이터 형식을 참조 하세요.

자동 로더에서는 데이터에 대 한 새 열을 자동으로 검색 하 고 다시 시작 하 여 스키마 변경 내용 추적과 처리를 직접 관리할 필요가 없습니다. 자동 로더에서는 나중에 반 구조화 된 데이터 액세스 api를 사용 하 여 액세스 하도록 선택할 수 있는 JSON blob 열에서 예기치 않은 (예: 서로 다른 데이터 형식) 데이터를 "복구" 할 수도 있습니다.

스키마 유추

스키마를 유추 하기 위해 자동 로더는 검색 되는 첫 번째 50 GB 또는 1000 파일을 샘플링 합니다 .이는 먼저 한도에 도달 합니다. 모든 스트림 시작 시이 유추 비용이 발생 하지 않도록 하 고, 스트림 다시 시작 간에 안정적인 스키마를 제공할 수 있도록 하려면 옵션을 설정 해야 합니다 cloudFiles.schemaLocation . 자동 로더에서는 _schemas 시간에 따른 입력 데이터의 스키마 변경 내용을 추적 하기 위해이 위치에 숨겨진 디렉터리를 만듭니다. 스트림에 cloudFiles 데이터를 수집 하기 위한 단일 원본이 포함 되어 있으면 검사점 위치를로 제공할 수 있습니다 cloudFiles.schemaLocation . 그렇지 않으면이 옵션에 대 한 고유 디렉터리를 제공 합니다. 입력 데이터가 스트림에 대해 예기치 않은 스키마를 반환 하는 경우에는 단일 자동 로더 원본 에서만 스키마 위치를 사용 하 고 있는지 확인 합니다.

참고

사용 되는 샘플의 크기를 변경 하려면 SQL 구성을 설정할 수 있습니다.

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(예: 바이트 문자열 10gb )

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

기본적으로 자동 로더는 열로 CSV 및 JSON과 같은 텍스트 기반 파일 형식의 열을 유추 string 합니다. JSON 데이터 집합에서 중첩 열도 열로 유추 됩니다 string . JSON 및 CSV 데이터는 자체 설명적 이며 많은 데이터 형식을 지원할 수 있으므로 데이터를 문자열로 유추 하면 숫자 형식 불일치 (정수, long, 부동 소수점)와 같은 스키마 진화 문제를 방지 하는 데 도움이 될 수 있습니다. 원래 Spark 스키마 유추 동작을 유지 하려면 옵션을 cloudFiles.inferColumnTypes 로 설정 true 합니다.

참고

대/소문자 구분을 사용 하는 경우를 제외 하 고, 및 열은 fooFooFOO 같은 열로 간주 됩니다. 열이 표시 되는 경우는 임의로 선택 되며 샘플링 된 데이터에 따라 달라 집니다. 스키마 힌트 를 사용 하 여 사용 해야 하는 사례를 적용할 수 있습니다.

또한 데이터를 Hive 스타일 분할에 배치 하는 경우 자동 로더에서 데이터의 기본 디렉터리 구조에서 파티션 열을 유추 하려고 시도 합니다. 예를 들어와 같은 파일 경로는 base_path/event=click/date=2021-04-01/f0.json 및의 유추를 dateevent 파티션 열로 생성 합니다. 을 true로 설정 하지 않으면 이러한 열의 데이터 형식이 문자열이 됩니다 cloudFiles.inferColumnTypes . 기본 디렉터리 구조에 충돌 하는 Hive 파티션이 있거나 Hive 스타일 분할이 포함 되지 않은 경우 파티션 열은 무시 됩니다. 이 옵션을 cloudFiles.partitionColumns 쉼표로 구분 된 열 이름 목록으로 제공 하 여 이러한 열이 key=value 디렉터리 구조에서 쌍으로 존재 하는 경우 항상 파일 경로에서 지정 된 열을 시도 하 고 구문 분석할 수 있습니다.

자동 로더에서 스키마를 유추 하는 경우 자동 복구 데이터 열 이 자동으로 스키마에 추가 됩니다 . 자세한 내용은 자동 복구 데이터 열스키마 에 대 한 섹션을 참조 하세요.

참고

이진 파일 ( binaryFile ) 및 text 파일 형식은 고정 된 데이터 스키마를 갖지만 파티션 열 유추도 지원 합니다. 을 지정 하지 않으면 각 스트림 다시 시작 시 파티션 열이 유추 됩니다 cloudFiles.schemaLocation . 잠재적 오류나 정보 손실을 방지 하기 위해 Databricks cloudFiles.schemaLocationcloudFiles.partitionColumns 는 이러한 cloudFiles.schemaLocation 형식에 대 한 필수 옵션이 아니라 이러한 파일 형식에 대 한 옵션을 설정 하는 것이 좋습니다.

스키마 힌트

유추 되는 데이터 형식은 항상 정확히 일치 하지 않을 수 있습니다. 스키마 힌트를 사용 하면 유추 된 스키마에 대해 알고 있고 기대한 정보를 superimpose 수 있습니다.

기본적으로 Apache Spark에는 데이터 열의 형식을 유추 하는 표준 방법이 있습니다. 예를 들어 중첩 된 JSON을 구조체 및 정수로 long으로 유추 합니다. 반면에 자동 로더는 모든 열을 문자열로 간주 합니다. 열이 특정 데이터 형식 이거나 보다 일반적인 데이터 형식 (예: 정수 대신 double)을 선택 하려는 경우에는 다음과 같이 열 데이터 형식에 대해 임의의 개수의 힌트를 제공할 수 있습니다.

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

지원 되는 데이터 형식 목록은 데이터 형식 에 대 한 설명서를 참조 하세요.

스트림의 시작 부분에 열이 없는 경우 스키마 힌트를 사용 하 여 유추 된 스키마에 해당 열을 추가할 수도 있습니다.

다음은 스키마 힌트의 동작을 확인 하는 유추 된 스키마의 예입니다. 유추 된 스키마:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

다음 스키마 힌트를 지정 합니다.

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

다음을 가져옵니다.

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

참고

배열 및 맵 스키마 힌트 지원은 Databricks Runtime 9.1 lts 및 Databricks Runtime 9.1 Lts Photon 이상에서 사용할 수 있습니다.

다음은 스키마 힌트의 동작을 볼 수 있는 복잡 한 데이터 형식이 포함 된 유추 된 스키마의 예입니다. 유추 된 스키마:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

다음 스키마 힌트를 지정 합니다.

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

다음을 가져옵니다.

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

참고

스키마 힌트는 자동 로더에서 스키마를 제공 하지 않는 경우에만 사용 됩니다. 이 사용 되는지 여부에 관계 없이 스키마 힌트를 사용할 수 있습니다 cloudFiles.inferColumnTypes .

스키마 진화

자동 로더에서는 데이터를 처리할 때 새 열을 추가 하는 것을 감지 합니다. 기본적으로 새 열을 추가 하면 스트림이로 중지 됩니다 UnknownFieldException . 스트림이이 오류를 발생 하기 전에 자동 로더에서 최신 데이터의 마이크로 일괄 처리에 대 한 스키마 유추를 수행 하 고 스키마 위치를 최신 스키마로 업데이트 합니다. 새 열은 스키마의 끝 부분에 병합 됩니다. 기존 열의 데이터 형식은 변경 되지 않고 그대로 유지 됩니다. Azure Databricks 작업 내에서 자동 로더 스트림을 설정 하면 이러한 스키마가 변경 된 후 스트림을 자동으로 다시 시작할 수 있습니다.

자동 로더는 옵션에서 설정한 스키마 진화에 대해 다음과 같은 모드를 지원 합니다 cloudFiles.schemaEvolutionMode .

  • addNewColumns: 자동 로더에서 스키마를 제공 하지 않는 경우의 기본 모드입니다. 스트리밍 작업은를 사용 하 여 실패 합니다 UnknownFieldException . 새 열이 스키마에 추가 됩니다. 기존 열은 데이터 형식이 진화 하지 않습니다. addNewColumns 스트림의 스키마를 제공 하는 경우은 허용 되지 않습니다. 대신이 모드를 사용 하려는 경우 스키마를 스키마 힌트로 제공할 수 있습니다.
  • failOnNewColumns: 자동 로더에서 새 열을 검색 하는 경우 스트림이 실패 합니다. 제공 된 스키마가 업데이트 되지 않거나 잘못 된 데이터 파일이 제거 된 경우를 제외 하 고는 다시 시작 되지 않습니다.
  • rescue: 스트림은 가장 먼저 유추 되거나 제공 된 스키마를 사용 하 여 실행 됩니다. 데이터 형식이 변경 되거나 추가 되는 새 열은 자동으로 stream의 스키마에 자동으로 추가 되는 자동 복구 데이터 열 에 자동으로 추가 됩니다 . 이 모드에서 스트림은 스키마 변경으로 인해 실패 하지 않습니다.
  • none: 스키마를 제공 하는 기본 모드입니다. 는 스키마를 진화 하지 않으며, 자동 복구 데이터 열을 옵션으로 별도로 제공 하지 않으면 새 열은 무시 되 고 데이터는 자동으로 복구 되지 않습니다.

파티션 열은 스키마의 진화에 고려 되지 않습니다. 와 같은 초기 디렉터리 구조를 만든 base_path/event=click/date=2021-04-01/f0.json 다음 새 파일을로 받기 시작 하면 base_path/event=click/date=2021-04-01/hour=01/f1.json 시간 열은 무시 됩니다. 새 파티션 열에 대 한 정보를 캡처하려면를 cloudFiles.partitionColumns 로 설정 event,date,hour 합니다.

자동 복구 데이터 열

자동 복구 데이터 열을 사용 하면 ETL 중에 데이터를 손실 하거나 놓치지 않습니다. 자동 복구 데이터 열에는 구문 분석 되지 않은 데이터가 포함 되어 있습니다 .이 데이터는 지정 된 스키마에서 누락 되었거나, 형식이 일치 하지 않거나, 레코드나 파일에 있는 열의 대/소문자가 스키마의 대/소문자와 일치 하지 않기 때문입니다. 자동 복구 데이터 열은 자동 복구 인 열을 포함 하는 JSON blob으로 반환 되 고 레코드의 원본 파일 경로 (Databricks Runtime 8.3 이상에서 원본 파일 경로를 사용할 수 있음)입니다. 자동 복구 데이터 열은 _rescued_data 스키마를 유추할 때 기본적으로 자동 로더에서 반환 하는 스키마의 일부입니다. 옵션을 설정 하 여 스키마를 제공 하는 경우 열 이름을 바꾸거나 열 이름을 포함할 수 있습니다 rescuedDataColumn .

의 기본값은 cloudFiles.inferColumnTypesfalsecloudFiles.schemaEvolutionModeaddNewColumns 스키마를 유추할 때는 이며은 rescuedDataColumn 스키마의 대/소문자가 다른 열만 캡처합니다.

JSON 및 CSV 파서는 PERMISSIVE , 및 레코드를 구문 분석할 때 세 가지 모드를 지원 합니다 DROPMALFORMEDFAILFAST . 와 함께 사용 하는 경우 rescuedDataColumn 데이터 형식 불일치는 레코드를 모드에서 삭제 DROPMALFORMED 하거나 모드에서 오류를 throw 하지 않습니다 FAILFAST . 불완전 하거나 JSON 또는 CSV 형식의 손상 된 레코드만 삭제 되거나 오류를 throw 합니다. badRecordsPathJSON 또는 CSV를 구문 분석할 때를 사용 하는 경우 데이터 형식 불일치는를 사용 하는 경우 잘못 된 레코드로 간주 되지 않습니다 rescuedDataColumn . 불완전 하 고 형식이 잘못 된 JSON 또는 CSV 레코드만에 저장 됩니다 badRecordsPath .

데이터 형식

제한 사항

  • 를 사용하는 Databricks Runtime 8.2 및 8.3에서 실행되는 Python 애플리케이션에서는 스키마 진화가 지원되지 foreachBatch 않습니다. Scala에서 를 대신 사용할 수 foreachBatch 있습니다.

예제 사용 사례

쉬운 ETL 사용

데이터를 손실하지 않고 데이터를 Delta Lake로 쉽게 가져오기 위한 방법은 다음 패턴을 사용하고 자동 로더를 사용하여 스키마 유추를 사용하도록 설정하는 것입니다. Databricks는 원본 데이터의 스키마가 변경되면 스트림을 자동으로 다시 시작하도록 Azure Databricks 작업에서 다음 코드를 실행하는 것이 좋습니다. 기본적으로 스키마는 문자열 형식으로 유추되고, 구문 분석 오류(모든 항목이 문자열로 남아 있으면 아무 것도 없음)가 로 _rescued_data 이동하며, 새 열이 스트림에 실패하고 스키마가 진화합니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

잘 구성된 데이터의 데이터 손실 방지

스키마를 알고 있지만 예기치 않은 데이터를 받을 때마다 알고 싶은 경우 Databricks는 를 사용하는 것이 rescuedDataColumn 좋습니다.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

스키마와 일치하지 않는 새 필드가 도입된 경우 스트림이 처리를 중지하도록 하려면 다음을 추가할 수 있습니다.

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

유연한 반구조적 데이터 파이프라인 사용

공급 업체로부터 제공하는 정보에 새 열을 도입하는 데이터를 수신하는 경우 해당 열이 언제 제공되는지 정확히 알지 못하거나 데이터 파이프라인을 업데이트할 대역폭이 없을 수 있습니다. 이제 스키마 진화를 활용하여 스트림을 다시 시작하고 자동 로더가 유추된 스키마를 자동으로 업데이트하도록 할 수 있습니다. 공급업체에서 제공할 수 있는 "스키마 없는" 필드 중 일부에 대해서도 활용할 schemaHints 수 있습니다.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

FAQ(질문과 대답)

자동 로더는 스키마를 어떻게 유추하나요?

DataFrame이 처음 정의되면 자동 로더는 원본 디렉터리를 나열하고 가장 최근(파일 수정 시간별) 50GB 또는 1000개의 파일을 선택하고 해당 파일을 사용하여 데이터 스키마를 유추합니다.

또한 자동 로더는 원본 디렉터리 구조를 검사하여 파티션 열을 유추하고 구조가 포함된 파일 경로를 /key=value/ 찾습니다. 원본 디렉터리에 일관되지 않은 구조가 있는 경우( 예:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

자동 로더는 파티션 열을 빈 것으로 유추합니다. 를 사용하여 cloudFiles.partitionColumns 디렉터리 구조에서 열을 명시적으로 구문 분석합니다.

원본 폴더가 비어 있으면 자동 로더가 어떻게 작동하나요?

원본 디렉터리를 비워 두면 자동 로더에서 유추를 수행할 데이터가 없으면 스키마를 제공해야 합니다.

Autoloader는 언제 스키마를 유추하나요? 모든 마이크로 일괄 처리 후에 자동으로 진화하나요?

스키마는 DataFrame이 코드 내에서 처음 정의될 때 유추됩니다. 각 마이크로 일괄 처리 중에는 스키마 변경 내용이 즉석에서 평가됩니다. 따라서 성능 적중에 대해 걱정할 필요가 없습니다. 스트림이 다시 시작되면 스키마 위치에서 진화된 스키마를 선택하고 유추 오버헤드 없이 실행을 시작합니다.

자동 로더 스키마 유추를 사용할 때 데이터 수집에 미치는 성능 영향은 무엇인가요?

초기 스키마 유추 중에 매우 큰 원본 디렉터리에 대해 스키마 유추에 몇 분 정도 걸릴 수 있습니다. 그렇지 않으면 스트림 실행 중에 상당한 성능 적중을 관찰하면 안 됩니다. Azure Databricks Notebook에서 코드를 실행하는 경우 자동 로더가 데이터 스키마 샘플링 및 유추를 위해 디렉터리를 나열할 시기를 지정하는 상태 업데이트를 볼 수 있습니다.

버그로 인해 잘못된 파일이 스키마를 크게 변경했습니다. 스키마 변경 내용을 롤백하려면 어떻게 해야 합니까?

도움이 될 수 있도록 Databricks 지원에 문의하십시오.