表批量读取和写入

Delta Lake 支持 Apache Spark 数据帧读取和写入 API 提供的大部分选项,这些选项可用于对表执行批量读取和写入操作。

若要了解 Delta Lake SQL 命令,请参阅

创建表

Delta Lake 支持创建两种类型的表:元存储中定义的表和由路径定义的表。

可以通过以下方式创建表。

  • SQL DDL命令:可以使用 SQL 中支持的标准 DDL Apache Spark (,) 创建 Delta REPLACE TABLE 表。

    CREATE TABLE IF NOT EXISTS default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
    CREATE OR REPLACE TABLE default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    

    注意

    在 Databricks Runtime 8.0 及更高版本中,Delta Lake 是默认格式,无需使用 USING DELTA

    在 Databricks Runtime 7.0 及更高版本中,SQL 还支持在路径上创建表,而无需在 Hive 元存储中创建条目。

    -- Create or replace table with path
    CREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
  • API:如果想要创建表,同时将 Spark DataFrame 或数据集中的数据插入到该表中,则可以使用 Spark DataFrameWriterDataFrameWriter 以及 Python)。

    Python

    # Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    # Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    

    Scala

    // Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    // Create table with path using DataFrame's schema and write data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    
    • 在 Databricks Runtime 8.0 及更高版本中,Delta Lake 是默认格式,无需指定 USING DELTAformat("delta")using("delta")
    • 在 Databricks Runtime 7.0 及更高版本中,也可以使用 Spark DataFrameWriterV2 API 创建 Delta 表。
  • API:也可以使用 Delta Lake 中的 DeltaTableBuilder API 创建表。 与 DataFrameWriter API 相比,此 API 可以更轻松地指定其他信息,例如列注释、表属性和生成的列

    重要

    此功能目前以公共预览版提供。

    注意

    此功能在 Databricks Runtime 8.3 及更高版本上可用。

    Python

    # Create table in the metastore
    DeltaTable.createIfNotExists(spark) \
      .tableName("default.people10m") \
      .addColumn("id", "INT") \
      .addColumn("firstName", "STRING") \
      .addColumn("middleName", "STRING") \
      .addColumn("lastName", "STRING", comment = "surname") \
      .addColumn("gender", "STRING") \
      .addColumn("birthDate", "TIMESTAMP") \
      .addColumn("ssn", "STRING") \
      .addColumn("salary", "INT") \
      .execute()
    
    # Create or replace table with path and add properties
    DeltaTable.createOrReplace(spark) \
      .addColumn("id", "INT") \
      .addColumn("firstName", "STRING") \
      .addColumn("middleName", "STRING") \
      .addColumn("lastName", "STRING", comment = "surname") \
      .addColumn("gender", "STRING") \
      .addColumn("birthDate", "TIMESTAMP") \
      .addColumn("ssn", "STRING") \
      .addColumn("salary", "INT") \
      .property("description", "table with people data") \
      .location("/tmp/delta/people10m") \
      .execute()
    

    Scala

    // Create table in the metastore
    DeltaTable.createOrReplace(spark)
      .tableName("default.people10m")
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn(
        DeltaTable.columnBuilder("lastName")
          .dataType("STRING")
          .comment("surname")
          .build())
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .execute()
    
    // Create or replace table with path and add properties
    DeltaTable.createOrReplace(spark)
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn(
        DeltaTable.columnBuilder("lastName")
          .dataType("STRING")
          .comment("surname")
          .build())
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .property("description", "table with people data")
      .location("/tmp/delta/people10m")
      .execute()
    

有关详细信息 ,请参阅 API 文档。

另请参阅 创建表

将数据分区

你可以对数据进行分区,以加速其谓词涉及分区列的查询或 DML。 若要创建 Delta 表时对数据进行分区,请按列指定分区。 以下示例按性别进行分区。

SQL

-- Create table in the metastore
CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)

Python

df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()

Scala

df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

控制数据位置

