Workflows des notebooks

La commande % Run vous permet d’inclure un autre bloc-notes dans un bloc-notes. Vous pouvez utiliser %run pour modulariser votre code, par exemple en plaçant des fonctions de prise en charge dans un autre bloc-notes. Vous pouvez également l’utiliser pour concaténer les blocs-notes qui implémentent les étapes d’une analyse. Lorsque vous utilisez %run , le bloc-notes appelé est immédiatement exécuté et les fonctions et les variables définies dans celui-ci sont disponibles dans le bloc-notes appelant.

Les flux de travail de bloc-notes sont un complément de %run , car ils vous permettent de transmettre des paramètres à et de retourner des valeurs à partir d’un bloc-notes. Cela vous permet de créer des flux de travail et des pipelines complexes avec des dépendances. Par exemple, vous pouvez obtenir une liste de fichiers dans un répertoire et transmettre les noms à un autre bloc-notes, ce qui n’est pas possible avec %run . Vous pouvez également créer des flux de travail if-then-else basés sur des valeurs de retour ou appeler d’autres blocs-notes en utilisant des chemins d’accès relatifs.

Pour implémenter des flux de travail de bloc-notes, utilisez les dbutils.notebook.* méthodes. Contrairement %run à, la dbutils.notebook.run() méthode démarre un nouveau travail pour exécuter le bloc-notes.

Ces méthodes, comme toutes les dbutils API, sont uniquement disponibles dans Python et Scala. Toutefois, vous pouvez utiliser dbutils.notebook.run() pour appeler un bloc-notes R.

Notes

Seules les tâches de workflow de bloc-notes dont la réalisation est inférieure ou égale à 30 jours sont prises en charge.

API

Les méthodes disponibles dans l' dbutils.notebook API pour créer des flux de travail de bloc-notes sont les suivantes : run et exit . Les paramètres et les valeurs de retour doivent être des chaînes.

run(path: String, timeout_seconds: int, arguments: Map): String

Exécutez un bloc-notes et retournez sa valeur de sortie. La méthode démarre un travail éphémère qui s’exécute immédiatement.

Le timeout_seconds paramètre contrôle le délai d’attente de l’exécution (0 signifie aucun délai d’expiration) : l’appel à run lève une exception s’il ne se termine pas dans le délai spécifié. Si Azure Databricks est en panne pendant plus de 10 minutes, l’exécution du Notebook échoue, quelle que soit la timeout_seconds .

Le arguments paramètre définit les valeurs des widgets du bloc-notes cible. Plus précisément, si le bloc-notes que vous exécutez possède un widget nommé A et que vous transmettez une paire clé-valeur dans le ("A": "B") paramètre arguments à l' run() appel, la récupération de la valeur du widget A retourne "B" . Vous trouverez les instructions relatives à la création et à l’utilisation des widgets dans l’article sur les widgets .

Avertissement

Le arguments paramètre accepte uniquement les caractères latins (jeu de caractères ASCII). L’utilisation de caractères non-ASCII retourne une erreur. Les caractères chinois, Kanji japonais et Emoji sont des exemples de caractères non-ASCII non valides.

run Utilisation

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run Exemple

Supposons que vous avez un bloc-notes nommé workflows avec un widget nommé foo qui imprime la valeur du widget :

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print dbutils.widgets.get("foo")

dbutils.notebook.run("workflows", 60, {"foo": "bar"})L’exécution produit le résultat suivant :

Notebook workflow with widget

Le widget avait la valeur que vous avez transmise via le workflow, "bar" , plutôt que la valeur par défaut.

exit(value: String): void Quitte un bloc-notes avec une valeur. Si vous appelez un bloc-notes à l’aide de la run méthode, il s’agit de la valeur retournée.

dbutils.notebook.exit("returnValue")

Si dbutils.notebook.exit vous appelez dans un travail, le bloc-notes se termine correctement. Si vous souhaitez provoquer l’échec du travail, levez une exception.

Exemple

Dans l’exemple suivant, vous transmettez des arguments à DataImportNotebook et vous exécutez différents blocs-notes ( DataCleaningNotebook ou ErrorHandlingNotebook ) en fonction du résultat de DataImportNotebook .

Notebook workflow

Lorsque le flux de travail du bloc-notes s’exécute, vous voyez un lien vers le bloc-notes en cours d’exécution :

Notebook workflow run

Cliquez sur le bloc-notes travail lien Notebook #xxxx pour afficher les détails de l’exécution :

Notebook workflow run result

Transmettre des données structurées

Cette section explique comment passer des données structurées entre des blocs-notes.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.parquet(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

des erreurs

Cette section explique comment gérer les erreurs dans les flux de travail de bloc-notes.

Python

# Errors in workflows thrown a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors in workflows thrown a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Exécuter plusieurs notebooks simultanément

Vous pouvez exécuter plusieurs blocs-notes en même temps à l’aide de constructions Scala et Python standard, telles que des threads (scalaire, python) et futures (Scala, python). Les blocs-notes de flux de travail de bloc-notes avancés montrent comment utiliser ces constructions. Les blocs-notes sont dans Scala, mais vous pouvez facilement écrire l’équivalent dans Python. Pour exécuter l’exemple :

  1. Téléchargez l' Archive du bloc-notes.
  2. Importez l’archive dans un espace de travail.
  3. Exécutez le bloc-notes de bloc-notes simultané .