Table batch reads and writes

Delta Lake supports most of the options provided by Apache Spark DataFrame read and write APIs for performing batch reads and writes on tables.

For information on Delta Lake SQL commands, see the Delta Lake SQL reference.

Create a table

Delta Lake supports creating tables directly based on the path using DataFrameWriter (Scala, Java, or Python). Delta Lake also supports creating tables in the metastore using standard DDL CREATE TABLE. When you create a table in the metastore using Delta Lake, it stores the location of the table data in the metastore. This pointer makes it easier for other users to discover and refer to the data without having to worry about exactly where it is stored. However, the metastore is not the source of truth about what is valid in the table. That responsibility stays with Delta Lake.

SQL

-- Create table in the metastore
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA

Python

df.write.format("delta").saveAsTable("events")      # create table in the metastore

df.write.format("delta").save("/mnt/delta/events")  # create table by path

Scala

df.write.format("delta").saveAsTable("events")      // create table in the metastore

df.write.format("delta").save("/mnt/delta/events")  // create table by path

In Databricks Runtime 7.0 and above, you can create Delta tables using the DataFrameWriterV2 interface. SQL also supports creating a table at a path, without creating an entry in the Hive metastore.

SQL

-- Create a table by path
CREATE OR REPLACE TABLE delta.`/mnt/delta/events` (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date);

-- Create a table in the metastore
CREATE OR REPLACE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date);

Scala

df.writeTo("delta.`/mnt/delta/events`").using("delta").partitionedBy("date").createOrReplace() // create table by path

df.writeTo("events").using("delta").partitionedBy("date").createOrReplace()                   // create table in the metastore

Partition data

You can partition data to speed up queries or DML that have predicates involving the partition columns. To partition data when you create a Delta table, specify the columns to partition by. A common pattern is to partition by date, for example:

SQL

-- Create table in the metastore
CREATE TABLE events (
 date DATE,
 eventId STRING,
 eventType STRING,
 data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/mnt/delta/events'

Python

df.write.format("delta").partitionBy("date").saveAsTable("events")      # create table in the metastore

df.write.format("delta").partitionBy("date").save("/mnt/delta/events")  # create table by path

Scala

df.write.format("delta").partitionBy("date").saveAsTable("events")      // create table in the metastore

df.write.format("delta").partitionBy("date").save("/mnt/delta/events")  // create table by path

Control data location

To control the location of the Delta table files, you can optionally specify the LOCATION as a path on DBFS.

Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table's files are not deleted when you DROP the table.

When you run CREATE TABLE with a LOCATION that already contains data stored using Delta Lake, Delta Lake does the following:

  • If you specify only the table name and location, for example:

    CREATE TABLE events
    USING DELTA
    LOCATION '/mnt/delta/events'
    

    the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data. This functionality can be used to "import" data into the metastore.

  • If you specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that the specification exactly matches the configuration of the existing data.

    Important

    If the specified configuration does not exactly match the configuration of the data, Delta Lake throws an exception that describes the discrepancy.

Read a table

You can load a Delta table as a DataFrame by specifying a table name or a path:

SQL

SELECT * FROM events   -- query table in the metastore

SELECT * FROM delta.`/mnt/delta/events`  -- query table by path

Python

spark.table("events")    # query table in the metastore

spark.read.format("delta").load("/mnt/delta/events")  # query table by path

Scala

spark.table("events")      // query table in the metastore

spark.read.format("delta").load("/mnt/delta/events")  // query table by path

The DataFrame returned automatically reads the most recent snapshot of the table for any query; you never need to run REFRESH TABLE. Delta Lake automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
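
As a minimal sketch of that pruning behavior (assuming the events table above, partitioned by date, and a hypothetical date range), a query whose predicate references the partition column only scans the matching partitions:

from pyspark.sql.functions import col

# Only files in the matching date partitions are read; other partitions are skipped.
recent = (spark.read.format("delta").load("/mnt/delta/events")
          .where(col("date") >= "2017-01-01"))
recent.groupBy("eventType").count().show()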

Query an older snapshot of a table (time travel)


Delta Lake time travel allows you to query an older snapshot of a Delta table. Time travel has many use cases, including:

  • Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This can be useful for debugging or auditing, especially in regulated industries.
  • Writing complex temporal queries.
  • Fixing mistakes in your data.
  • Providing snapshot isolation for a set of queries against fast-changing tables.

This section describes the supported methods for querying older versions of tables and data retention concerns, and provides examples.

Syntax

This section shows how to query an older version of a Delta table.

SQL AS OF syntax
SELECT * FROM table_identifier TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_identifier VERSION AS OF version

where

  • table_identifier
    • [database_name.] table_name: A table name, optionally qualified with a database name.
    • delta.`<path-to-table>`: The location of an existing Delta table.
  • timestamp_expression can be any one of:
    • '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', that is, a date string
    • In Databricks Runtime 6.6 and above:
      • current_timestamp() - interval 12 hours
      • date_sub(current_date(), 1)
      • Any other expression that is or can be cast to a timestamp
  • version is a long value that can be obtained from the output of DESCRIBE HISTORY table_spec.

Neither timestamp_expression nor version can be subqueries.

SELECT * FROM events TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/mnt/delta/events` VERSION AS OF 123
DataFrameReader options

DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")

For timestamp_string, only date or timestamp strings are accepted, for example, "2019-01-01" and "2019-01-01T00:00:00.000Z".
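
For example, a small sketch (hypothetical date, using the events path above) that pins the read to a single point in time:

# Read the table as of 2019-01-01 using the date-string form of timestampAsOf.
df_on_date = spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("/mnt/delta/events")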

A common pattern is to use the latest state of the Delta table throughout the execution of an Azure Databricks job to update downstream applications.

Because Delta tables auto update, a DataFrame loaded from a Delta table may return different results across invocations if the underlying data is updated. By using time travel, you can fix the data returned by the DataFrame across invocations:

latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/mnt/delta/events`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/mnt/delta/events")
@ syntax

You may have a parameterized pipeline, where the input path of your pipeline is a parameter of your job. After the execution of your job, you may want to reproduce the output some time in the future. In this case, you can use the @ syntax to specify the timestamp or version. The timestamp must be in yyyyMMddHHmmssSSS format. You can specify a version after @ by prepending a v to the version. For example, to query version 123 of the table events, specify events@v123.

SQL
SELECT * FROM events@20190101000000000
SELECT * FROM events@v123
Python
spark.read.format("delta").load("/mnt/delta/events@20190101000000000") # table on 2019-01-01 00:00:00.000
spark.read.format("delta").load("/mnt/delta/events@v123")              # table on version 123

Data retention

By default, Delta tables retain the commit history for 30 days. This means that you can specify a version from 30 days ago. However, there are some caveats:

  • You did not run VACUUM on your Delta table. If you run VACUUM, you lose the ability to go back to a version older than the default 7 day data retention period.

You can configure retention periods using the following table properties:

  • delta.logRetentionDuration = "interval <interval>": controls how long the history for a table is kept. Each time a checkpoint is written, Azure Databricks automatically cleans up log entries older than the retention interval. If you set this config to a large enough value, many log entries are retained. This should not impact performance, because operations against the log are constant time. Operations on history are parallel (but become more expensive as the log size increases). The default is interval 30 days.

  • delta.deletedFileRetentionDuration = "interval <interval>": controls how long ago a file must have been deleted before being a candidate for VACUUM. The default is interval 7 days. For access to 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days". This setting may cause your storage costs to go up.

    Note

    VACUUM doesn't clean up log files; log files are automatically cleaned up after checkpoints are written.

To time travel to a previous version, you must retain both the log and the data files for that version.
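
For example, a sketch (assuming the events table above) that sets both retention properties to 30 days so that 30 days of history stay available for time travel:

# Keep both the commit log and the deleted data files for 30 days.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 30 days'
  )
""")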

Examples

  • Fix accidental deletes to a table for the user 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Fix accidental incorrect updates to a table:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Query the number of new customers added over the last week.

    SELECT count(distinct userId) - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Write to a table

Append

Using append mode, you can atomically add new data to an existing Delta table:

SQL

INSERT INTO events SELECT * FROM newEvents

Python

df.write.format("delta").mode("append").save("/mnt/delta/events")
df.write.format("delta").mode("append").saveAsTable("events")

Scala

df.write.format("delta").mode("append").save("/mnt/delta/events")
df.write.format("delta").mode("append").saveAsTable("events")

Overwrite

To atomically replace all of the data in a table, you can use overwrite mode:

SQL

INSERT OVERWRITE TABLE events SELECT * FROM newEvents

Python

df.write.format("delta").mode("overwrite").save("/mnt/delta/events")
df.write.format("delta").mode("overwrite").saveAsTable("events")

Scala

df.write.format("delta").mode("overwrite").save("/mnt/delta/events")
df.write.format("delta").mode("overwrite").saveAsTable("events")

Using DataFrames, you can also selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in df:

Python

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") \
  .save("/mnt/delta/events")

Scala

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/mnt/delta/events")

This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.

Note

Unlike the file APIs in Apache Spark, Delta Lake remembers and enforces the schema of a table. This means that by default overwrites do not replace the schema of an existing table.

For Delta Lake support for updating tables, see Table deletes, updates, and merges.

Set user-defined commit metadata

You can specify user-defined strings as metadata in commits made by these operations, either using the DataFrameWriter option userMetadata or the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata. If both are specified, the option takes precedence. This user-defined metadata is readable in the history operation.

SQL


SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE events SELECT * FROM newEvents

Python

df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/mnt/delta/events")

Scala

df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/mnt/delta/events")
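
A sketch of how to read that metadata back (assuming the events path above): the string set by the write appears in the userMetadata column of DESCRIBE HISTORY.

Python

# Each commit's user-defined metadata is exposed as the userMetadata column.
spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/events`") \
  .select("version", "operation", "userMetadata") \
  .show(truncate=False)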

Schema validation

Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is compatible:

  • All DataFrame columns must exist in the target table. If there are columns in the DataFrame not present in the table, an exception is raised (a sketch follows this list). Columns present in the table but not in the DataFrame are set to null.
  • DataFrame column data types must match the column data types in the target table. If they don't match, an exception is raised.
  • DataFrame column names cannot differ only by case. This means that you cannot have columns such as "Foo" and "foo" defined in the same table. While you can use Spark in case sensitive or insensitive (default) mode, Parquet is case sensitive when storing and returning column information. Delta Lake is case-preserving but case-insensitive when storing the schema and has this restriction to avoid potential mistakes, data corruption, or loss issues.

Delta Lake supports DDL to add new columns explicitly, as well as the ability to update the schema automatically.

If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.

Note

In Databricks Runtime 7.0 and above, INSERT syntax provides schema enforcement and supports schema evolution. If a column's data type cannot be safely cast to your Delta Lake table's data type, a runtime exception is thrown. If schema evolution is enabled, new columns can exist as the last columns of your schema (or nested columns) for the schema to evolve.

Update table schema

Delta Lake lets you update the schema of a table. The following types of changes are supported:

  • Adding new columns (at arbitrary positions)
  • Reordering existing columns

You can make these changes explicitly using DDL or implicitly using DML.

Important

When you update a Delta table schema, streams that read from that table terminate. If you want the stream to continue, you must restart it.

For recommended methods, see Structured Streaming in production.

Explicitly update schema

You can use the following DDL to explicitly change the schema of a table.

Add columns

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

By default, nullability is true.

To add a column to a nested field, use:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Example

If the schema before running ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) is:

- root
| - colA
| - colB
| +-field1
| +-field2

the schema after is:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Note

Adding nested columns is supported only for structs. Arrays and maps are not supported.

Change column comment or ordering

ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

To change a column in a nested field, use:

ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
Example

If the schema before running ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST is:

- root
| - colA
| - colB
| +-field1
| +-field2

the schema after is:

- root
| - colA
| - colB
| +-field2
| +-field1

Replace columns

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Example

When running the following DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

if the schema before is:

- root
| - colA
| - colB
| +-field1
| +-field2

the schema after is:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Change column type or name

Changing a column's type or name or dropping a column requires rewriting the table. To do this, use the overwriteSchema option:

Change a column type
spark.read.table(...)
  .withColumn("date", col("date").cast("date"))
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
Change a column name
spark.read.table(...)
  .withColumnRenamed("date", "date_created")
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)

Automatic schema update

Delta Lake can automatically update the schema of a table as part of a DML transaction (either appending or overwriting), and make the schema compatible with the data being written.

Add columns

Columns that are present in the DataFrame but missing from the table are automatically added as part of a write transaction when:

  • writewriteStream 具有 .option("mergeSchema", "true")write or writeStream have .option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge.enabledtruespark.databricks.delta.schema.autoMerge.enabled is true

When both are specified, the option from the DataFrameWriter takes precedence. The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
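
A minimal sketch of the option form (assuming the events path above; the new column is hypothetical):

from pyspark.sql.functions import lit

# "country" is a hypothetical column that does not yet exist in the table schema.
new_df = spark.read.format("delta").load("/mnt/delta/events").withColumn("country", lit("US"))
new_df.write.format("delta").mode("append") \
  .option("mergeSchema", "true") \
  .save("/mnt/delta/events")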

Note

  • mergeSchema is not supported when table access control is enabled (because it elevates a request that requires MODIFY to one that requires ALL PRIVILEGES).
  • mergeSchema cannot be used with INSERT INTO or .write.insertInto().

NullType columns

Because Parquet doesn't support NullType, NullType columns are dropped from the DataFrame when writing into Delta tables, but are still stored in the schema. When a different data type is received for that column, Delta Lake merges the schema to the new data type. If Delta Lake receives a NullType for an existing column, the old schema is retained and the new column is dropped during the write.

NullType in streaming is not supported. Since you must set schemas when using streaming, this should be very rare. NullType is also not accepted for complex types such as ArrayType and MapType.

Replace table schema

By default, overwriting the data in a table does not overwrite the schema. When overwriting a table using mode("overwrite") without replaceWhere, you may still want to overwrite the schema of the data being written. You replace the schema and partitioning of the table by setting the overwriteSchema option to true:

df.write.format("delta").mode("overwrite") \
  .option("overwriteSchema", "true").save("/mnt/delta/events")

Views on tables

Delta Lake supports the creation of views on top of Delta tables, just as you might with a data source table.

These views integrate with table access control to allow for column- and row-level security.

The core challenge when you operate with views is resolving the schemas. If you alter a Delta table schema, you must recreate derivative views to account for any additions to the schema. For instance, if you add a new column to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
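
For example, a sketch of a view over the events table (the view name and filter are hypothetical):

# A view defined on the metastore table; it must be recreated if the base schema changes.
spark.sql("""
  CREATE OR REPLACE VIEW recent_events AS
  SELECT date, eventId, eventType
  FROM events
  WHERE date >= '2017-01-01'
""")
spark.table("recent_events").show()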

Table properties

You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER.

TBLPROPERTIES are stored as part of Delta table metadata. You cannot define new TBLPROPERTIES in a CREATE statement if a Delta table already exists in the given location. See Create a table for more details.

In addition, to tailor behavior and performance, Delta Lake supports certain Delta table properties (a sketch of setting them follows this list):

  • Block deletes and updates in a Delta table: delta.appendOnly=true.

  • Configure the time travel retention properties: delta.logRetentionDuration=<interval-string> and delta.deletedFileRetentionDuration=<interval-string>. For details, see Data retention.

  • Configure the number of columns for which statistics are collected: delta.dataSkippingNumIndexedCols=<number-of-columns>. This property takes effect only for new data that is written out.
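
As a sketch of setting these properties on an existing table (assuming the events table above; the statistics column count is a hypothetical value):

# Make the table append-only and collect statistics on the first 8 columns.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.appendOnly' = 'true',
    'delta.dataSkippingNumIndexedCols' = '8'
  )
""")

# The properties are visible with SHOW TBLPROPERTIES.
spark.sql("SHOW TBLPROPERTIES events").show(truncate=False)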

Note

  • These are the only supported delta.-prefixed table properties.
  • Modifying a Delta table property is a write operation that conflicts with other concurrent write operations, causing them to fail. We recommend that you modify a table property only when there are no concurrent write operations on the table.

You can also set delta.-prefixed properties during the first commit to a Delta table using Spark configurations. For example, to initialize a Delta table with the property delta.appendOnly=true, set the Spark configuration spark.databricks.delta.properties.defaults.appendOnly to true. For example:

SQL

spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")

Scala

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Python

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Table metadata

Delta Lake has rich features for exploring table metadata.

It supports SHOW [PARTITIONS | COLUMNS] and DESCRIBE TABLE.

It also provides the following unique commands:

DESCRIBE DETAIL

Provides information about schema, partitioning, table size, and so on. For details, see Retrieve Delta table details.

DESCRIBE HISTORY

Provides provenance information, including the operation, user, and so on, and operation metrics for each write to a table. Table history is retained for 30 days. For details, see Retrieve Delta table history.
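
For instance, a sketch of running both commands against the events table from earlier sections:

# Table-level details: format, location, size, partition columns, and so on.
spark.sql("DESCRIBE DETAIL events").show(truncate=False)

# Per-commit provenance and operation metrics.
spark.sql("DESCRIBE HISTORY events").select("version", "timestamp", "operation").show()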

The Data sidebar provides a visual view of this detailed table information and history for Delta tables. In addition to the table schema and sample data, you can click the History tab to see the table history that displays with DESCRIBE HISTORY.

Notebook

For an example of the various Delta table metadata commands, see the end of the following notebook:

Delta Lake batch commands notebook

Get notebook