对于元存储中定义的表,可以选择性地将 LOCATION 指定为路径。 使用指定的 LOCATION 创建的表被视为不受元存储管理。 与不指定路径的托管表不同,非托管表的文件在你 DROP 表时不会被删除。

使用已包含使用 Delta Lake 存储的数据的 运行时 CREATE TABLELOCATION ,Delta Lake 将执行以下操作: CREATE TABLE

  • 如果仅指定了表名称和位置,例如:

    CREATE TABLE default.people10m
    USING DELTA
    LOCATION '/tmp/delta/people10m'
    

    元存储中的表会自动继承现有数据的架构、分区和表属性。 此功能可用于将数据“导入”到元存储中。

  • 如果你指定了任何配置(架构、分区或表属性),则 Delta Lake 会验证指定的内容是否与现有数据的配置完全匹配。

    重要

    如果指定的配置与数据的配置并非完全匹配,则 Delta Lake 会引发一个描述差异的异常。

注意

元存储不是 Delta 表最新信息的真实来源。 事实上,元数据中的表定义可能没有包含所有元数据,如架构和属性。 它包含表位置,而该位置的表的事务日志就是真实来源。 如果从不知道此特定于 Delta 的自定义的系统查询元存储,可能会看到不完整或过时的表信息。

使用生成的列

重要

此功能目前以公共预览版提供。

注意

此功能在 Databricks Runtime 8.3 及更高版本上可用。

Delta Lake 支持生成的列,这些列是一种特殊类型的列,其值基于用户指定的函数在 Delta 表中的其他列上自动生成。 当写入具有生成的列的表且未显式为其提供值时,Delta Lake 会自动计算这些值。 例如,可以从时间戳列自动生成日期列(用于按日期对表进行分区);对表进行的任何写入都只需为时间戳列指定数据。 但是,如果显式提供这些值的值,则这些值必须满足约束 ,否则写入将失败并出现错误。

下面的示例介绍如何使用生成的列创建表:

SQL

CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  dateOfBirth DATE GENERATED ALWAYS AS (CAST(birthDate AS DATE)),
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)

Python

DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()

Scala

DeltaTable.create(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("dateOfBirth")
     .dataType(DateType)
     .generatedAlwaysAs("CAST(dateOfBirth AS DATE)")
     .build())
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

生成的列像普通列一样存储。 也就是说,它们会占用存储空间。

以下限制适用于生成的列:

  • 生成表达式可以使用 Spark 中的任何 SQL 函数,这些函数在给定相同的参数值时始终返回相同的结果,但以下类型的函数除外:

    • 用户定义的函数。
    • 聚合函数。
    • 窗口函数。
    • 返回多行的函数。
  • 对于 Databricks Runtime 9.1 及以上,当设置为 true 时,操作 MERGE 支持生成的 spark.databricks.delta.schema.autoMerge.enabled 列。

读取表

可以通过指定一个表名或路径将 Delta 表作为数据帧加载:

SQL

SELECT * FROM default.people10m   -- query table in the metastore

SELECT * FROM delta.`/tmp/delta/people10m`  -- query table by path

Python

spark.table("default.people10m")    # query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  # query table by path

Scala

spark.table("default.people10m")      // query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  // create table by path

import io.delta.implicits._
spark.read.delta("/tmp/delta/people10m")

返回的数据帧会自动读取表的最新快照来进行任何查询;你永远不需要运行 REFRESH TABLE。 如果查询中存在适用的谓词,则 Delta Lake 会自动使用分区和统计信息来读取最少量的数据。

查询表的旧快照(按时间顺序查看)

本节内容:

Delta Lake 按时间顺序查看允许你查询 Delta 表的旧快照。 按时间顺序查看有许多用例,包括:

  • 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
  • 编写复杂的时态查询。
  • 修复数据中的错误。
  • 为针对快速变化表的一组查询提供快照隔离。

本部分介绍了支持用于查询旧版表的方法、数据保留考虑事项,并提供了示例。

