在 Databricks SQL 中使用流式处理表加载数据

重要

此功能目前以公共预览版提供。 要注册以进行访问,请填写此表格

Databricks 建议使用流式处理表通过 Databricks SQL 引入数据。 流式处理表是一种 Unity Catalog 管理的表,它额外支持流式处理或增量数据处理。 系统会自动为每个流式处理表创建一个 DLT 管道。 可以使用流式处理表从 Kafka 和云对象存储进行增量数据加载。

本文演示如何使用流式处理表从配置为 Unity Catalog 卷(建议)或外部位置的云对象存储加载数据。

注意

若要了解如何使用 Delta Lake 表作为流式处理源和接收器,请参阅 Delta 表流式处理读取和写入

开始之前的准备工作

在开始之前,请确保具有以下各项:

  • 启用了无服务器的 Azure Databricks 帐户。 有关详细信息,请参阅启用无服务器 SQL 仓库

  • 一个启用了 Unity Catalog 的工作区。 有关详细信息,请参阅设置和管理 Unity Catalog

  • 使用 Current 通道的 SQL 仓库。

  • 若要查询由增量实时表管道创建的流式传输表,必须使用采用 Databricks Runtime 13.3 LTS 及更高版本或 SQL 仓库的共享计算。 无法从已分配的群集或非隔离群集查询在已启用 Unity Catalog 的管道中创建的流式传输表。

  • 对 Unity Catalog 外部位置的 READ FILES 特权。 有关详细信息,请参阅创建外部位置以将云存储连接到 Azure Databricks

  • 对在其中创建流式处理表的目录的 USE CATALOG 特权。

  • 对在其中创建流式处理表的架构的 USE SCHEMA 特权。

  • 对在其中创建流式处理表的架构的 CREATE TABLE 特权。

  • 源数据的路径。

    卷路径示例:/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部位置路径示例:abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    注意

    本文假设要加载的数据位于云存储位置,该位置对应于你有权访问的 Unity Catalog 卷或外部位置。

发现和预览源数据

  1. 在工作区的边栏中,单击“查询”,然后单击“创建查询”。

  2. 在查询编辑器中,从下拉列表中选择使用该 Current 通道的 SQL 仓库。

  3. 将以下内容粘贴到编辑器中,将尖括号 (<>) 中的值替换为标识源数据的信息,然后单击“运行”。

    注意

    如果函数的默认值无法分析数据,则在运行 read_files 表值函数时可能会遇到架构推理错误。 例如,可能需要为多行 CSV 或 JSON 文件配置多行模式。 有关分析程序选项的列表,请参阅 read_files 表值函数

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

将数据加载到流式处理表中

若要用云对象存储中的数据创建流式处理表,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

使用 DLT 管道刷新流式处理表

本部分介绍使用查询中定义的源中提供的最新数据刷新流式处理表的模式。

流式处理表的 CREATE 操作使用 Databricks SQL 仓库来初始创建数据并将数据加载到流式处理表中。 流式处理表的 REFRESH 操作使用 Delta Live Tables (DLT)。 系统会自动为每个流式处理表创建一个 DLT 管道。 刷新流式处理表时,将启动对 DLT 管道的更新以处理刷新。

运行 REFRESH 命令后,将返回 DLT 管道链接。 可以使用 DLT 管道链接来检查刷新状态。

注意

只有表的所有者可以刷新流式处理表以获取最新数据。 创建表的用户就是所有者,所有者不能更改。

请参阅什么是增量实时表?

仅引入新数据

默认情况下,read_files 函数在表创建期间读取源目录中的所有现有数据,然后在每次刷新时处理新进入的记录。

若要避免在创建表时引入源目录中已存在的数据,请将 includeExistingFiles 选项设置为 false。 这意味着,将仅处理表创建后进入目录的数据。 例如:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

完全刷新流式处理表

完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。

例如:

REFRESH STREAMING TABLE my_bronze_table FULL

为流式处理表计划自动刷新

若要将流式处理表配置为根据定义的计划自动刷新,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

有关刷新计划查询的示例,请参阅 ALTER STREAMING TABLE

跟踪刷新状态

可以在 Delta Live Tables UI 中查看管理流式处理表的管道或查看 DESCRIBE EXTENDED 命令为流式处理表返回的刷新信息,以此查看流式处理表的刷新状态。

DESCRIBE EXTENDED <table-name>

从 Kafka 流式引入

有关从 Kafka 流式引入的示例,请参阅 read_kafka

授予用户对流式处理表的访问权限

若要授予用户对流式处理表的 SELECT 特权,以便他们可以对其进行查询,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

有关授予 Unity Catalog 安全对象特权的详细信息,请参阅 Unity Catalog 特权和安全对象

限制

  • 美国中南部和美国西部 2 区域不支持 Databricks SQL 流式处理表。

其他资源