从另一个笔记本中运行 Databricks 笔记本

重要

对于笔记本业务流程,请使用 Databricks 作业。 对于代码模块化方案,请使用工作区文件。 当不能使用 Databricks 作业实现用例时(例如通过一组动态参数集循环笔记本),或者当你无权访问工作区文件时,应该只使用本文中所述的技术。 有关详细信息,请参阅 Databricks 作业共享代码

%rundbutils.notebook.run() 的比较

使用 %run 命令,可在笔记本中包含另一个笔记本。 可以使用 %run 来模块化代码,例如将支持函数放在单独的笔记本中。 你还可以使用它来连接用于实现分析中的步骤的笔记本。 使用 %run 时,被调用的笔记本会立即执行,其中定义的函数和变量在调用笔记本中变为可用。

dbutils.notebook API 是对 %run 的补充,因为它允许将参数传递给笔记本以及从笔记本返回值。 这使你可以生成包含依赖项的复杂工作流和管道。 例如,可以获取目录中的文件列表,并将名称传递给另一个笔记本,而使用 %run 则无法实现。 还可以根据返回值创建 if-then-else 工作流,或使用相对路径调用其他笔记本。

%run 不同,dbutils.notebook.run() 方法会启动一个新作业来运行笔记本。

这些方法(如所有 dbutils API)仅适用于 Python 和 Scala。 但可使用 dbutils.notebook.run() 调用 R 笔记本。

使用 %run 导入笔记本

在此示例中,第一个笔记本定义了函数 reverse,该函数在你使用 %run magic 执行 shared-code-notebook 后将在第二个笔记本中可用。

Shared code notebook

Notebook import example

因为这两个笔记本位于工作区的同一目录,所以在 ./shared-code-notebook 中使用前缀 ./ 来指示应相对于当前正在运行的笔记本来解析路径。 可以将笔记本组织到目录中,例如 %run ./dir/notebook,或使用 %run /Users/username@organization.com/directory/notebook 等绝对路径。

注意

  • %run 必须独自位于某个单元格中,因为它会以内联方式运行整个笔记本。
  • 不能使用 %run 来运行 Python 文件并将该文件中定义的实体 import 到笔记本中。 若要从 Python 文件导入,请参阅使用文件将代码模块化。 或者,将文件打包到 Python 库,从该 Python 库创建 Azure Databricks ,然后将库安装到用于运行笔记本的群集
  • 当使用 %run 运行包含小组件的笔记本时,默认情况下,指定的笔记本使用小部件的默认值运行。 还可以将值传入到小组件;请参阅将 Databricks 小组件与 %run 配合使用

dbutils.notebook API

dbutils.notebook API 中可用的方法为 runexit。 参数和返回值都必须是字符串。

run(path: String, timeout_seconds: int, arguments: Map): String

运行笔记本并返回其退出值。 该方法会启动一个立即运行的临时作业。

timeout_seconds 参数控制运行的超时值(0 表示无超时):如果对 run 的调用在指定时间内未完成,则会引发异常。 如果 Azure Databricks 停机时间超过 10 分钟,笔记本运行将失败,而不考虑 timeout_seconds

arguments 参数可设置目标笔记本的小组件值。 具体而言,如果正在运行的笔记本具有名为 A 的小组件,而且你将键值对 ("A": "B") 作为 arguments 参数的一部分传递给 run() 调用,则检索小组件 A 的值将返回 "B"。 可在 Databricks 小组件一文中找到有关创建和使用小组件的说明。

注意

  • arguments 参数只接受拉丁字符(ASCII 字符集)。 使用非 ASCII 字符会返回错误。
  • 使用 dbutils.notebook API 创建的作业必须在 30 天或更短时间内完成。

run 用法

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run 示例

假设你有一个名为 workflows 的笔记本,其中包含一个名为 foo,该笔记本将小组件值打印为:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

运行 dbutils.notebook.run("workflows", 60, {"foo": "bar"}) 将产生以下结果:

Notebook with widget

该小组件具有使用 dbutils.notebook.run()"bar" 传入的值,而非默认值。

exit(value: String): void 使用值退出笔记本。 如果使用 run 方法调用笔记本,则会返回以下值。

dbutils.notebook.exit("returnValue")

在作业中调用 dbutils.notebook.exit 可导致笔记本成功完成。 如果希望作业失败,请引发异常。

示例

以下示例将 arguments 传递到 DataImportNotebook 并根据 DataImportNotebook 的结果运行不同的笔记本(DataCleaningNotebookErrorHandlingNotebook)。

if-else example

运行代码时,你会看到一个指向正在运行的笔记本的链接:

Link to running notebook

若要查看运行详细信息,请单击笔记本链接“笔记本作业 #xxxx”。

Result of ephemeral notebook run

传递结构化数据

本部分说明如何在笔记本之间传递结构化数据。

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

处理错误

本部分说明如何处理错误。

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

同时运行多个笔记本

可使用标准 Scala 和 Python 构造同时运行多个笔记本,如 Thread(ScalaPython)和 Future(ScalaPython)。 示例笔记本演示了如何使用这些构造。

  1. 下载以下 4 个笔记本。 这些笔记本是以 Scala 编写的。
  2. 将笔记本导入工作区中的单个文件夹。
  3. 运行“Run concurrently”笔记本。

“Run concurrently”笔记本

获取笔记本

“Run in parallel”笔记本

获取笔记本

“Testing”笔记本

获取笔记本

“Testing-2”笔记本

获取笔记本