CREATE STREAMING TABLE
适用于: Databricks SQL Databricks Runtime 13.3 LTS 及更高版本
重要
此功能目前以公共预览版提供。
建立流式处理表,它是 Delta 表,额外支持流式处理或增量数据处理。
流式处理表仅在增量实时表和具有 Unity Catalog 的 Databricks SQL 中受支持。 在支持的 Databricks Runtime 计算上运行此命令仅分析语法。 请参阅使用 SQL 实现 Delta Live Tables 管道。
语法
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
参数
REFRESH
如果指定,则使用查询中定义的源中的最新可用数据刷新表。 只有在查询开始之前到达的新数据才会被处理。 在执行命令期间添加到源的新数据将被忽略,直到下一次刷新为止。
IF NOT EXISTS
如果指定了该参数,并且已存在名称相同的表,则会忽略该语句。
IF NOT EXISTS
无法与REFRESH
一起使用,这表示不允许CREATE OR REFRESH TABLE IF NOT EXISTS
。-
要创建的表的名称。 名称不得包含时态规范。 如果未限定该名称,则会在当前架构中创建该表。
table_specification
此可选子句定义列的列表、列的类型、属性、说明和列约束。
如果未在表架构中定义列,则必须指定
AS query
。-
列的唯一名称。
-
指定列的数据类型。
NOT NULL
如果已指定,则列不会接受
NULL
值。COMMENT column_comment
用于描述列的字符串字面量。
-
重要
此功能目前以公共预览版提供。
将主键或外键约束添加到流式处理表中的列。
hive_metastore
目录中的表不支持约束。 CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
将数据质量预期添加到表中。 这些数据质量预期可以随着时间的推移进行跟踪,并通过流式处理表的事件日志进行访问。 在创建表和刷新表时,
FAIL UPDATE
期望会导致处理失败。 如果未满足DROP ROW
预期,则预期会导致删除整行。expectation_expr
可能包含文本、表中的列标识符以及内置的确定性 SQL 函数或运算符,但以下内容除外:此外,
expr
不能包含任何expr
。-
重要
此功能目前以公共预览版提供。
向流式处理表添加信息性主键或信息性外键约束。
hive_metastore
目录中的表不支持键约束。
-
-
table_clauses
为新表指定分区、注释、用户定义的属性和刷新计划(可选)。 每个子子句只能指定一次。
-
表中用于对表进行分区的列可选列表。
COMMENT table_comment
用于描述表的
STRING
文本。-
可以选择设置一个或多个用户定义的属性。
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
如果提供计划,则它会流式处理表或具体化视图,以使用给定的 quartz cron 计划刷新其数据。 仅接受 time_zone_values。 不支持
AT TIME ZONE LOCAL
。 如果AT TIME ZONE
不存在,则使用会话时区。 如果AT TIME ZONE
不存在并且未设置会话时区,则会引发错误。SCHEDULE
在语义上等效于SCHEDULE REFRESH
。不能在增量实时表管道定义中使用
SCHEDULE
语法。CREATE OR REFRESH
命令中不允许使用子句SCHEDULE
。 计划可以作为CREATE
命令的一部分提供。 使用 ALTER STREAMING TABLE 可以在创建后更改流式处理表的计划。
-
AS 查询
此子句使用
query
中的数据来填充表。 此查询必须是流式处理查询。 这可以通过将STREAM
关键字添加到要增量处理的任何关系中来实现。 同时指定query
和table_specification
时,table_specification
中指定的表架构必须包含query
返回的所有列,否则会出现错误。 在table_specification
中指定但未由query
返回的任何列在查询时都返回null
值。此子句对于在 Databricks SQL 中创建的流式处理表是必需的,但在增量实时表中不是必需的。 如果增量实时表中未提供此子句,则必须在 DLT 管道中的
APPLY CHANGES
命令中引用此表。 请参阅,在增量实时表中使用 SQL 进行变更数据捕获。
流式处理表与其他表之间的差异
流式处理表是有状态的表,设计用于在处理不断增长的数据集时只处理每一行一次。 由于大部分数据集会随着时间不断扩大,因此流式处理表非常适合用于大多数引入工作负载。 流式处理表最适合需要全新数据和低延迟的管道。 流式处理表还可用于大规模转换,因为随着新数据的到达可以增量计算结果,使结果保持最新,而无需在每次更新时完全重新计算所有源数据。 流式处理表专为仅追加的数据源而设计。
流式处理表接受其他命令,例如 REFRESH
,啟处理查询中提供的源中可用的最新数据。 对所提供查询的更改仅通过调用 REFRESH
反映在新数据上,而不是以前处理过的数据。 要将更改也应用于现有数据,需要执行 REFRESH TABLE <table_name> FULL
来执行 FULL REFRESH
。 完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。
限制
只有表所有者才能刷新流式处理表以获取最新数据。
流式处理表上不允许
ALTER TABLE
命令。 表的定义和属性应该通过ALTER STREAMING TABLE
语句进行更改。不支持按时间顺序查看查询。
不支持通过 DML 命令(如
INSERT INTO
和MERGE
)来发展表模式。流式处理表不支持以下命令:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
不支持 Delta Sharing。
不支持重命名表或更改所有者。
不支持
PRIMARY KEY
和FOREIGN KEY
等表约束。不支持生成的列、标识列和默认列。
示例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
相关文章
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