Erstellen einer End-to-End-Datenpipeline in Databricks

In diesem Artikel erfahren Sie, wie Sie eine durchgängige Datenverarbeitungspipeline erstellen und einsetzen, einschließlich der Aufnahme von Rohdaten, der Umwandlung der Daten und der Durchführung von Analysen mit den verarbeiteten Daten.

Hinweis

Obwohl dieser Artikel zeigt, wie Sie eine komplette Datenpipeline mit Databricks Notebooks und einem Azure Databricks Auftrag zur Orchestrierung eines Workflows erstellen können, empfiehlt Databricks die Verwendung von Delta Live Tables, einer deklarativen Schnittstelle zur Erstellung zuverlässiger, verwaltbarer und testbarer Datenverarbeitungspipelines.

Was ist eine Datenpipeline?

Eine Datenpipeline implementiert die erforderlichen Schritte, um Daten aus Quellsystemen zu verschieben, diese Daten basierend auf gewissen Anforderungen zu transformieren und die Daten in einem Zielsystem zu speichern. Eine Datenpipeline umfasst alle Prozesse, die zum Umwandeln von Rohdaten in aufbereitete Daten erforderlich sind, die Benutzer nutzen können. Eine Datenpipeline kann beispielsweise Daten so aufbereiten, dass Datenanalysten und wissenschaftliche Fachkräfte für Daten durch Analysen und Berichte einen Mehrwert aus den Daten ziehen können.

Ein ETL-Workflow (Extrahieren, Transformieren und Laden) ist ein gängiges Beispiel für eine Datenpipeline. Bei der ETL-Verarbeitung werden Daten aus Quellsystemen erfasst und in einen Stagingbereich geschrieben, basierend auf Anforderungen (Sicherstellung der Datenqualität, Deduplizieren von Datensätzen usw.) transformiert und dann in ein Zielsystem wie ein Data Warehouse oder einen Data Lake geschrieben.

Schritte der Datenpipeline

Das Beispiel in diesem Artikel führt Sie durch die Erstellung eines Datenverarbeitungsworkflows, um Ihnen den Einstieg in das Erstellen von Datenpipelines in Azure Databricks zu erleichtern:

  • Untersuchen eines nicht formatierten Datasets mithilfe von Azure Databricks-Features
  • Erstellen eines Databricks-Notebooks, um nicht formatierte Quelldaten zu erfassen und die Rohdaten in eine Zieltabelle zu schreiben
  • Erstellen eines Databricks-Notebooks, um die nicht formatierten Quelldaten zu transformieren und die transformierten Daten in eine Zieltabelle zu schreiben
  • Erstellen eines Databricks-Notebooks, um die transformierten Daten abzufragen
  • Automatisieren der Datenpipeline mit einem Azure Databricks-Auftrag

Anforderungen

Beispiel: Million Song Dataset

Das in diesem Beispiel verwendete Dataset ist eine Teilmenge von Million Song Dataset, eine Sammlung von Features und Metadaten für zeitgenössische Musiktitel. Dieses Dataset ist in den Beispieldatasets verfügbar, die in Ihrem Azure Databricks-Arbeitsbereich enthalten sind.

Schritt 1: Erstellen eines Clusters

Um die Datenverarbeitung und -analyse in diesem Beispiel durchzuführen, erstellen Sie einen Cluster, um die zum Ausführen von Befehlen erforderlichen Computeressourcen bereitzustellen.

Hinweis

Da in diesem Beispiel ein in DBFS gespeichertes Beispieldataset verwendet und empfohlen wird, Tabellen in Unity Catalog beizubehalten, erstellen Sie einen Cluster, der mit dem Modus für Einzelbenutzerzugriff konfiguriert ist. Der Modus für Einzelbenutzerzugriff bietet Vollzugriff auf DBFS und ermöglicht gleichzeitig den Zugriff auf Unity Catalog. Weitere Informationen finden Sie unter Bewährte Methoden für DBFS und Unity Catalog.

  1. Klicken Sie auf der Randleiste auf Compute.
  2. Klicken Sie auf der Seite „Compute“ auf Cluster erstellen.
  3. Geben Sie auf der Seite „Neuer Cluster“ einen eindeutigen Namen für den Cluster ein.
  4. Wählen Sie für den ZugriffsmodusEinzelbenutzer aus.
  5. Wählen Sie unter Einzelbenutzer- oder Dienstprinzipalzugriff Ihren Benutzernamen aus.
  6. Lassen Sie die verbleibenden Werte im Standardzustand, und wählen Sie Cluster erstellen aus.

Weitere Informationen zu Databricks-Clustern finden Sie unter Compute.

Schritt 2: Untersuchen der Quelldaten

Informationen zur Verwendung der Azure Databricks-Schnittstelle zum Untersuchen der Rohdaten finden Sie unter Erkunden der Quelldaten für eine Datenpipeline. Wenn Sie direkt mit dem Erfassen und Aufbereiten der Daten beginnen möchten, fahren Sie mit Schritt 3: Erfassen der Rohdaten fort.

Schritt 3: Erfassen der Rohdaten

In diesem Schritt laden Sie die Rohdaten in eine Tabelle, um sie für die weitere Verarbeitung verfügbar zu machen. Für die Verwaltung von Datenressourcen z. B. Tabellen, auf der Databricks-Plattform empfiehlt Databricks Unity Catalog. Sollten Sie nicht über die Berechtigungen zum Erstellen des erforderlichen Katalogs und Schemas zum Veröffentlichen von Tabellen in Unity Catalog verfügen, können Sie dennoch die folgenden Schritte ausführen, indem Sie Tabellen im Hive-Metastore veröffentlichen.

Zum Erfassen von Daten empfiehlt Databricks die Verwendung von Auto Loader. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden.

Sie können den Autoloader so konfigurieren, dass das Schema der geladenen Daten automatisch erkannt wird, was es Ihnen erlaubt, Tabellen zu initialisieren, ohne das Datenschema explizit zu deklarieren und das Tabellenschema zu entwickeln, wenn neue Spalten eingeführt werden. Dadurch wird die Notwendigkeit beseitigt, Schemaänderungen im Laufe der Zeit manuell nachverfolgen und anwenden zu müssen. Databricks empfiehlt bei Verwendung des Autoloaders einen Schemarückschluss. Wie jedoch im Schritt zur Datenuntersuchung zu sehen ist, enthalten die Songdaten keine Headerinformationen. Da der Header nicht mit den Daten gespeichert wird, müssen Sie das Schema explizit definieren, wie im nächsten Beispiel gezeigt.

  1. Klicken Sie auf der Seitenleiste auf New IconNeu, und wählen Sie im Menü die Option Notebook aus. Das Dialogfeld Notebook erstellen wird angezeigt.

  2. Geben Sie einen Namen für das Notebook ein, etwa Ingest songs data. Standardmäßig gilt:

    • Python ist die ausgewählte Sprache.
    • Das Notebook wird an den zuletzt verwendeten Cluster angefügt. In diesem Fall ist das der Cluster, den Sie in Schritt 1: Erstellen eines Clusters erstellt haben.
  3. Geben Sie in der ersten Zelle des Notebooks Folgendes ein:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Wenn Sie Unity Catalog verwenden, ersetzen Sie <table-name> durch einen Katalog-, Schema- und Tabellennamen, der die eingelesenen Datensätze enthalten soll (z. B. data_pipelines.songs_data.raw_song_data). Ersetzen Sie andernfalls <table-name> durch den Namen der Tabelle, die die erfassten Datensätze enthalten soll, z. B. raw_song_data.

    Ersetzen Sie <checkpoint-path> durch den Pfad zu einem Verzeichnis im DBFS zum Verwalten von Prüfpunktdateien, z. B. /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Klicken Sie auf Run Menu, und wählen Sie Zelle ausführen aus. In diesem Beispiel wird das Datenschema mithilfe der Informationen aus README definiert, die Songdaten werden aus allen in file_path enthaltenen Dateien erfasst, und die Daten werden in die von table_name angegebene Tabelle geschrieben.

Schritt 4: Vorbereiten der Rohdaten