语法

本部分介绍了如何查询 Delta 表的较旧版本。

本节内容:

SQL AS OF 语法

SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version

其中

  • timestamp_expression 可以是下列项中的任意一项:
    • '2018-10-18T22:15:12.013Z',即可以强制转换为时间戳的字符串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',即日期字符串
    • 在 Databricks Runtime 6.6 及更高版本中:
      • current_timestamp() - interval 12 hours
      • date_sub(current_date(), 1)
      • 本身就是时间戳或可强制转换为时间戳的任何其他表达式
  • version 是可以从 DESCRIBE HISTORY table_spec 的输出中获取的 long 值。

timestamp_expressionversion 都不能是子查询。

示例

SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

DataFrameReader 选项

使用 DataFrameReader 选项,可以从固定到表的特定版本的 Delta 表创建数据帧。

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")

对于 timestamp_string,只接受日期或时间戳字符串。 例如,"2019-01-01""2019-01-01T00:00:00.000Z"

常见的模式是在执行 Azure Databricks 作业的整个过程中使用 Delta 表的最新状态来更新下游应用程序。

由于 Delta 表会自动更新,因此,如果基础数据进行了更新,则在进行多次调用时,从 Delta 表加载的数据帧可能会返回不同的结果。 通过使用按时间顺序查看,你可以修复多次调用时数据帧返回的数据:

latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/tmp/delta/people10m`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")

示例

  • 为用户 111 修复对表的意外删除问题:
INSERT INTO my_table
  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
  WHERE userId = 111
  • 修复对表的意外错误更新:
MERGE INTO my_table target
  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
  ON source.userId = target.userId
  WHEN MATCHED THEN UPDATE SET *
  • 查询在过去一周内增加的新客户的数量。
SELECT count(distinct userId) - (
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

数据保留

若要按时间顺序查看以前的某个版本,必须同时保留该版本的日志文件和数据文件。

永远不会自动删除支持 Delta 表的数据文件;仅在运行 VACUUM 时才会删除数据文件。 VACUUMVACUUM Delta 日志文件;写入检查点后,会自动清理日志文件。

默认情况下,可以按时间顺序查看最多 30 天内的 Delta 表,除非满足以下条件:

  • 在 Delta 表上运行 VACUUM
  • 使用以下表属性更改了数据或日志文件保留期:
    • delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认为 interval 30 days

      每次写入检查点时,Azure Databricks 会自动清除早于保留间隔的日志条目。 如果将此配置设置为足够大的值,则会保留许多日志条目。 这应当不会影响性能,因为针对日志的操作时间恒定。 针对历史记录的操作是并行的,但会随着日志大小的增加而变得更为昂贵。

    • delta.deletedFileRetentionDuration = "interval <interval>":控制文件在成为 的候选项delta.deletedFileRetentionDuration = "interval <interval>"VACUUM 。 默认为 interval 7 days

      若要访问 30 天内的历史数据,即使在 Delta 表上运行 VACUUM,也要设置 delta.deletedFileRetentionDuration = "interval 30 days"。 此设置可能会导致你的存储成本增加。

写入到表

追加

若要以原子方式将新数据添加到现有 Delta 表,请使用 append 模式:

SQL

INSERT INTO default.people10m SELECT * FROM morePeople

Python

df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

Scala

df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("append").delta("/tmp/delta/people10m")

Overwrite

要以原子方式替换表中的所有数据,请使用 overwrite 模式:

SQL

INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople

Python

df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

Scala

df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")

使用 DataFrames,还可以选择性地仅覆盖与任意表达式匹配的数据。 此功能在 Databricks Runtime 9.1 LTS 及以上版本提供。 以下命令以原子方式将目标表中 1 月的事件(由 分区)替换为 start_date 中的数据 df

Python

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
  .save("/tmp/delta/events")

Scala

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

此示例代码写出 中的数据 df ,验证这些数据是否都与谓词匹配,并执行原子替换。 如果要写出与谓词不匹配的数据,若要替换目标表中的匹配行,可以通过将 设置为 false 来禁用 spark.databricks.delta.replaceWhere.constraintCheck.enabled 约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

在 Databricks Runtime 9.0 及以下的中, replaceWhere 仅覆盖与分区列上的谓词匹配的数据。 以下命令以原子方式将目标表中 1 月的 月份(由 分区)替换为 date 中的数据 df

Python

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
  .save("/tmp/delta/people10m")

Scala

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

在 Databricks Runtime 9.1 及以上,如果要回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

注意

与 Apache Spark 中的文件 API 不同,Delta Lake 会记住并强制实施表的架构。 这意味着,默认情况下,覆盖不会替换现有表的架构。

有关 Delta Lake 在更新表方面的支持,请参阅表删除、更新和合并

设置用户定义的提交元数据

可以使用 DataFrameWriter 选项 userMetadata 或 SparkSession 配置 spark.databricks.delta.commitInfo.userMetadata,将用户定义的字符串指定为这些操作所进行的提交中的元数据。 如果同时指定了两个参数,则此选项将优先。 此用户定义的元数据在历史记录操作中是可读的。

SQL


SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople

Python

df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/tmp/delta/people10m")

Scala

df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/tmp/delta/people10m")

架构验证

Delta Lake 会自动验证正在写入的数据帧的架构是否与表的架构兼容。 Delta Lake 使用以下规则来确定从数据帧到表的写入是否兼容:

  • 所有数据帧列都必须存在于目标表中。 如果表中不存在 DataFrame 中的列,则引发异常。 表中存在但数据帧中不存在的列将设置为 NULL。
  • 数据帧列数据类型必须与目标表中的列数据类型匹配。 如果它们不匹配,则会引发异常。
  • 数据帧列名称不能仅通过大小写来区分。 这意味着不能在同一表中定义"Foo"和"foo"等列。 尽管可以在区分大小写或不区分大小写(默认)模式下使用 Spark,但 Parquet 在存储和返回列信息时区分大小写。 在存储架构时,Delta Lake 保留但不区分大小写,并采用此限制来避免潜在的错误、数据损坏或丢失问题。

Delta Lake 支持使用 DDL 显式添加新列并自动更新架构。

如果你指定其他选项(例如 partitionBy)与追加模式结合使用,则 Delta Lake 会验证它们是否匹配,在不匹配时会引发错误。 未提供 partitionBy 时,会在对现有数据分区之后自动进行追加。

注意

在 Databricks Runtime 7.0 及更高版本中,INSERT 语法提供了架构强制实施,并支持架构演变。 如果列的数据类型不能安全地强制转换为 Delta Lake 表的数据类型,则会引发运行时异常。 如果启用了架构演变,则新列可以作为架构的最后一列(或嵌套列)存在,以便架构得以演变。

更新表架构

Delta Lake 允许你更新表的架构。 支持下列类型的更改:

  • 添加新列(在任意位置)
  • 重新排列现有列
  • 重命名现有列

你可以使用 DDL 显式地或使用 DML 隐式地进行这些更改。

重要

更新 Delta 表架构时,从该表进行读取的流会终止。 如果你希望流继续进行,必须重启它。

有关建议的方法,请参阅生产中的结构化流式处理

显式更新架构

你可以使用以下 DDL 显式更改表的架构。

添加列

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

默认情况下,为 Null 性为 true

若要将列添加到嵌套字段,请使用:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
示例

如果运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) 之前的架构为:

- root
| - colA
| - colB
| +-field1
| +-field2

则运行之后的架构为:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

注意

仅支持为结构添加嵌套列。 不支持数组和映射。

更改列注释或排序

ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

若要更改嵌套字段中的列,请使用:

ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
示例

如果运行 ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST 之前的架构为:

- root
| - colA
| - colB
| +-field1
| +-field2

则运行之后的架构为:

- root
| - colA
| - colB
| +-field2
| +-field1

替换列

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
示例

运行以下 DSL 时:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

如果运行之前的架构为:

- root
| - colA
| - colB
| +-field1
| +-field2

则运行之后的架构为:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

重命名列

重要

此功能目前以公共预览版提供。

要求
  • Databricks Runtime 10.2 或以上。
  • 若要重命名列而不重写列的任何现有数据,必须为表启用列映射。 请参阅 增量列映射

重命名列:

ALTER TABLE <table_name> RENAME COLUMN old_col_name TO new_col_name

重命名嵌套字段:

ALTER TABLE <table_name> RENAME COLUMN col_name.old_nested_field TO new_nested_field
示例

运行以下命令时:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

如果之前架构为:

- root
| - colA
| - colB
| +-field1
| +-field2

然后, 后的架构为:

- root
| - colA
| - colB
| +-field001
| +-field2

请参阅 增量列映射

更改列类型或名称

可以通过重写表来更改列的类型或名称或删除列。 为此,请使用 overwriteSchema 选项:

更改列类型
spark.read.table(...) \
  .withColumn("birthDate", col("birthDate").cast("date")) \
  .write \
  .format("delta") \
  .mode("overwrite")
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
更改列名称
spark.read.table(...) \
  .withColumnRenamed("dateOfBirth", "birthDate") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)

自动架构更新

Delta Lake 可以在 DML 事务(追加或覆盖)中自动更新表的架构,并使架构与要写入的数据兼容。

添加列

在以下情况下,会自动将数据帧中存在但表中缺少的列添加为写入事务的一部分:

  • writewriteStream 具有 .option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge.enabledtrue

如果同时指定了这两个选项,则优先使用 DataFrameWriter 中的选项。 添加的列将追加到它们所在结构的末尾。 追加新列时,会保留大小写。

注意

  • mergeSchemamergeSchema 时, (,因为它将要求的请求提升为需要 MODIFYALL PRIVILEGES) 。
  • mergeSchema 不能与 或 INSERT INTO 一起使用 .write.insertInto()

NullType

由于 Parquet 不支持 NullType,因此在写入到 Delta 表时会从数据帧中删除 NullType 列,但仍会将其存储在架构中。 如果为该列接收到不同的数据类型,则 Delta Lake 会将该架构合并到新的数据类型。 如果 Delta Lake 接收到现有列的 NullType,则在写入过程中会保留旧架构并删除新列。

不支持流式处理中的 NullType。 由于在使用流式处理时必须设置架构,因此这应该非常罕见。 对于复杂类型(例如 ArrayTypeMapType),也不会接受 NullType

替换表架构

默认情况下,覆盖表中的数据不会覆盖架构。 在不使用 replaceWhere 的情况下使用 mode("overwrite") 来覆盖表时,你可能还希望覆盖写入的数据的架构。 你可以通过将 overwriteSchema 选项设置为 true 来替换表的架构和分区:

df.write.option("overwriteSchema", "true")

表中的视图

Delta Lake 支持基于 Delta 表创建视图,就像使用数据源表一样。

这些视图集成了表访问控制,可以实现列级和行级安全性。

处理视图时的主要难题是解析架构。 如果你更改 Delta 表架构,则必须重新创建派生视图,以容纳向该架构添加的任何内容。 例如,如果向 Delta 表中添加一个新列,则必须确保此列在基于该基表构建的相应视图中可用。

表属性

你可以使用 CREATEALTER 中的 TBLPROPERTIES 将自己的元数据存储为表属性。

TBLPROPERTIES 存储为 Delta 表元数据的一部分。 如果在给定位置已存在 Delta 表,则无法在 CREATE 语句中定义新的 TBLPROPERTIES。 有关更多详细信息,请参阅创建表

此外,为了调整行为和性能,Delta Lake 支持某些 Delta 表属性:

  • 阻止 Delta 表中的删除和更新:delta.appendOnly=true

  • 配置 时间行程保留 属性: delta.deletedFileRetentionDuration=<interval-string> 。 有关详细信息,请参阅数据保留

  • 配置要为其收集统计信息的列的数目:delta.dataSkippingNumIndexedCols=<number-of-columns>。 此属性仅对写出的新数据有效。

注意

  • 修改 Delta 表属性是一个写入操作,该操作会与其他并发写入操作冲突,导致这些操作失败。 建议仅当不存在对表的并发写入操作时才修改表属性。

你还可以在第一次提交到 Delta 表期间使用 Spark 配置来设置带 delta. 前缀的属性。 例如,若要使用属性 delta.appendOnly=true 初始化 Delta 表,请将 Spark 配置 spark.databricks.delta.properties.defaults.appendOnly 设置为 true。 例如: 。

SQL

spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")

Python

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Scala

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

表元数据

Delta Lake 提供了丰富的用来浏览表元数据的功能。

它支持 SHOW [PARTITIONS | COLUMNS]DESCRIBE TABLE。 请参阅

它还提供了以下独特的命令:

DESCRIBE DETAIL

提供架构、分区、表大小等方面的信息。 有关详细信息,请参阅检索 Delta 表详细信息

DESCRIBE HISTORY

提供出处信息,包括操作、用户等,以及向表的每次写入的操作指标。 表历史记录会保留 30 天。 有关详细信息,请参阅检索 Delta 表历史记录

数据边栏提供此详细的表信息的可视化视图和 Delta 表的历史记录。 除了表架构和示例数据之外,您还可以单击 " 历史记录 " 选项卡以查看显示的表历史记录

配置存储凭据

Delta Lake 使用 Hadoop 文件系统 Api 来访问存储系统。 通常可以通过 Hadoop 配置来设置存储系统的凭据。 增量 Lake 提供多种方式来设置 Hadoop 配置,类似于 Apache Spark。

Spark 配置

启动群集上的 Spark 应用程序时,可以使用的形式设置 Spark 配置 spark.hadoop.* 以传递自定义 Hadoop 配置。 例如,设置的值将以 spark.hadoop.a.b.c Hadoop 配置的形式传递该值 a.b.c ,而增量 Lake 将使用它来访问 Hadoop 文件系统 api。

有关更多详细信息,请参阅 __

SQL 会话配置

Spark SQL 会将当前 SQL 的所有会话配置传递到增量 lake,而增量 lake 将使用它们来访问 Hadoop 文件系统 api。 例如, SET a.b.c=x.y.z 将告知增量 lake 将值 x.y.z 作为 Hadoop 配置传递 a.b.c ,而增量 lake 将使用它来访问 Hadoop 文件系统 api。

数据帧选项

除了通过 Spark (群集设置 Hadoop 文件系统配置外) 配置或 SQL 会话配置,Delta 还支持从和选项读取 hadoop 文件系统配置, DataFrameReaderDataFrameWriter (也就是说, fs. 使用或读取或写入表时,以前缀) 开头的选项键 DataFrameReader.load(path)DataFrameWriter.save(path)

注意

此功能在 Databricks Runtime 10.1 及更高版本中可用。

例如,可以通过数据帧选项传递存储凭据:

Python

df1 = spark.read.format("delta") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \
  .read("...")
df2 = spark.read.format("delta") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \
  .read("...")
df1.union(df2).write.format("delta") \
  .mode("overwrite") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \
  .save("...")

Scala

val df1 = spark.read.format("delta")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>")
  .read("...")
val df2 = spark.read.format("delta")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>")
  .read("...")
df1.union(df2).write.format("delta")
  .mode("overwrite")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>")
  .save("...")

可以在 数据源中找到存储的 Hadoop 文件系统配置的详细信息。

笔记本

有关各种 Delta 表元数据命令的示例,请参阅以下笔记本的末尾:

Delta Lake 批处理命令笔记本

获取笔记本