增量实时表 API 指南

增量实时表 API 让你能够创建、编辑、删除、启动和查看管道相关的详细信息。

重要

要访问 Databricks REST API,必须进行身份验证

创建管道

端点 HTTP 方法
2.0/pipelines POST

创建新的增量实时表管道。

示例

此示例可创建新的触发管道。

请求

curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines \
--data @pipeline-settings.json

pipeline-settings.json:

{
  "name": "Wikipedia pipeline (SQL)",
  "storage": "/Users/username/data",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    }
  ],
  "libraries": [
    {
      "notebook": {
        "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
      }
    }
  ],
  "continuous": false
}

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

响应

{
  "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5"
}

请求结构

请参阅 PipelineSettings

响应结构

字段名称 类型 说明
pipeline_id STRING 新创建的管道的唯一标识符。

编辑管道

端点 HTTP 方法
2.0/pipelines/{pipeline_id} PUT

更新现有管道的设置。

示例

此示例将 target 参数添加到 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc --request PUT \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 \
> --data @pipeline-settings.json

pipeline-settings.json

{
  "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
  "name": "Wikipedia pipeline (SQL)",
  "storage": "/Users/username/data",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    }
  ],
  "libraries": [
    {
      "notebook": {
        "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
      }
    }
  ],
  "target": "wikipedia_quickstart_data",
  "continuous": false
}

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

请求结构

请参阅 PipelineSettings

删除管道

端点 HTTP 方法
2.0/pipelines/{pipeline_id} DELETE

从增量实时表系统中删除管道。

示例

此示例删除 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc --request DELETE \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

启动管道更新

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/updates POST

启动管道的更新。

示例

此示例使用完全刷新开始更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates \
--data '{ "full_refresh": "true" }'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

响应

{
  "update_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8"
}

请求结构

字段名称 类型 说明
full_refresh BOOLEAN 是否重新处理所有数据。 如果为 true,则增量实时表系统将在运行管道之前重置所有表。

此字段是可选的。

默认值为 false

响应结构

字段名称 类型 说明
update_id STRING 新创建的更新的唯一标识符。

停止任何活动管道更新

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/stop POST

停止任何活动管道更新。 如果没有正在运行的更新,则此请求为无操作。

示例

此示例停止更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/stop

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

列出管道事件

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/events GET

检索管道的事件。

示例

此示例最多检索 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道的 5 个事件。

请求

curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/events \
--data '{"max_results": 5}'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

请求结构

字段名称 类型 说明
page_token STRING 先前调用返回的页面令牌。 此字段与此请求中除 max_results 以外的所有字段互斥。 如果在设置此字段时设置了除 max_results 以外的任何字段,则会返回错误。

此字段是可选的。
max_results INT32 要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results

此字段是可选的。

默认值为 25。

最大值为 100。 如果
max_results 的值大于 100,则返回错误。
order_by STRING 一个指示结果的排序顺序的字符串(例如,["timestamp asc"])。

排序顺序可以是升序,也可以是降序。 默认情况下,事件按时间戳的降序返回。

此字段是可选的。
filter STRING 用于选择结果子集的条件,使用类似于 SQL 的语法表示。 支持的筛选器包括:

* level='INFO'(或 WARNERROR
* level in ('INFO', 'WARN')
* id='[event-id]'
* timestamp > 'TIMESTAMP'(或 >=<<==

支持复合表达式,例如:
level in ('ERROR', 'WARN') AND timestamp> '2021-07-22T06:37:33.083Z'

此字段是可选的。

响应结构

字段名称 类型 说明
活动 管道事件的数组。 匹配请求条件的事件列表。
next_page_token STRING 如果存在,则为用于获取下一页事件的令牌。
prev_page_token STRING 如果存在,则为用于获取上一页事件的令牌。

获取管道详细信息

端点 HTTP 方法
2.0/pipelines/{pipeline_id} GET

获取有关管道的详细信息,包括管道设置和最新更新。

示例

此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道的详细信息:

请求

curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

响应

{
  "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
  "spec": {
    "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
    "name": "Wikipedia pipeline (SQL)",
    "storage": "/Users/username/data",
    "clusters": [
      {
        "label": "default",
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5
        }
      }
    ],
    "libraries": [
      {
        "notebook": {
          "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
        }
      }
    ],
    "target": "wikipedia_quickstart_data",
    "continuous": false
  },
  "state": "IDLE",
  "cluster_id": "1234-567891-abcde123",
  "name": "Wikipedia pipeline (SQL)",
  "creator_user_name": "username",
  "latest_updates": [
    {
      "update_id": "8a0b6d02-fbd0-11eb-9a03-0242ac130003",
      "state": "COMPLETED",
      "creation_time": "2021-08-13T00:37:30.279Z"
    },
    {
      "update_id": "a72c08ba-fbd0-11eb-9a03-0242ac130003",
      "state": "CANCELED",
      "creation_time": "2021-08-13T00:35:51.902Z"
    },
    {
      "update_id": "ac37d924-fbd0-11eb-9a03-0242ac130003",
      "state": "FAILED",
      "creation_time": "2021-08-13T00:33:38.565Z"
    }
  ],
  "run_as_user_name": "username"
}

响应结构

字段名称 类型 说明
pipeline_id STRING 管道的唯一标识符。
spec PipelineSettings 管道设置。
state STRING 管道的状态。 IDLERUNNING 中的一项。

如果 state = RUNNING,则至少有一个活动的更新。
cluster_id STRING 运行管道的群集的标识符。
name STRING 此管道的用户友好名称。
creator_user_name STRING 管道创建者的用户名。
latest_updates UpdateStateInfo 的数组 管道最近更新的状态,按从新到旧的更新顺序排序。
run_as_user_name STRING 管道运行时使用的用户名。

获取更新详细信息

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/updates/{update_id} GET

获取管道更新的详细信息。

示例

此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道更新 9a84f906-fc51-11eb-9a03-0242ac130003 的详细信息:

请求

curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates/9a84f906-fc51-11eb-9a03-0242ac130003

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

响应

{
  "update": {
    "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
    "update_id": "9a84f906-fc51-11eb-9a03-0242ac130003",
    "config": {
      "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
      "name": "Wikipedia pipeline (SQL)",
      "storage": "/Users/username/data",
      "configuration": {
        "pipelines.numStreamRetryAttempts": "5"
      },
      "clusters": [
        {
          "label": "default",
          "autoscale": {
            "min_workers": 1,
            "max_workers": 5
          }
        }
      ],
      "libraries": [
        {
          "notebook": {
            "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
          }
        }
      ],
      "target": "wikipedia_quickstart_data",
      "filters": {},
      "email_notifications": {},
      "continuous": false,
      "development": false
    },
    "cause": "API_CALL",
    "state": "COMPLETED",
    "creation_time": 1628815050279,
    "full_refresh": true
  }
}

响应结构

字段名称 类型 说明
pipeline_id STRING 管道的唯一标识符。
update_id STRING 此更新的唯一标识符。
config PipelineSettings 管道设置。
cause STRING 更新的触发器。 API_CALL
RETRY_ON_FAILURE, SERVICE_UPGRADE.
state STRING 更新的状态。 QUEUEDCREATED
WAITING_FOR_RESOURCES, INITIALIZING, RESETTING,
SETTING_UP_TABLES, RUNNING, STOPPING, COMPLETED,
FAILEDCANCELED 中的一项。
cluster_id STRING 运行管道的群集的标识符。
creation_time INT64 创建更新时的时间戳。
full_refresh BOOLEAN 是否触发了更新以执行完全刷新。 如果是,则在运行更新之前重置所有管道表。

列出管道

端点 HTTP 方法
2.0/pipelines/ GET

列出增量实时表系统中定义的管道。

示例

此示例从指定的 page_token 开始,最多检索两个管道的详细信息:

请求

curl -n -X GET https://<databricks-instance>/api/2.0/pipelines \
--data '{ "page_token": "eyJ...==",  "max_results": 2 }'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.azuredatabricks.net)。

此示例使用 .netrc 文件。

响应

{
  "statuses": [
    {
      "pipeline_id": "e0f01758-fc61-11eb-9a03-0242ac130003",
      "state": "IDLE",
      "name": "dlt-pipeline-python",
      "latest_updates": [
        {
          "update_id": "ee9ae73e-fc61-11eb-9a03-0242ac130003",
          "state": "COMPLETED",
          "creation_time": "2021-08-13T00:34:21.871Z"
        }
      ],
      "creator_user_name": "username"
    },
    {
      "pipeline_id": "f4c82f5e-fc61-11eb-9a03-0242ac130003",
      "state": "IDLE",
      "name": "dlt-pipeline-python",
      "creator_user_name": "username"
    }
  ],
  "next_page_token": "eyJ...==",
  "prev_page_token": "eyJ..x9"
}

请求结构

字段名称 类型 说明
page_token STRING 先前调用返回的页面令牌。

此字段是可选的。
max_results INT32 要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results

此字段是可选的。

默认值为 25。

最大值为 100。 如果
max_results 的值大于 100,则返回错误。
order_by 一个由 STRING 构成的数组 指定结果顺序的字符串列表,例如,
["name asc"]. 支持的 order_by 字段为 id
name. 默认值为 id asc

此字段是可选的。
filter STRING 根据指定条件选择结果的子集。

支持的筛选器包括:

"notebook='<path>'",用以选择引用所提供笔记本路径的管道。

name LIKE '[pattern]',用以选择名称与 pattern 匹配的管道。 支持通配符,例如:
name LIKE '%shopping%'

不支持复合筛选器。

此字段是可选的。

响应结构

字段名称 类型 描述
statuses PipelineStateInfo 的数组 匹配请求条件的事件列表。
next_page_token STRING 如果存在,则为用于获取下一页事件的令牌。
prev_page_token STRING 如果存在,则为用于获取上一页事件的令牌。

数据结构

本节内容:

KeyValue

指定配置参数的键值对。

字段名称 类型 描述
key STRING 配置属性名称。
value STRING 配置属性值。

NotebookLibrary

包含管道代码的笔记本的规范。

字段名称 类型 描述
path STRING 笔记本的绝对路径。

此字段为必需字段。

PipelineLibrary

管道依赖项的规范。

字段名称 类型 说明
笔记本 NotebookLibrary 定义增量实时表数据集的笔记本的路径。 路径必须在 Databricks 工作区中,例如:
{ "notebook" : { "path" : "/my-pipeline-notebook-path" } }.

PipelineSettings

管道部署的设置。

字段名称 类型 说明
id STRING 此管道的唯一标识符。

标识符是由增量实时表系统创建的,并且在创建管道时不得提供。
name STRING 此管道的用户友好名称。

此字段是可选的。

默认情况下,管道名称必须唯一。 若要使用重复的名称,请在管道配置中将 allow_duplicate_names 设置为 true
存储 STRING 用于存储管道创建的检查点和表的 DBFS 目录的路径。

此字段是可选的。

如果此字段为空,则系统将使用默认位置。
配置 STRING:STRING 的映射 要添加到将运行管道的群集的 Spark 配置中的键值对的列表。

此字段是可选的。

必须将元素设置为“键:值”对的格式。
clusters PipelinesNewCluster 的数组 要运行管道的群集的规范数组。

此字段是可选的。

如果未指定此字段,系统将为管道选择默认群集配置。
PipelineLibrary 的数组 包含管道代码的笔记本和运行管道所需的任何依赖项。
目标 STRING 用于保存管道输出数据的数据库名称。

有关详细信息,请参阅增量实时表数据发布
continuous BOOLEAN 这是否为连续管道。

此字段是可选的。

默认值为 false
development BOOLEAN 是否在开发模式下运行管道。

此字段是可选的。

默认值为 false

PipelineStateInfo

管道的状态、最新更新的状态以及有关关联资源的信息。

字段名称 类型 说明
state STRING 管道的状态。 IDLERUNNING 中的一项。
pipeline_id STRING 管道的唯一标识符。
cluster_id STRING 运行管道的群集的唯一标识符。
name STRING 管道的用户友好名称。
latest_updates UpdateStateInfo 的数组 管道最近更新的状态,按从新到旧的更新顺序排序。
creator_user_name STRING 管道创建者的用户名。
run_as_user_name STRING 管道运行时使用的用户名。 这是从管道所有者派生的只读值。

PipelinesNewCluster

管道群集规范。

增量实时表系统设置以下属性。 用户无法配置以下属性:

  • spark_version
  • init_scripts
字段名称 类型 说明
label STRING 群集规范的标签,无论是
配置默认群集的 default,还是
配置维护群集的 maintenance

