增量实时表概念

本文介绍了有效使用增量实时表应了解的基本概念。

本节内容:

管道

增量实时表中的主要执行单元是管道。 管道是一种将数据源链接到目标数据集的有向无环图 (DAG)。 你可以使用返回 Spark SQL 或 Koalas 数据帧的 SQL 查询或 Python 函数来定义增量实时表数据集的内容。 管道还有一个关联的配置,用于定义运行管道所需的设置。 定义数据集时,可以选择指定数据质量约束。

在 Azure Databricks 笔记本中实现增量实时表管道。 你可以在单个笔记本或多个笔记本中实现管道。 单个笔记本中的所有查询要么都用 Python 实现,要么都用 SQL 实现。但是,你可以混合使用 Python 和 SQL 笔记本来配置多笔记本管道。 每个笔记本共享输出数据的存储位置,并且能够从管道中的其他笔记本引用数据集。

可以使用 Databricks 存储库来存储和管理增量实时表笔记本。 若要使某个利用 Databricks 存储库管理的笔记本在创建管道时可用,请执行以下操作:

  • 在 SQL 笔记本顶部添加注释行 -- Databricks notebook source
  • 在 Python 笔记本顶部添加注释行 # Databricks notebook source

若要详细了解如何创建和运行管道,请参阅创建、运行和管理增量实时表管道。 有关配置多笔记本管道的示例,请参阅在管道中配置多个笔记本

查询

查询通过定义数据源和目标数据集来实现数据转换。 增量实时表查询可以用 PythonSQL 实现。

预期

使用“预期”来指定对数据集内容的数据质量控制。 与传统数据库中的 CHECK 约束(用于阻止添加任何不符合约束的记录)不同,预期在处理不符合数据质量要求的数据时比较灵活。 这种灵活性允许你处理和存储预计会出现混乱的数据以及必须满足严格质量要求的数据。

你可以定义预期,以保留验证失败的记录删除验证失败的记录或在记录验证失败时停止管道

管道设置

管道设置以 JSON 格式定义,并包含运行管道所需的参数,其中包括:

  • 包含查询的库(以笔记本的形式提供),这些查询描述用于在 Delta Lake 中创建目标数据集的表和视图。
  • 云存储位置,用于存储处理时所需的表和元数据。 此位置为 DBFS 或你提供的其他位置。
  • 用于处理数据的 Spark 群集的可选配置。

有关更多详细信息,请参阅增量实时表设置

管道更新

创建管道并准备好运行后,就可以开始更新。 更新将:

  • 使用正确的配置启动群集。
  • 发现定义的所有表和视图,并检查是否存在任何分析错误,例如无效的列名、缺少的依赖项、语法错误等。
  • 使用最新的可用数据创建或更新所有表和视图。

如果管道被触发,系统会在一次性更新管道中的所有表后停止处理。

当触发更新成功完成时,可保证根据更新开始时可用的数据更新每个表。

对于要求低延迟的用例,可以将管道配置为连续更新。

有关如何为管道选择执行模式的详细信息,请参阅连续管道和触发管道

数据集

增量实时表管道中有两种类型的数据集:视图和表。

  • 视图类似于 SQL 中的临时视图,是某些计算资源的别名。 借助视图,不仅可以将复杂的查询分解为较小或更容易理解的查询, 还可以重用给定转换作为多个表的源。 视图只能在管道内使用,不能以交互方式查询。
  • 表类似于传统的具体化视图。 增量实时表运行时会自动创建 Delta 格式的表,并确保这些表使用创建该表的查询的最新结果进行更新。

可以定义实时或者流式处理实时视图或表:

实时表或视图始终反映定义它的查询的结果,包括定义该表或视图的查询何时更新,或输入数据源何时更新。 与传统的具体化视图一样,在可能的情况下,可以计算整个实时表或视图以优化计算资源和时间。

流式处理实时表或视图仅处理自上次管道更新以来添加的数据。 流式处理表和视图是有状态的;如果定义查询发生更改,将根据新查询处理新数据,并且不会重新计算现有数据。

流式处理实时表在许多用例中很有作用,包括:

  • 数据保留:流式处理实时表可以无限期保留数据,即使输入数据源的保留期较短,例如 Apache Kafka 或 Amazon Kinesis 等流式处理数据源。
  • 数据源演进:即使数据源发生更改(例如从 Kafka 迁移到 Kinesis),也可以保留数据。

你可以发布表,使其可供下游使用者发现和查询。

连续管道和触发管道

增量实时表支持两种不同的执行模式:

  • 触发管道使用当前可用的任何数据更新每个表,然后停止运行管道的群集。 增量实时表会自动分析表之间的依赖关系,并开始计算从外部源读取数据的表。 管道中的表在其依赖数据源更新后更新。
  • 连续管道随着输入数据的变化不断更新表。 更新开始后,它将继续运行,直到手动停止。 连续管道需要一个始终运行的群集,但可确保下游使用者拥有最新的数据。

触发管道可以减少资源消耗和费用,因为群集仅在执行管道时运行。 但是,在触发管道之前,不会处理新数据。 连续管道需要一个始终运行的群集,其成本较高,但可以减少处理延迟。

