自动加载程序中的架构参考和演化

注意

Databricks Runtime 8.2 及更高版本中提供对 JSON 格式的支持;Databricks Runtime 8.3 及更高版本中提供对 CSV 格式的支持。 有关每种格式的详细信息,请参阅数据格式

自动加载程序可以自动检测将新列引入数据的情况并重启,因此你无需自行管理架构更改的跟踪和处理。 当 JSON Blob 列中出现意外的数据(例如不同类型的数据,以后可以选择使用半结构化数据访问 API 访问这些数据)时,自动加载程序还可以提供“补救”措施。

架构推理

为了推理架构,自动加载程序会在它发现的前 50 GB 或前 1000 个文件(以先超过的限制为准)中采样。 为了避免每次启动流时都会产生此项推理成本,并能够在每次重启流后提供稳定的架构,必须设置选项 cloudFiles.schemaLocation。 自动加载程序会在此位置创建隐藏的目录 _schemas,以跟踪一段时间内输入数据发生的架构更改。 如果流包含单个要从中引入数据的 cloudFiles 源,则你可以将检查点位置作为 cloudFiles.schemaLocation 提供。 否则,请为此选项提供唯一的目录。 如果输入数据返回了流的意外架构,请检查架构位置是否仅由单个自动加载程序源使用。

注意

若要更改所用样本的大小,可以设置 SQL 配置:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(字节字符串,例如 10gb

以及

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

自动加载程序默认将采用 CSV 和 JSON 等基于文本的文件格式的列推断为 string 列。 在 JSON 数据集中,会将嵌套列也推断为 string 列。 由于 JSON 和 CSV 数据是自述性的并可支持许多数据类型,因此,将数据推理为字符串有助于避免数字类型不匹配(整数、长型、浮点型)等架构演变问题。 如果你要保留原始的 Spark 架构推理行为,请将选项 cloudFiles.inferColumnTypes 设置为 true

注意

除非启用了区分大小写,否则会将列 fooFooFOO 视为相同的列。 选择列的大小写表示形式是任意性的操作,具体取决于采样的数据。 可以通过架构提示来强制要求使用哪种大小写。

如果数据排布在 Hive 样式分区中,则自动加载程序还会尝试从数据的基础目录结构推理分区列。 例如,使用诸如 base_path/event=click/date=2021-04-01/f0.json 的文件路径会导致将 dateevent 推理为分区列。 除非将 cloudFiles.inferColumnTypes 设置为 true,否则这些列的数据类型将是字符串。 如果基础目录结构包含有冲突的 Hive 分区或者不包含 Hive 样式分区,则会忽略分区列。 可以逗号分隔的列名列表形式提供选项 cloudFiles.partitionColumns,以便始终尝试从文件路径分析给定的列(如果这些列以 key=value 对的形式存在于目录结构中)。

当自动加载程序推断架构时 ,已救援 的数据列会自动作为 添加到架构 中。 有关详细信息,请参阅有关补救数据列架构演变的部分。

注意

二进制文件 (binaryFile) 和 text 文件格式采用固定的数据架构,但也支持分区列推理。 除非指定 cloudFiles.schemaLocation,否则每次重启流都会推理分区列。 为了避免任何潜在的错误或信息丢失,Databricks 建议将 cloudFiles.schemaLocationcloudFiles.partitionColumns 设置为这些文件格式的选项,因为 cloudFiles.schemaLocation 不是这些格式的必需选项。

架构提示

推理的数据类型不一定总是完全符合你的预期。 通过使用架构提示,可以叠加你所知道的并期望出现在推理架构中的信息。

默认情况下,Apache Spark 会提供一种标准方法用于推理数据列的类型。 例如,它将嵌套的 JSON 推理为结构,将整数推理为长型数字。 相比之下,自动加载程序将所有列视为字符串。 如果你知道某个列采用特定的数据类型,或者想要选择更常规的数据类型(例如双精度型而不是整数),可按如下所示为列数据类型提供任意数量的提示:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

有关支持的数据类型列表,请参阅有关数据类型的文档。

如果启动流时不存在某个列,则你还可使用架构提示将该列添加到推理的架构中。

以下推理架构示例演示了使用架构提示时的行为。 推理的架构:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

指定以下架构提示:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

将会获得:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意

9.1 LTS 和 Databricks Runtime 9.1 LTS Databricks Runtime及以上版本提供数组和映射架构提示支持。

下面是具有复杂数据类型的推断架构的示例,用于查看具有架构提示的行为。 推理的架构:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

指定以下架构提示:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

将会获得:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意

仅当未向自动加载程序提供架构时,才会使用架构提示。 不管是启用还是禁用了 cloudFiles.inferColumnTypes,都可以使用架构提示。

架构演变

自动加载程序在处理数据时会检测是否添加了新列。 默认情况下,添加新列会导致流停止并出现 UnknownFieldException。 在流引发此错误之前,自动加载程序会针对最新的数据微批执行架构推理,并使用最新架构更新架构位置。 新列将合并到架构的末尾。 现有列的数据类型将保持不变。 在 Azure Databricks 作业中设置自动加载程序流可以在发生此类架构更改后让流自动重启。

自动加载程序支持以下架构演变模式,可以在选项 cloudFiles.schemaEvolutionMode 中设置这些模式:

  • addNewColumns:未向自动加载程序提供架构时使用的默认模式。 流作业将失败并出现 UnknownFieldException。 新列将添加到架构。 现有列不会使数据类型演变。 提供了流的架构时,不允许使用 addNewColumns。 如果要使用此模式,可改为以架构提示的形式提供架构。
  • failOnNewColumns:如果自动加载程序检测到新列,则流会失败。 除非更新提供的架构,或者删除有问题的数据文件,否则流不会重启。
  • rescue:流使用第一个推理的或提供的架构运行。 任何数据类型更改或添加的新列都会在被救援的数据列中获得救援,该列会自动添加到流的架构中,作为 。 在此模式下,流不会因架构更改而失败。
  • none:提供了架构时使用的默认模式。 不会使架构演变,将忽略新列,并且除非单独以选项的形式提供了补救数据列,否则不会补救数据。

不考虑对分区列进行架构演变。 如果有一个类似于 base_path/event=click/date=2021-04-01/f0.json 的初始目录结构,然后开始以 base_path/event=click/date=2021-04-01/hour=01/f1.json 的形式接收新文件,那么小时列将被忽略。 若要捕获新分区列的信息,请将 cloudFiles.partitionColumns 设置为 event,date,hour

补救数据列

补救数据列可确保在 ETL 期间不会丢失或错过数据。 补救数据列包含由于在给定架构中缺失、类型不匹配或者记录或文件中列的大小写与架构中不匹配而未分析的任何数据。 补救数据列以 JSON blob 形式返回,其中包含补救的列和记录的源文件路径(Databricks Runtime 8.3 及更高版本中支持源文件路径)。 补救数据列是推理架构时,由自动加载程序默认作为 _rescued_data 返回的架构的一部分。 可以重命名该列,或者在提供架构的情况下通过设置选项 rescuedDataColumn 来包含该列。

由于在推理架构时,cloudFiles.inferColumnTypes 的默认值为 falsecloudFiles.schemaEvolutionModeaddNewColumns,因此 rescuedDataColumn 仅捕获其大小写与架构中的大小写不同的列。

分析记录时,JSON 和 CSV 分析器支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 JSON 或 CSV)会被删除或引发错误。 如果在分析 JSON 或 CSV 时使用 badRecordsPath,则在使用 rescuedDataColumn 时,不会将数据类型不匹配情况视为错误的记录。 只有不完整的和格式错误的 JSON 或 CSV 记录会存储在 badRecordsPath 中。

