流式数据处理

许多应用程序要求根据不断到达的数据更新表。 但是,随着数据大小的增长,每次更新时重新处理数据所需的资源量可能会大得惊人。 可以定义流式处理表或视图,以增量计算不断到达的数据。 流式处理表和视图可减少引入新数据的成本和新数据可用前的延迟。

当为管道触发更新时,流式处理表或视图仅处理自上次更新以来到达的新数据。 增量实时表运行时会自动跟踪已处理的数据。

从外部数据源流式引入

若要引入流数据,必须从流式源定义流式处理实时表;例如,可以使用以下代码将外部数据作为流进行读取:

Python

inputPath = "/databricks-datasets/structured-streaming/events/"

@dlt.table
def streaming_bronze_table():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(inputPath)
  )

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")

从管道中的其他数据集流式传输

还可以从同一管道中的其他表流式传输数据。

Python

@dlt.table
def streaming_silver_table:
  return dlt.read_stream("streaming_bronze_table").where(...)

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.streaming_bronze_table)
WHERE ...

处理单个管道中的流式数据和批数据

由于流式处理实时表使用什么是 Apache Spark 结构化流式处理?,因此流式处理实时表只能处理追加查询;即在源表中插入新行的查询。 不支持处理源表中的更新,例如合并和删除。 若要处理更新,请参阅 APPLY CHANGES INTO 命令。

常见的流式处理模式包括引入源数据以在管道中创建初始数据集。 这些初始数据集通常称为 bronze 表,通常用于执行简单的转换。 通过这些简单的转换重新处理 JSON 等低效格式可能会令人望而却步,这类格式非常适合存储在流式处理实时表中。

相比之下,管道中的最终表(通常称为 gold 表)通常需要复杂的聚合,或从作为 APPLY CHANGES INTO 操作目标的源中读取。 由于这些操作本质上是创建更新而不是追加,因此不支持将它们作为流式处理实时表的输入。 这些转换更适合具体化为一个实时表。

通过将流式处理实时表和实时表混合到单个管道中,可以简化管道并避免对原始数据进行代价高昂的重新引入或重新处理,还能充分利用 SQL 来计算经过有效编码和筛选的数据集上的复杂聚合。 以下示例演示了这种类型的混合处理:

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

详细了解如何使用自动加载程序从 Azure 存储高效读取 JSON 文件,以进行增量处理。

流式处理联接

增量实时表支持用于更新表的各种联接策略。

流批量联接

在使用主要静态维度表对连续仅附加数据流进行非规范化时,流批量联接是一个不错的选择。 每次更新派生数据集时,流中的新记录都会在更新开始时与批处理表的静态快照联接。 在执行完全刷新之前,静态表中添加或更新的记录不会反映在表中。

下面是流批量联接的示例:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

在连续管道中,系统会定期轮询联接的批处理端,以获取每个微批处理中的更新。

流式处理聚合

简单的分布聚合(如计数、最小值、最大值或总和)和代数聚合(如平均值或标准差)也可以使用流式处理实时表进行增量计算。 Databricks 建议对组数量有限的查询进行增量聚合,例如,包含 GROUP BY country 子句的查询。 每次更新时仅读取新的输入数据。