Um die Rohdaten für die Analyse vorzubereiten, werden in den folgenden Schritten die Rohdaten der Songs umgewandelt, indem nicht benötigte Spalten herausgefiltert werden und ein neues Feld mit einem Zeitstempel für die Erstellung des neuen Datensatzes hinzugefügt wird.

  1. Klicken Sie auf der Seitenleiste auf New IconNeu, und wählen Sie im Menü die Option Notebook aus. Das Dialogfeld Notebook erstellen wird angezeigt.

  2. Geben Sie einen Namen für das Notebook an. Beispiel: Prepare songs data. Ändern Sie die Standardsprache in SQL.

  3. Geben Sie in der ersten Zelle des Notebooks Folgendes ein:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Wenn Sie Unity Catalog verwenden, ersetzen Sie <table-name> durch einen Katalog, ein Schema und einen Tabellennamen, die die gefilterten und transformierten Datensätze enthalten sollen (z. B. data_pipelines.songs_data.prepared_song_data). Ersetzen Sie andernfalls <table-name> durch den Namen einer Tabelle, die die gefilterten und transformierten Datensätze enthalten soll (z. B. prepared_song_data).

    Ersetzen Sie <raw-songs-table-name> durch den Namen der Tabelle, die die im vorherigen Schritt erfassten unformatierten Songdatensätze enthält.

  4. Klicken Sie auf Run Menu, und wählen Sie Zelle ausführen aus.

Schritt 5: Abfragen der transformierten Daten

In diesem Schritt erweitern Sie die Verarbeitungspipeline, indem Sie Abfragen hinzufügen, um die Songdaten zu analysieren. Diese Abfragen verwenden die aufbereiteten Datensätze, die im vorherigen Schritt erstellt wurden.

  1. Klicken Sie auf der Seitenleiste auf New IconNeu, und wählen Sie im Menü die Option Notebook aus. Das Dialogfeld Notebook erstellen wird angezeigt.

  2. Geben Sie einen Namen für das Notebook an. Beispiel: Analyze songs data. Ändern Sie die Standardsprache in SQL.

  3. Geben Sie in der ersten Zelle des Notebooks Folgendes ein:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Ersetzen Sie <prepared-songs-table-name> durch den Namen der Tabelle, die die vorbereiteten Daten enthält. Beispiel: data_pipelines.songs_data.prepared_song_data.

  4. Klicken Sie im Menü „Zellenaktionen“ auf Down Caret, wählen Sie Zelle unterhalb einfügen aus, und geben Sie Folgendes in die neue Zelle ein:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Ersetzen Sie <prepared-songs-table-name> durch den Namen der vorbereiteten Tabelle, die im vorherigen Schritt erstellt wurde. Beispiel: data_pipelines.songs_data.prepared_song_data.

  5. Klicken Sie auf Alle ausführen, um die Abfragen auszuführen und die Ausgabe anzuzeigen.

Schritt 6: Erstellen eines Azure Databricks-Auftrags zum Ausführen der Pipeline

