Executar um bloco de notas Databricks a partir de outro bloco de notas

Importante

Para orquestração de blocos de anotações, use Databricks Jobs. Para cenários de modularização de código, use arquivos de espaço de trabalho. Você só deve usar as técnicas descritas neste artigo quando seu caso de uso não puder ser implementado usando um trabalho Databricks, como para looping de blocos de anotações em um conjunto dinâmico de parâmetros ou se você não tiver acesso a arquivos de espaço de trabalho. Para obter mais informações, consulte Databricks Jobs e compartilhar código.

Comparação de %run e dbutils.notebook.run()

O %run comando permite incluir outro bloco de notas num bloco de notas. Você pode usar %run para modularizar seu código, por exemplo, colocando funções de suporte em um bloco de anotações separado. Você também pode usá-lo para concatenar blocos de anotações que implementam as etapas em uma análise. Quando você usa %runo , o bloco de anotações chamado é imediatamente executado e as funções e variáveis nele definidas ficam disponíveis no bloco de anotações de chamada.

A dbutils.notebook API é um complemento porque %run permite passar parâmetros e retornar valores de um bloco de anotações. Isso permite que você crie fluxos de trabalho e pipelines complexos com dependências. Por exemplo, você pode obter uma lista de arquivos em um diretório e passar os nomes para outro bloco de anotações, o que não é possível com %runo . Você também pode criar fluxos de trabalho if-then-else com base em valores de retorno ou chamar outros blocos de anotações usando caminhos relativos.

Ao contrário %rundo , o dbutils.notebook.run() método inicia um novo trabalho para executar o bloco de anotações.

Esses métodos, como todas as dbutils APIs, estão disponíveis apenas em Python e Scala. No entanto, você pode usar dbutils.notebook.run() para invocar um bloco de anotações R.

Utilizar %run para importar um bloco de notas

Neste exemplo, o primeiro bloco de notas define uma função, reverse, que está disponível no segundo bloco de notas depois de utilizar a %run magia para executar shared-code-notebook.

Bloco de notas de código partilhado

Exemplo de importação de bloco de notas

Como ambos os blocos de anotações estão no mesmo diretório no espaço de trabalho, use o prefixo ./ in ./shared-code-notebook para indicar que o caminho deve ser resolvido em relação ao bloco de anotações em execução no momento. Você pode organizar blocos de anotações em diretórios, como %run ./dir/notebook, ou usar um caminho absoluto como %run /Users/username@organization.com/directory/notebook.

Nota

  • %run deve estar em uma célula por si só, porque ele executa todo o bloco de anotações em linha.
  • Você não pode usar %run para executar um arquivo Python e import as entidades definidas nesse arquivo em um bloco de anotações. Para importar de um arquivo Python, consulte Modularizar seu código usando arquivos. Ou empacote o arquivo em uma biblioteca Python, crie uma biblioteca do Azure Databricks a partir dessa biblioteca Python e instale a biblioteca no cluster que você usa para executar seu bloco de anotações.
  • Quando você usa %run para executar um bloco de anotações que contém widgets, por padrão, o bloco de anotações especificado é executado com os valores padrão do widget. Você também pode passar valores para widgets; consulte Usar widgets Databricks com %run.

dbutils.notebook API

Os métodos disponíveis na dbutils.notebook API são run e exit. Ambos os parâmetros e valores de retorno devem ser strings.

run(path: String, timeout_seconds: int, arguments: Map): String

Execute um bloco de anotações e retorne seu valor de saída. O método inicia um trabalho efêmero que é executado imediatamente.

O timeout_seconds parâmetro controla o tempo limite da execução (0 significa que não há tempo limite): a chamada para run lançar uma exceção se ela não terminar dentro do tempo especificado. Se o Azure Databricks estiver inativo por mais de 10 minutos, a execução do bloco de anotações falhará independentemente do timeout_seconds.

O arguments parâmetro define os valores do widget do bloco de anotações de destino. Especificamente, se o bloco de anotações que você está executando tiver um widget chamado A, e você passar um par ("A": "B") chave-valor como parte do parâmetro arguments para a run() chamada, a recuperação do valor do widget A retornará "B". Você pode encontrar as instruções para criar e trabalhar com widgets no artigo Widgets Databricks .

Nota

  • O arguments parâmetro aceita apenas caracteres latinos (conjunto de caracteres ASCII). O uso de caracteres não-ASCII retorna um erro.
  • Os trabalhos criados usando a API devem ser concluídos dbutils.notebook em 30 dias ou menos.

run Utilização

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run Exemplo

Suponha que você tenha um bloco de anotações nomeado workflows com um widget nomeado foo que imprime o valor do widget:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

A execução dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produz o seguinte resultado:

Notebook com widget

O widget tinha o valor que você passou usando dbutils.notebook.run(), "bar", em vez do padrão.

exit(value: String): void Saia de um bloco de notas com um valor. Se você chamar um bloco de anotações usando o run método, este é o valor retornado.

dbutils.notebook.exit("returnValue")

Chamar dbutils.notebook.exit um trabalho faz com que o bloco de anotações seja concluído com êxito. Se você quiser fazer com que o trabalho falhe, lance uma exceção.

Exemplo

No exemplo a seguir, você passa argumentos e DataImportNotebook executa blocos de anotações diferentes (DataCleaningNotebook ou ErrorHandlingNotebook) com base no resultado de DataImportNotebook.

Exemplo if-else

Quando o código é executado, você vê um link para o bloco de anotações em execução:

Vincular ao bloco de anotações em execução

Para exibir os detalhes da execução, clique no link do bloco de anotações Trabalho do bloco de anotações #xxxx.

Resultado da execução efêmera do notebook

Transmitir dados estruturados

Esta seção ilustra como passar dados estruturados entre blocos de anotações.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Processar erros

Esta seção ilustra como lidar com erros.

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Executar vários blocos de notas em simultâneo

Você pode executar vários notebooks ao mesmo tempo usando construções padrão Scala e Python, como Threads (Scala, Python) e Futures (Scala, Python). Os blocos de anotações de exemplo demonstram como usar essas construções.

  1. Transfira os 4 blocos de notas seguintes. Os cadernos estão escritos em Scala.
  2. Importe os blocos de anotações para uma única pasta no espaço de trabalho.
  3. Execute o bloco de anotações Executar simultaneamente .

Executar bloco de anotações simultaneamente

Obter o bloco de notas

Executar em bloco de notas paralelo

Obter o bloco de notas

Caderno de teste

Obter o bloco de notas

Caderno de teste-2

Obter o bloco de notas