数据格式

限制

  • 使用 foreachBatch 的 Databricks Runtime 8.2 和 8.3 上运行的 Python 应用程序不支持架构演变。 可以在 Scala 中使用 foreachBatch

示例用例:

启用简易 ETL

将数据引入 Delta Lake 而不丢失任何数据的一种简单方法是使用以下模式并在自动加载程序中启用架构推理。 Databricks 建议为此在某个 Azure Databricks 作业中运行以下代码,以便在源数据架构发生更改时自动重启流。 默认情况下,架构将推理为字符串类型,所有分析错误(如果所有内容保留为字符串,则不应会出现任何错误)将进入 _rescued_data,任何新列将导致流失败并使架构演变。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

防止结构良好的数据丢失

如果你知道自己的架构,但希望每次收到意外数据时知道这一情况,则 Databricks 建议使用 rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

如果你希望在引入了与架构不匹配的新字段时流停止处理,可以添加:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

启用灵活的半结构化数据管道

当你从供应商收到了数据,而这些数据会将新列引入到他们提供的信息时,你可能不知道他们何时提供了信息,或者没有带宽来更新数据管道。 你现在可以利用架构演变来重启流,并让自动加载程序自动更新推理的架构。 对于供应商可能提供的一些“无架构”字段,还可以利用 schemaHints

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

常见问题 (FAQ)

自动加载程序如何推理架构?

首次定义数据帧时,自动加载程序会列出源目录,选择最近的 50 GB 或 1000 个文件(按文件修改时间),并使用这些文件来推理数据架构。

自动加载程序还通过检查源目录结构来推理分区列,并查找包含 /key=value/ 结构的文件路径。 例如,如果源目录采用了不一致的结构:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

自动加载程序会将分区列推理为空。 使用 cloudFiles.partitionColumns 显式分析目录结构中的列。

当源文件夹为空时,自动加载程序的行为如何?

如果源目录为空,则自动加载程序会要求你提供架构,因为没有任何数据可用于执行推理。

自动加载程序何时推理架构? 它是否会在完成每个微批后自动演变?

首次在代码中定义数据帧时,将推理架构。 在处理每个微批期间,将即时评估架构更改;因此,你无需担心对性能的影响。 当流重启时,它会从架构位置选取已演变的架构并开始执行,而不会产生任何推理开销。

使用自动加载程序架构推理时,数据引入性能会受到哪种影响?

在执行初始架构推理期间,对于极大的源目录,架构推理预期会花费几分钟时间。 除此之外,在流执行期间,性能不应会受到较大的影响。 如果在 Azure Databricks 笔记本中运行代码,则可以看到状态更新,其中会指明自动加载程序何时列出目录用于采样和推理数据架构。

由于出现 bug,某个错误的文件极大地改变了我的架构。 如何回滚架构更改?

请联系 Databricks 支持人员取得帮助。