Uw eerste ETL-workload uitvoeren in Azure Databricks

Meer informatie over het gebruik van hulpprogramma's die gereed zijn voor productie van Azure Databricks voor het ontwikkelen en implementeren van uw eerste ETL-pijplijnen (extract, transform and load) voor gegevensindeling.

Aan het einde van dit artikel voelt u zich comfortabel:

  1. Een Databricks-rekencluster voor alle doeleinden starten.
  2. Een Databricks-notebook maken.
  3. Incrementele gegevensopname configureren voor Delta Lake met automatische laadprogramma's.
  4. Notebookcellen uitvoeren om gegevens te verwerken, op te vragen en te bekijken.
  5. Een notebook plannen als een Databricks-taak.

In deze zelfstudie worden interactieve notebooks gebruikt om algemene ETL-taken in Python of Scala te voltooien.

U kunt ook Delta Live Tables gebruiken om ETL-pijplijnen te bouwen. Databricks heeft Delta Live Tables gemaakt om de complexiteit van het bouwen, implementeren en onderhouden van ETL-pijplijnen voor productie te verminderen. Zie zelfstudie: Uw eerste Delta Live Tables-pijplijn uitvoeren.

U kunt ook de Databricks Terraform-provider gebruiken om de resources van dit artikel te maken. Zie Clusters, notebooks en taken maken met Terraform.

Vereisten

Notitie

Als u geen bevoegdheden voor clusterbeheer hebt, kunt u de meeste van de onderstaande stappen nog steeds voltooien zolang u toegang hebt tot een cluster.

Stap 1: Een cluster maken

Als u verkennende gegevensanalyse en data engineering wilt uitvoeren, maakt u een cluster om de rekenresources te bieden die nodig zijn om opdrachten uit te voeren.

  1. Klik op rekenpictogramCompute in de zijbalk.
  2. Klik op de pagina Compute op Cluster maken. Hiermee opent u de pagina Nieuw cluster.
  3. Geef een unieke naam op voor het cluster, laat de resterende waarden in de standaardstatus staan en klik op Cluster maken.

Zie Compute voor meer informatie over Databricks-clusters.

Stap 2: Een Databricks-notebook maken

Maak een notebook om aan de slag te gaan met het schrijven en uitvoeren van interactieve code in Azure Databricks.

  1. Klik op Nieuw pictogramNieuw in de zijbalk en klik vervolgens op Notitieblok.
  2. Op de pagina Notitieblok maken:
    • Geef een unieke naam op voor uw notitieblok.
    • Zorg ervoor dat de standaardtaal is ingesteld op Python of Scala.
    • Selecteer het cluster dat u in stap 1 hebt gemaakt in de vervolgkeuzelijst Cluster .
    • Klik op Create.

Er wordt een notitieblok geopend met een lege cel bovenaan.

Zie Notitieblokken beheren voor meer informatie over het maken en beheren van notitieblokken.

Stap 3: Automatische laadprogramma configureren voor het opnemen van gegevens naar Delta Lake

Databricks raadt het gebruik van automatische laadprogramma's aan voor incrementele gegevensopname. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten.

Databricks raadt aan om gegevens op te slaan met Delta Lake. Delta Lake is een opensource-opslaglaag die ACID-transacties biedt en die data lakehouse mogelijk maakt. Delta Lake is de standaardindeling voor tabellen die zijn gemaakt in Databricks.

Als u automatisch laden wilt configureren voor het opnemen van gegevens naar een Delta Lake-tabel, kopieert en plakt u de volgende code in de lege cel in uw notebook:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Notitie

Met de variabelen die in deze code zijn gedefinieerd, kunt u deze veilig uitvoeren zonder dat er een conflict bestaat met bestaande werkruimteactiva of andere gebruikers. Beperkte netwerk- of opslagmachtigingen veroorzaken fouten bij het uitvoeren van deze code; neem contact op met uw werkruimtebeheerder om problemen met deze beperkingen op te lossen.

Zie Wat is Automatisch laadprogramma?voor meer informatie over Automatisch laden.

Stap 4: Gegevens verwerken en ermee werken

Notebooks voeren logische cellen per cel uit. De logica in de cel uitvoeren:

  1. Als u de cel wilt uitvoeren die u in de vorige stap hebt voltooid, selecteert u de cel en drukt u op Shift+Enter.

  2. Als u een query wilt uitvoeren op de tabel die u zojuist hebt gemaakt, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Als u een voorbeeld van de gegevens in uw DataFrame wilt bekijken, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.

    Python

    display(df)
    

    Scala

    display(df)
    

Zie Visualisaties in Databricks-notebooks voor meer informatie over interactieve opties voor het visualiseren van gegevens.

Stap 5: Een taak plannen

U kunt Databricks-notebooks uitvoeren als productiescripts door ze toe te voegen als een taak in een Databricks-taak. In deze stap maakt u een nieuwe taak die u handmatig kunt activeren.

Uw notitieblok als taak plannen:

  1. Klik aan de rechterkant van de koptekstbalk op Planning .
  2. Voer een unieke naam in voor de taaknaam.
  3. Klik op Handmatig.
  4. Selecteer in de vervolgkeuzelijst Cluster het cluster dat u in stap 1 hebt gemaakt.
  5. Klik op Create.
  6. Klik in het venster dat wordt weergegeven op Nu uitvoeren.
  7. Als u de resultaten van de taakuitvoering wilt zien, klikt u op het Externe koppeling pictogram naast de tijdstempel van de laatste uitvoering .

Zie Wat is Azure Databricks Jobs? voor meer informatie over taken.

Aanvullende integraties

Meer informatie over integraties en hulpprogramma's voor data engineering met Azure Databricks: