增量实时表 API 指南
增量实时表 API 让你能够创建、编辑、删除、启动和查看管道相关的详细信息。
重要
要访问 Databricks REST API,必须进行身份验证。
创建管道
端点 | HTTP 方法 |
---|---|
2.0/pipelines |
POST |
创建新的增量实时表管道。
示例
此示例可创建新的触发管道。
请求
curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines \
--data @pipeline-settings.json
pipeline-settings.json
:
{
"name": "Wikipedia pipeline (SQL)",
"storage": "/Users/username/data",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
}
}
],
"continuous": false
}
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
响应
{
"pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5"
}
请求结构
请参阅 PipelineSettings。
响应结构
字段名称 | 类型 | 说明 |
---|---|---|
pipeline_id | STRING |
新创建的管道的唯一标识符。 |
编辑管道
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id} |
PUT |
更新现有管道的设置。
示例
此示例将 target
参数添加到 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道:
请求
curl --netrc --request PUT \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 \
> --data @pipeline-settings.json
pipeline-settings.json
{
"id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
"name": "Wikipedia pipeline (SQL)",
"storage": "/Users/username/data",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
}
}
],
"target": "wikipedia_quickstart_data",
"continuous": false
}
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
请求结构
请参阅 PipelineSettings。
删除管道
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id} |
DELETE |
从增量实时表系统中删除管道。
示例
此示例删除 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道:
请求
curl --netrc --request DELETE \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
启动管道更新
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id}/updates |
POST |
启动管道的更新。
示例
此示例使用完全刷新开始更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道:
请求
curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates \
--data '{ "full_refresh": "true" }'
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
响应
{
"update_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8"
}
请求结构
字段名称 | 类型 | 说明 |
---|---|---|
full_refresh | BOOLEAN |
是否重新处理所有数据。 如果为 true ,则增量实时表系统将在运行管道之前重置所有表。此字段是可选的。 默认值为 false |
响应结构
字段名称 | 类型 | 说明 |
---|---|---|
update_id | STRING |
新创建的更新的唯一标识符。 |
停止任何活动管道更新
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id}/stop |
POST |
停止任何活动管道更新。 如果没有正在运行的更新,则此请求为无操作。
示例
此示例停止更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道:
请求
curl --netrc --request POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/stop
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
列出管道事件
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id}/events |
GET |
检索管道的事件。
示例
此示例最多检索 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道的 5 个事件。
请求
curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/events \
--data '{"max_results": 5}'
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
请求结构
字段名称 | 类型 | 说明 |
---|---|---|
page_token | STRING |
先前调用返回的页面令牌。 此字段与此请求中除 max_results 以外的所有字段互斥。 如果在设置此字段时设置了除 max_results 以外的任何字段,则会返回错误。 此字段是可选的。 |
max_results | INT32 |
要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results 。此字段是可选的。 默认值为 25。 最大值为 100。 如果 max_results 的值大于 100,则返回错误。 |
order_by | STRING |
一个指示结果的排序顺序的字符串(例如,["timestamp asc"] )。排序顺序可以是升序,也可以是降序。 默认情况下,事件按时间戳的降序返回。 此字段是可选的。 |
filter | STRING |
用于选择结果子集的条件,使用类似于 SQL 的语法表示。 支持的筛选器包括: * level='INFO' (或 WARN 或 ERROR )* level in ('INFO', 'WARN') * id='[event-id]' * timestamp > 'TIMESTAMP' (或 >= 、< 、<= 、= )支持复合表达式,例如: level in ('ERROR', 'WARN') AND timestamp> '2021-07-22T06:37:33.083Z' 此字段是可选的。 |
响应结构
字段名称 | 类型 | 说明 |
---|---|---|
活动 | 管道事件的数组。 | 匹配请求条件的事件列表。 |
next_page_token | STRING |
如果存在,则为用于获取下一页事件的令牌。 |
prev_page_token | STRING |
如果存在,则为用于获取上一页事件的令牌。 |
获取管道详细信息
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id} |
GET |
获取有关管道的详细信息,包括管道设置和最新更新。
示例
此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道的详细信息:
请求
curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
响应
{
"pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
"spec": {
"id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
"name": "Wikipedia pipeline (SQL)",
"storage": "/Users/username/data",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
}
}
],
"target": "wikipedia_quickstart_data",
"continuous": false
},
"state": "IDLE",
"cluster_id": "1234-567891-abcde123",
"name": "Wikipedia pipeline (SQL)",
"creator_user_name": "username",
"latest_updates": [
{
"update_id": "8a0b6d02-fbd0-11eb-9a03-0242ac130003",
"state": "COMPLETED",
"creation_time": "2021-08-13T00:37:30.279Z"
},
{
"update_id": "a72c08ba-fbd0-11eb-9a03-0242ac130003",
"state": "CANCELED",
"creation_time": "2021-08-13T00:35:51.902Z"
},
{
"update_id": "ac37d924-fbd0-11eb-9a03-0242ac130003",
"state": "FAILED",
"creation_time": "2021-08-13T00:33:38.565Z"
}
],
"run_as_user_name": "username"
}
响应结构
字段名称 | 类型 | 说明 |
---|---|---|
pipeline_id | STRING |
管道的唯一标识符。 |
spec | PipelineSettings | 管道设置。 |
state | STRING |
管道的状态。 IDLE 或 RUNNING 中的一项。如果 state = RUNNING ,则至少有一个活动的更新。 |
cluster_id | STRING |
运行管道的群集的标识符。 |
name | STRING |
此管道的用户友好名称。 |
creator_user_name | STRING |
管道创建者的用户名。 |
latest_updates | UpdateStateInfo 的数组 | 管道最近更新的状态,按从新到旧的更新顺序排序。 |
run_as_user_name | STRING |
管道运行时使用的用户名。 |
获取更新详细信息
端点 | HTTP 方法 |
---|---|
2.0/pipelines/{pipeline_id}/updates/{update_id} |
GET |
获取管道更新的详细信息。
示例
此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5
的管道更新 9a84f906-fc51-11eb-9a03-0242ac130003
的详细信息:
请求
curl -n -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates/9a84f906-fc51-11eb-9a03-0242ac130003
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
响应
{
"update": {
"pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
"update_id": "9a84f906-fc51-11eb-9a03-0242ac130003",
"config": {
"id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
"name": "Wikipedia pipeline (SQL)",
"storage": "/Users/username/data",
"configuration": {
"pipelines.numStreamRetryAttempts": "5"
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
}
}
],
"target": "wikipedia_quickstart_data",
"filters": {},
"email_notifications": {},
"continuous": false,
"development": false
},
"cause": "API_CALL",
"state": "COMPLETED",
"creation_time": 1628815050279,
"full_refresh": true
}
}
响应结构
字段名称 | 类型 | 说明 |
---|---|---|
pipeline_id | STRING |
管道的唯一标识符。 |
update_id | STRING |
此更新的唯一标识符。 |
config | PipelineSettings | 管道设置。 |
cause | STRING |
更新的触发器。 API_CALL 、RETRY_ON_FAILURE , SERVICE_UPGRADE . |
state | STRING |
更新的状态。 QUEUED 、CREATED 、WAITING_FOR_RESOURCES , INITIALIZING , RESETTING ,SETTING_UP_TABLES , RUNNING , STOPPING , COMPLETED ,FAILED 或 CANCELED 中的一项。 |
cluster_id | STRING |
运行管道的群集的标识符。 |
creation_time | INT64 |
创建更新时的时间戳。 |
full_refresh | BOOLEAN |
是否触发了更新以执行完全刷新。 如果是,则在运行更新之前重置所有管道表。 |
列出管道
端点 | HTTP 方法 |
---|---|
2.0/pipelines/ |
GET |
列出增量实时表系统中定义的管道。
示例
此示例从指定的 page_token
开始,最多检索两个管道的详细信息:
请求
curl -n -X GET https://<databricks-instance>/api/2.0/pipelines \
--data '{ "page_token": "eyJ...==", "max_results": 2 }'
将:
<databricks-instance>
替换为 Azure Databricks<databricks-instance>
(例如adb-1234567890123456.7.azuredatabricks.net
)。
此示例使用 .netrc 文件。
响应
{
"statuses": [
{
"pipeline_id": "e0f01758-fc61-11eb-9a03-0242ac130003",
"state": "IDLE",
"name": "dlt-pipeline-python",
"latest_updates": [
{
"update_id": "ee9ae73e-fc61-11eb-9a03-0242ac130003",
"state": "COMPLETED",
"creation_time": "2021-08-13T00:34:21.871Z"
}
],
"creator_user_name": "username"
},
{
"pipeline_id": "f4c82f5e-fc61-11eb-9a03-0242ac130003",
"state": "IDLE",
"name": "dlt-pipeline-python",
"creator_user_name": "username"
}
],
"next_page_token": "eyJ...==",
"prev_page_token": "eyJ..x9"
}
请求结构
字段名称 | 类型 | 说明 |
---|---|---|
page_token | STRING |
先前调用返回的页面令牌。 此字段是可选的。 |
max_results | INT32 |
要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results 。此字段是可选的。 默认值为 25。 最大值为 100。 如果 max_results 的值大于 100,则返回错误。 |
order_by | 一个由 STRING 构成的数组 |
指定结果顺序的字符串列表,例如,["name asc"] . 支持的 order_by 字段为 id 和name . 默认值为 id asc 。此字段是可选的。 |
filter | STRING |
根据指定条件选择结果的子集。 支持的筛选器包括: "notebook='<path>'" ,用以选择引用所提供笔记本路径的管道。name LIKE '[pattern]' ,用以选择名称与 pattern 匹配的管道。 支持通配符,例如:name LIKE '%shopping%' 不支持复合筛选器。 此字段是可选的。 |
响应结构
字段名称 | 类型 | 描述 |
---|---|---|
statuses | PipelineStateInfo 的数组 | 匹配请求条件的事件列表。 |
next_page_token | STRING |
如果存在,则为用于获取下一页事件的令牌。 |
prev_page_token | STRING |
如果存在,则为用于获取上一页事件的令牌。 |
数据结构
本节内容:
- KeyValue
- NotebookLibrary
- PipelineLibrary
- PipelineSettings
- PipelineStateInfo
- PipelinesNewCluster
- UpdateStateInfo
KeyValue
指定配置参数的键值对。
字段名称 | 类型 | 描述 |
---|---|---|
key | STRING |
配置属性名称。 |
value | STRING |
配置属性值。 |
NotebookLibrary
包含管道代码的笔记本的规范。
字段名称 | 类型 | 描述 |
---|---|---|
path | STRING |
笔记本的绝对路径。 此字段为必需字段。 |
PipelineLibrary
管道依赖项的规范。
字段名称 | 类型 | 说明 |
---|---|---|
笔记本 | NotebookLibrary | 定义增量实时表数据集的笔记本的路径。 路径必须在 Databricks 工作区中,例如:{ "notebook" : { "path" : "/my-pipeline-notebook-path" } } . |
PipelineSettings
管道部署的设置。
字段名称 | 类型 | 说明 |
---|---|---|
id | STRING |
此管道的唯一标识符。 标识符是由增量实时表系统创建的,并且在创建管道时不得提供。 |
name | STRING |
此管道的用户友好名称。 此字段是可选的。 默认情况下,管道名称必须唯一。 若要使用重复的名称,请在管道配置中将 allow_duplicate_names 设置为 true 。 |
存储 | STRING |
用于存储管道创建的检查点和表的 DBFS 目录的路径。 此字段是可选的。 如果此字段为空,则系统将使用默认位置。 |
配置 | STRING:STRING 的映射 |
要添加到将运行管道的群集的 Spark 配置中的键值对的列表。 此字段是可选的。 必须将元素设置为“键:值”对的格式。 |
clusters | PipelinesNewCluster 的数组 | 要运行管道的群集的规范数组。 此字段是可选的。 如果未指定此字段,系统将为管道选择默认群集配置。 |
库 | PipelineLibrary 的数组 | 包含管道代码的笔记本和运行管道所需的任何依赖项。 |
目标 | STRING |
用于保存管道输出数据的数据库名称。 有关详细信息,请参阅增量实时表数据发布。 |
continuous | BOOLEAN |
这是否为连续管道。 此字段是可选的。 默认值为 false 。 |
development | BOOLEAN |
是否在开发模式下运行管道。 此字段是可选的。 默认值为 false 。 |
PipelineStateInfo
管道的状态、最新更新的状态以及有关关联资源的信息。
字段名称 | 类型 | 说明 |
---|---|---|
state | STRING |
管道的状态。 IDLE 或 RUNNING 中的一项。 |
pipeline_id | STRING |
管道的唯一标识符。 |
cluster_id | STRING |
运行管道的群集的唯一标识符。 |
name | STRING |
管道的用户友好名称。 |
latest_updates | UpdateStateInfo 的数组 | 管道最近更新的状态,按从新到旧的更新顺序排序。 |
creator_user_name | STRING |
管道创建者的用户名。 |
run_as_user_name | STRING |
管道运行时使用的用户名。 这是从管道所有者派生的只读值。 |
PipelinesNewCluster
管道群集规范。
增量实时表系统设置以下属性。 用户无法配置以下属性:
spark_version
init_scripts
字段名称 | 类型 | 说明 |
---|---|---|
label | STRING |
群集规范的标签,无论是 配置默认群集的 default ,还是配置维护群集的 maintenance 。此字段是可选的。 默认值为 default 。 |
spark_conf | KeyValue | 一个对象,其中包含一组可选的由用户指定的 Spark 配置键值对。 你也可以分别通过以下属性,将额外 JVM 选项的字符串传入到驱动程序和执行程序:spark.driver.extraJavaOptions 和 spark.executor.extraJavaOptions 。示例 Spark 配置: {"spark.speculation": true, "spark.streaming.ui.retainedBatches": 5} 或{"spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails"} |
node_type_id | STRING |
此字段通过单个值对提供给此群集中的每个 Spark 节点的资源进行编码。 例如,可以针对内存密集型或计算密集型的工作负载来预配和优化 Spark 节点。通过使用列出节点类型 API 调用可以检索可用节点类型的列表。 |
driver_node_type_id | STRING |
Spark 驱动程序的节点类型。 此字段为可选;如果未设置,驱动程序节点类型将会被设置为与上面定义的 node_type_id 相同的值。 |
ssh_public_keys | 一个由 STRING 构成的数组 |
将会添加到此群集中各个 Spark 节点的 SSH 公钥内容。 对应的私钥可用于在端口 2200 上使用用户名 ubuntu 登录。 最多可以指定 10 个密钥。 |
custom_tags | KeyValue | 一个对象,其中包含群集资源的一组标记。 Databricks 会使用这些标记以及 default_tags 来标记所有的群集资源。 注意: * 旧版节点类型(如计算优化和内存优化)不支持标记 * Azure Databricks 最多允许 45 个自定义标记。 |
cluster_log_conf | ClusterLogConf | 用于将 Spark 日志传递到长期存储目标的配置。 对于一个群集,只能指定一个目标。 如果提供此配置,日志将会发送到目标,发送的时间间隔为5 mins . 驱动程序日志的目标是 <destination>/<cluster-ID>/driver ,而执行程序日志的目标是 <destination>/<cluster-ID>/executor 。 |
spark_env_vars | KeyValue | 一个对象,其中包含一组可选的由用户指定的环境变量键值对。 在启动驱动程序和工作器时,(X,Y) 形式的键值对会按原样导出(即export X='Y' )。要额外指定一组 SPARK_DAEMON_JAVA_OPTS ,Databricks 建议将其追加到 $SPARK_DAEMON_JAVA_OPTS ,如以下示例中所示。 这样就确保了还会包含所有默认的 Azure Databricks 托管环境变量。Spark 环境变量示例: {"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"} 或{"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"} |
init_scripts | 一个由 InitScriptInfo 构成的数组 | 用于存储初始化脚本的配置。 可以指定任意数量的目标。 这些脚本会按照所提供的顺序依次执行。 如果指定了 cluster_log_conf ,初始化脚本日志将会发送到<destination>/<cluster-ID>/init_scripts . |
instance_pool_id | STRING |
群集所属的实例池的可选 ID。 请参阅池。 |
driver_instance_pool_id | STRING |
要用于驱动程序节点的实例池的可选 ID。 另外还必须指定instance_pool_id . 请参阅实例池 API 2.0。 |
policy_id | STRING |
群集策略 ID。 |
num_workers 或 autoscale | INT32 或 INT32 |
如果是 num_workers,则此项为此群集应该具有的工作器节点数。 一个群集有一个 Spark 驱动程序和 num_workers 个执行程序用于总共 (num_workers + 1) 个 Spark 节点。 在读取群集的属性时,此字段反映的是所需的工作器数,而不是实际的工作器数。 例如,如果将群集的大小从 5 个工作器重设为 10 个工作器,此字段会更新,以反映 10 个工作器的目标大小,而执行程序中列出的工作器将会随着新节点的预配,逐渐从 5 个增加到 10 个。 如果是 autoscale,则会需要参数,以便根据负载自动纵向扩展或缩减群集。 此字段可选。 |
apply_policy_default_values | BOOLEAN |
是否对缺失的群集属性使用策略默认值。 |
UpdateStateInfo
管道更新的当前状态。
字段名称 | 类型 | 说明 |
---|---|---|
update_id | STRING |
此更新的唯一标识符。 |
state | STRING |
更新的状态。 QUEUED 、CREATED 、WAITING_FOR_RESOURCES , INITIALIZING , RESETTING ,SETTING_UP_TABLES , RUNNING , STOPPING , COMPLETED ,FAILED 或 CANCELED 中的一项。 |
creation_time | STRING |
创建此更新时的时间戳。 |