Sie können einen Workflow erstellen, um die Ausführung der Datenerfassungs-, Verarbeitungs- und Analyseschritte mithilfe eines Azure Databricks-Auftrags zu automatisieren.

  1. Führen Sie in Ihrem Data Science und Entwicklungs-Arbeitsbereich eine der folgenden Aktionen aus:
    • Klicken Sie in der Seitenleiste auf Jobs IconWorkflows und dann auf die Schaltfläche Create Job Button.
    • Klicken Sie auf der Seitenleiste auf New IconNeu, und wählen Sie Auftrag aus.
  2. Ersetzen Sie im Aufgabendialogfeld auf der Registerkarte AufgabenNamen für Ihren Auftrag hinzufügen… durch den Namen Ihres Auftrags. Beispiel: „Songs-Workflow“.
  3. Geben Sie unter Aufgabenname einen Namen für die erste Aufgabe ein, z. B. Ingest_songs_data.
  4. Wählen Sie unter Typ den Aufgabentyp Notebook aus.
  5. Wählen Sie unter Quelle die Option Arbeitsbereich aus.
  6. Verwenden Sie den Dateibrowser, um das Notebook für die Datenerfassung zu suchen, und klicken Sie auf den Namen des Notebooks und dann auf Bestätigen.
  7. Wählen Sie unter Cluster die Option Shared_job_cluster oder den Cluster aus, den Sie im Schritt Create a cluster erstellt haben.
  8. Klicken Sie auf Erstellen.
  9. Wählen Sie unter der Aufgabe, die Sie gerade erstellt haben, die Add Task Button und dann Notebook aus.
  10. Geben Sie unter Aufgabenname einen Namen für die Aufgabe ein, z. B. Prepare_songs_data.
  11. Wählen Sie unter Typ den Aufgabentyp Notebook aus.
  12. Wählen Sie unter Quelle die Option Arbeitsbereich aus.
  13. Verwenden Sie den Dateibrowser, um das Notebook für die Datenaufbereitung zu suchen, und klicken Sie auf den Namen des Notebooks und dann auf Bestätigen.
  14. Wählen Sie unter Cluster die Option Shared_job_cluster oder den Cluster aus, den Sie im Schritt Create a cluster erstellt haben.
  15. Klicken Sie auf Erstellen.
  16. Wählen Sie unter der Aufgabe, die Sie gerade erstellt haben, die Add Task Button und dann Notebook aus.
  17. Geben Sie unter Aufgabenname einen Namen für die Aufgabe ein, z. B. Analyze_songs_data.
  18. Wählen Sie unter Typ den Aufgabentyp Notebook aus.
  19. Wählen Sie unter Quelle die Option Arbeitsbereich aus.
  20. Verwenden Sie den Dateibrowser, um das Notebook für die Datenanalyse zu suchen, und klicken Sie auf den Namen des Notebooks und dann auf Bestätigen.
  21. Wählen Sie unter Cluster die Option Shared_job_cluster oder den Cluster aus, den Sie im Schritt Create a cluster erstellt haben.
  22. Klicken Sie auf Erstellen.
  23. Klicken Sie auf Run Now Button, um den Workflow auszuführen. Klicken Sie zum Anzeigen von Details für die Ausführung in der Ansicht Auftragsausführungen in der Spalte Startzeit für die Ausführung auf den Link. Klicken Sie auf die einzelnen Aufgaben, um Details für die Taskausführung anzuzeigen.
  24. Um die Ergebnisse nach Abschluss des Workflows anzuzeigen, klicken Sie auf die abschließende Datenanalyseaufgabe. Die Seite Ausgabe wird angezeigt und zeigt die Abfrageergebnisse an.

Schritt 7: Planen des Datenpipelineauftrags

Hinweis

Um die Verwendung eines Azure Databricks-Auftrags zur Orchestrierung eines geplanten Workflows zu demonstrieren, werden in diesem Einstiegsbeispiel die Schritte Erfassen, Aufbereiten und Analysieren in separate Notebooks aufgeteilt. Jedes Notebook wird dann zur Erstellung einer Aufgabe im Auftrag verwendet. Wenn die gesamte Verarbeitung in einem einzigen Notebook enthalten ist, können Sie das Notebook ganz einfach direkt über die Azure Databricks-Notebookbenutzeroberfläche planen. Weitere Informationen finden Sie unter Erstellen und Verwalten geplanter Notebookaufträge.

Eine häufige Anforderung ist die geplante Ausführung einer Datenpipeline. So definieren Sie einen Zeitplan für den Auftrag, der die Pipeline ausführt

  1. Klicken Sie auf der Randleiste auf Jobs IconWorkflows.
  2. Klicken Sie in der Spalte Name auf den Auftragsnamen. Im Seitenbereich werden die Auftragsdetails angezeigt.
  3. Klicken Sie im Bereich Auftragsdetails auf Trigger hinzufügen, und wählen Sie unter Triggertyp die Option Geplant aus.
  4. Geben Sie den Zeitraum, die Startzeit und die Zeitzone an. Aktivieren Sie optional das Kontrollkästchen Cron-Syntax anzeigen, um den Zeitplan unter Quartz Cron-Syntax anzuzeigen und zu bearbeiten.
  5. Klicken Sie auf Speichern.

Erfahren Sie mehr