此字段是可选的。 默认值为 default
spark_conf KeyValue 一个对象,其中包含一组可选的由用户指定的 Spark 配置键值对。 你也可以分别通过以下属性,将额外 JVM 选项的字符串传入到驱动程序和执行程序:
spark.driver.extraJavaOptionsspark.executor.extraJavaOptions

示例 Spark 配置:
{"spark.speculation": true, "spark.streaming.ui.retainedBatches": 5}
{"spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails"}
node_type_id STRING 此字段通过单个值对提供给此群集中的每个 Spark 节点的资源进行编码。 例如,可以针对内存密集型或计算密集型的工作负载来预配和优化 Spark 节点。通过使用列出节点类型 API 调用可以检索可用节点类型的列表。
driver_node_type_id STRING Spark 驱动程序的节点类型。 此字段为可选;如果未设置,驱动程序节点类型将会被设置为与上面定义的 node_type_id 相同的值。
ssh_public_keys 一个由 STRING 构成的数组 将会添加到此群集中各个 Spark 节点的 SSH 公钥内容。 对应的私钥可用于在端口 2200上使用用户名 ubuntu 登录。 最多可以指定 10 个密钥。
custom_tags KeyValue 一个对象,其中包含群集资源的一组标记。 Databricks 会使用这些标记以及 default_tags 来标记所有的群集资源。

注意

* 旧版节点类型(如计算优化和内存优化)不支持标记
* Azure Databricks 最多允许 45 个自定义标记。
cluster_log_conf ClusterLogConf 用于将 Spark 日志传递到长期存储目标的配置。 对于一个群集,只能指定一个目标。 如果提供此配置,日志将会发送到目标,发送的时间间隔为
5 mins. 驱动程序日志的目标是 <destination>/<cluster-ID>/driver,而执行程序日志的目标是 <destination>/<cluster-ID>/executor
spark_env_vars KeyValue 一个对象,其中包含一组可选的由用户指定的环境变量键值对。 在启动驱动程序和工作器时,(X,Y) 形式的键值对会按原样导出(即
export X='Y')。

要额外指定一组 SPARK_DAEMON_JAVA_OPTS,Databricks 建议将其追加到 $SPARK_DAEMON_JAVA_OPTS,如以下示例中所示。 这样就确保了还会包含所有默认的 Azure Databricks 托管环境变量。

Spark 环境变量示例:
{"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"}
{"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"}
init_scripts 一个由 InitScriptInfo 构成的数组 用于存储初始化脚本的配置。 可以指定任意数量的目标。 这些脚本会按照所提供的顺序依次执行。 如果指定了 cluster_log_conf,初始化脚本日志将会发送到
<destination>/<cluster-ID>/init_scripts.
instance_pool_id STRING 群集所属的实例池的可选 ID。 请参阅
driver_instance_pool_id STRING 要用于驱动程序节点的实例池的可选 ID。 另外还必须指定
instance_pool_id. 请参阅实例池 API 2.0
policy_id STRING 群集策略 ID。
num_workers 或 autoscale INT32INT32 如果是 num_workers,则此项为此群集应该具有的工作器节点数。 一个群集有一个 Spark 驱动程序和 num_workers 个执行程序用于总共 (num_workers + 1) 个 Spark 节点。

在读取群集的属性时,此字段反映的是所需的工作器数,而不是实际的工作器数。 例如,如果将群集的大小从 5 个工作器重设为 10 个工作器,此字段会更新,以反映 10 个工作器的目标大小,而执行程序中列出的工作器将会随着新节点的预配,逐渐从 5 个增加到 10 个。

如果是 autoscale,则会需要参数,以便根据负载自动纵向扩展或缩减群集。

此字段可选。
apply_policy_default_values BOOLEAN 是否对缺失的群集属性使用策略默认值。

UpdateStateInfo

管道更新的当前状态。

字段名称 类型 说明
update_id STRING 此更新的唯一标识符。
state STRING 更新的状态。 QUEUEDCREATED
WAITING_FOR_RESOURCES, INITIALIZING, RESETTING,
SETTING_UP_TABLES, RUNNING, STOPPING, COMPLETED,
FAILEDCANCELED 中的一项。
creation_time STRING 创建此更新时的时间戳。