Arbetsflöden för notebook-filer

Med kommandot % Run kan du inkludera en annan antecknings bok i en bärbar dator. Med det här kommandot kan du sammanfoga olika antecknings böcker som representerar inetl-steg, Spark-analys steg eller ad hoc-utforskning. Det saknar dock möjlighet att bygga mer komplexa datapipeliner.

Notebook-arbetsflöden är ett komplement till %run eftersom de låter dig returnera värden från en antecknings bok. På så sätt kan du enkelt bygga komplexa arbets flöden och pipeliner med beroenden. Du kan Parameterisera körningar på rätt sätt (till exempel hämta en lista över filer i en katalog och skicka namnen till en annan antecknings bok, något som inte är möjligt med %run ) och även skapa om/sedan/Else-arbetsflöden baserat på RETUR värden. Med Notebook-arbetsflöden kan du anropa andra antecknings böcker via relativa sökvägar.

Du implementerar Notebook-arbetsflöden med dbutils.notebook metoder. Dessa metoder, som alla API: dbutils er, är bara tillgängliga i Scala och python. Du kan dock använda dbutils.notebook.run för att anropa en R-anteckningsbok.

Anteckning

Tids krävande arbets flödes jobb som tar mer än 48 timmar att slutföra stöds inte.

API

De metoder som är tillgängliga i dbutils.notebook API för att bygga antecknings böcker för bärbara datorer är: run och exit . Både parametrar och retur värden måste vara strängar.

run(path: String, timeout_seconds: int, arguments: Map): String

Kör en antecknings bok och returnera dess slut värde. Metoden startar ett tillfälligt jobb som körs omedelbart.

timeout_secondsParametern styr tids gränsen för körningen (0 betyder ingen tids gräns): anropet run returnerar ett undantag om det inte slutförs inom den angivna tiden. Om Azure Databricks är nere i mer än 10 minuter, fungerar inte Notebook-körningen oavsett timeout_seconds .

argumentsParametern anger widget svärdet för mål antecknings boken. Mer specifikt, om den antecknings bok som du kör har en widget med namnet A och du skickar ett nyckel/värde-par ("A": "B") som en del av parametern arguments till run() anropet, kommer hämtningen av widgeten att A returneras "B" . Du hittar anvisningar för att skapa och arbeta med widgetar i artikeln med widgetar .

Varning

argumentsParametern accepterar endast latinska tecken (ASCII-teckenuppsättning). Om icke-ASCII-tecken används returneras ett fel. Exempel på ogiltiga, icke-ASCII-tecken är kinesiska, japanska kanji och emojis.

run Användningsvyn

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run Exempel

Anta att du har en bärbar dator med namnet workflows med en widget som heter och foo som skriver ut widgetens värde:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print dbutils.widgets.get("foo")

Körningen dbutils.notebook.run("workflows", 60, {"foo": "bar"}) genererar följande resultat:

Anteckningsbok-arbetsflöde med widget

Widgeten hade det värde som du skickade genom arbets flödet, "bar" i stället för standardvärdet.

exit(value: String): void Avsluta en antecknings bok med ett värde. Om du anropar en antecknings bok med run -metoden returneras värdet.

dbutils.notebook.exit("returnValue")

Genom dbutils.notebook.exit att anropa i ett jobb slutförs antecknings boken. Utlös ett undantag om du vill orsaka att jobbet inte fungerar.

Exempel

I följande exempel skickar du argument till DataImportNotebook och kör olika antecknings böcker ( DataCleaningNotebook eller ErrorHandlingNotebook ) baserat på resultatet från DataImportNotebook .

Anteckningsbok-arbetsflöde

När arbets flödet för antecknings boken körs visas en länk till den bärbara datorn som körs:

Arbets flöde för notebook-körning

Klicka på arbets antecknings boken för notebook-anteckningsbok #xxxx om du vill visa information om körningen:

Kör resultat för notebook-arbetsflöde

Skicka strukturerade data

I det här avsnittet beskrivs hur du skickar strukturerade data mellan antecknings böcker.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
sqlContext.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
sqlContext.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print json.loads(result)

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Hantera fel

I det här avsnittet beskrivs hur du hanterar fel i antecknings böcker för arbets böcker.

Python

# Errors in workflows thrown a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print "Retrying error", e
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors in workflows thrown a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Köra flera notebook-filer samtidigt

Du kan köra flera antecknings böcker samtidigt med hjälp av standard Scala och python-konstruktioner, till exempel trådar (Scala, python) och framtida (Scala, python). Arbets antecknings böckerna Advanced Notebook visar hur du använder dessa konstruktioner. Antecknings böckerna är i Scala, men du kan enkelt skriva motsvarande i python. Så här kör du exemplet:

  1. Hämta anteckningsbok-arkivet.
  2. Importera arkivet till en arbets yta.
  3. Kör antecknings boken för samtidiga Notebooks .