Notebook 워크플로

%run 명령을 사용하면 Notebook 내에 다른 Notebook을 포함할 수 있습니다. 예를 들어 %run을 사용해 지원 함수를 별도의 Notebook에 배치하여 코드를 모듈화할 수 있습니다. 이를 통해 분석의 단계를 구현하는 Notebook을 연결할 수도 있습니다. %run을 사용하면 호출된 Notebook이 즉시 실행되고 그 안에 정의된 함수와 변수를 호출 Notebook에서 사용할 수 있게 됩니다.

Notebook 워크플로는 Notebook에 매개 변수를 전달하고 Notebook에서 값을 반환할 수 있기 때문에 %run을 보완합니다. 이렇게 하면 종속성이 있는 복잡한 워크플로 및 파이프라인을 빌드할 수 있습니다. 예를 들어 디렉터리에 있는 파일 목록을 가져와서 다른 Notebook에 이름을 전달할 수 있습니다. 이 경우에는 %run을 사용할 수 없습니다. 반환 값을 기반으로 if-then-else 워크플로를 만들거나 상대 경로를 사용하여 다른 Notebook을 호출할 수도 있습니다.

Notebook 워크플로를 구현하려면 dbutils.notebook.* 메서드를 사용합니다. %run과 달리 dbutils.notebook.run() 메서드는 Notebook을 실행하는 새 작업을 시작합니다.

이러한 메서드는 모든 dbutils API와 마찬가지로 Python 및 Scala에서만 사용할 수 있습니다. 그러나 dbutils.notebook.run()을 사용하여 R Notebook을 호출할 수 있습니다.

참고

완료하는 데 30일 이하가 걸리는 Notebook 워크플로 작업만 지원됩니다.

API

dbutils.notebook API에서 Notebook 워크플로를 빌드하는 데 사용할 수 있는 메서드는 runexit입니다. 매개 변수와 반환 값은 모두 문자열이어야 합니다.

run(path: String, timeout_seconds: int, arguments: Map): String

Notebook을 실행하고 해당 종료 값을 반환합니다. 메서드는 즉시 실행되는 임시 작업을 시작합니다.

timeout_seconds 매개 변수는 실행의 시간 제한을 제어합니다(0은 시간 초과 없음을 의미). run에 대한 호출은 지정된 시간 내에 완료되지 않으면 예외를 throw입니다. Azure Databricks가 10분을 초과하여 다운된 경우 Notebook 실행은 timeout_seconds에 관계없이 실패합니다.

arguments 매개 변수는 대상 Notebook의 위젯 값을 설정합니다. 특히, 실행 중인 Notebook에 A라는 위젯이 있고 run() 호출에 대한 인수 매개 변수의 일부로 키-값 쌍 ("A": "B")를 전달하는 경우 위젯 A의 값을 검색하면 "B"가 반환됩니다. 위젯 문서에서 위젯을 만들고 작업하기 위한 지침을 찾을 수 있습니다.

경고

arguments 매개 변수는 라틴 문자(ASCII 문자 집합)만 허용합니다. ASCII가 아닌 문자를 사용하면 오류가 반환됩니다. ASCII가 아닌 잘못된 문자의 예로 중국어, 일본어 간지 및 이모지가 있습니다.

run 사용 방법

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run

위젯의 값을 인쇄하는 foo라는 위젯이 있는 workflows라는 Notebook이 있다고 가정합니다.

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print dbutils.widgets.get("foo")

dbutils.notebook.run("workflows", 60, {"foo": "bar"})를 실행하면 다음 결과가 생성됩니다.

Notebook workflow with widget

위젯에는 기본값이 아닌 워크플로를 통해 전달한 값인 "bar"가 있습니다.

exit(value: String): void 값이 있는 Notebook을 종료합니다. run 메서드를 사용하여 Notebook을 호출하는 경우 반환되는 값입니다.

dbutils.notebook.exit("returnValue")

작업에서 dbutils.notebook.exit를 호출하면 Notebook이 성공적으로 완료됩니다. 작업이 실패하도록 하려면 예외를 throw합니다.

예제

다음 예제에서는 DataImportNotebook에 인수를 전달하고 DataImportNotebook의 결과에 따라 다른 Notebook(DataCleaningNotebook 또는 ErrorHandlingNotebook)을 실행합니다.

Notebook workflow

Notebook 워크플로가 실행되면 실행 중인 Notebook에 대한 링크가 표시됩니다.

Notebook workflow run

Notebook 링크 Notebook 작업 #xxxx를 클릭하여 실행 세부 정보를 봅니다.

Notebook workflow run result

구조화된 데이터 전달

이 섹션에서는 Notebook 간에 구조화된 데이터를 전달하는 방법을 보여 줍니다.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").load("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

오류 처리

이 섹션에서는 Notebook 워크플로에서 오류를 처리하는 방법을 보여 줍니다.

Python

# Errors in workflows thrown a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors in workflows thrown a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

동시에 여러 Notebook 실행

스레드(Scala, Python) 및 Futures(Scala, Python)와 같은 표준 Scala 및 Python 구문을 사용하여 동시에 여러 Notebook을 실행할 수 있습니다. 고급 Notebook 워크플로 Notebooks는 이러한 구문을 사용하는 방법을 보여 줍니다. Notebooks는 Scala에 있지만 Python에서 해당 항목을 쉽게 작성할 수 있습니다. 예제를 실행하려면 다음을 수행합니다.

  1. Notebook 보관 파일을 다운로드합니다.
  2. 보관 파일을 작업 영역으로 가져옵니다.
  3. 동시 Notebooks Notebook을 실행합니다.