管道设置中的 continuous 标志可控制执行模式。 默认情况下,管道在触发执行模式下运行。 如果需要对管道中的表进行低延迟更新,请将 continuous 设置为 true

{
  ...
  "continuous": true,
  ...
}

执行模式与要计算的表的类型无关。 实时表和流式处理实时表都可以在任一执行模式下更新。

如果管道中的某些表具有较低的延迟要求,可以通过设置 pipelines.trigger.interval 设置来独立配置其更新频率:

spark_conf={"pipelines.trigger.interval": "1 hour"}

此选项不会在管道更新的间隔时间内关闭群集,但可以释放资源以更新管道中的其他表。

连续管道中的表和视图

可以在连续运行的管道中使用实时表或视图以及流式处理实时表或视图。 为了避免不必要的处理,管道会自动监视依赖增量表,并仅在这些依赖表的内容发生更改时才执行更新。

增量实时表运行时无法检测非增量数据源中的更改。 该表仍会定期更新,但具有较高的默认触发间隔,以防止过度重新计算减慢群集上发生的任何增量处理。

开发和生产模式

你可以通过切换使用开发模式和生产模式来优化管道执行。 当你在开发模式下运行管道时,增量实时表系统会:

  • 重用群集,以避免重新启动产生的开销。
  • 禁用管道重试,以便可以立即检测和修复错误。

在生产模式下,增量实时表系统会:

  • 为特定的可恢复错误(包括内存泄漏和过期凭据)重新启动群集。
  • 在发生特定错误(例如启动群集失败)时重试执行。

使用管道 UI 中的 Delta Live Tables Environment Toggle Icon 按钮在开发和生产模式之间切换。 默认情况下,管道在开发模式下运行。

在开发和生产模式间切换只控制群集和管道执行行为。 存储位置必须配置为管道设置的一部分,并且在模式间切换时不受影响。

Databricks 增强型自动缩放

重要

此功能目前以公共预览版提供。

Databricks 增强型自动缩放可以根据工作负载量自动分配群集资源,从而优化群集利用率,并尽量减轻对管道数据处理延迟的影响。

增强型自动缩放在现有群集自动缩放功能的基础上增加了以下功能:

  • 增强型自动缩放实现流式处理工作负载的优化,并添加了增强功能来提高批处理工作负载的性能。 这些优化提高了群集利用率、减少了资源使用率并降低了成本。
  • 增强型自动缩放主动关闭未充分利用的节点,同时保证在关闭期间不会有任务失败。 仅当节点处于空闲状态时,现有群集自动缩放功能才会纵向缩减节点。

要求

若要使用增强型自动缩放,请执行以下操作:

  1. 管道设置configuration 对象中将 pipelines.advancedAutoscaling.enabled 字段设置为 "true"
  2. autoscale 配置添加到管道 default 群集。 以下示例配置一个增强型自动缩放群集,其中最少包含 5 个工作器,最多包含 10 个工作器。 max_workers 必须大于或等于 min_workers

注意

  • 增强型自动缩放仅适用于 default 群集。 如果在维护群集配置中包含 autoscale 配置,则会使用现有的群集自动缩放功能。
  • 如果添加 autoscale 配置但不添加 pipelines.advancedAutoscaling.enabled 配置,增量实时表将使用现有的群集自动缩放功能。
{
  "configuration": {
    "pipelines.advancedAutoscaling.enabled": "true"
  },
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 5,
        "max_workers": 10
      }
    }
  ]
}

如果管道是连续的,则在自动缩放配置更改后,管道将自动重启。 重启后,预期有短暂的一段时间延迟会提高。 经过短暂的延迟提高之后,群集大小将根据 autoscale 配置更新,管道延迟将恢复为之前的延迟特性。

监视已启用增强型自动缩放的管道

可以使用增量实时表事件日志来监视增强型自动缩放指标。 可以在用户界面中查看指标。 增强型自动缩放事件的事件类型为 autoscale。 下面是示例事件:

事件 消息
已提交群集调整大小请求 Autoscale cluster to <X> executors while keeping alive <Y> executors and retiring <Z> executors
群集管理器已接受调整大小请求 Submitted request to resize cluster <cluster-id> to size <X>.
调整大小成功完成 Achieved desired cluster size <X> for cluster <cluster-id>.

还可以通过直接查询事件日志来查看增强型自动缩放事件:

产品版本

可以使用增量实时表产品版本选项来运行具有最符合管道要求的功能的管道。 可以使用以下产品版本:

  • core,用于运行流式处理引入工作负载。 如果你的管道不需要变更数据捕获 (CDC) 或增量实时表期望等高级功能,请选择 core 版本。
  • pro,用于运行流式处理引入和 CDC 工作负载。 pro 产品版本支持所有 core 功能,此外还支持需要根据源数据的更改更新表的工作负载。
  • advanced,用于运行流式处理引入工作负载、CDC 工作负载,以及需要“期望”功能的工作负载。 advanced 产品版本支持 corepro 版本的功能,此外还支持通过增量实时表期望强制实施数据质量约束。

创建编辑管道时可以选择产品版本。 可为每个管道选择不同的版本。

如果你的管道包含所选产品版本不支持的功能(例如期望),你将收到错误消息以及出错原因。 然后,你可以编辑该管道以选择适当的版本。