Skapa en datapipeline från slutpunkt till slutpunkt i Databricks

Den här artikeln visar hur du skapar och distribuerar en pipeline för databearbetning från slutpunkt till slutpunkt, inklusive hur du matar in rådata, transformerar data och kör analyser på bearbetade data.

Kommentar

Även om den här artikeln visar hur du skapar en fullständig datapipeline med databricks notebook-filer och ett Azure Databricks-jobbför att orkestrera ett arbetsflöde rekommenderar Databricks att du använder Delta Live Tables, ett deklarativt gränssnitt för att skapa tillförlitliga, underhållsbara och testbara databearbetningspipelines.

Vad är en datapipeline?

En datapipeline implementerar de steg som krävs för att flytta data från källsystem, transformera dessa data baserat på krav och lagra data i ett målsystem. En datapipeline innehåller alla processer som krävs för att omvandla rådata till förberedda data som användarna kan använda. En datapipeline kan till exempel förbereda data så att dataanalytiker och dataforskare kan extrahera värde från data via analys och rapportering.

Ett arbetsflöde för extrahering, transformering och inläsning (ETL) är ett vanligt exempel på en datapipeline. Vid ETL-bearbetning matas data in från källsystem och skrivs till ett mellanlagringsområde, transformeras baserat på krav (säkerställa datakvalitet, deduplicera poster och så vidare) och skrivs sedan till ett målsystem, till exempel ett informationslager eller en datasjö.

Steg för datapipeline

För att hjälpa dig att komma igång med att skapa datapipelines i Azure Databricks går exemplet i den här artikeln igenom hur du skapar ett arbetsflöde för databearbetning:

  • Använd Azure Databricks-funktioner för att utforska en rådatauppsättning.
  • Skapa en Databricks-notebook-fil för att mata in rådata och skriva rådata till en måltabell.
  • Skapa en Databricks-notebook-fil för att transformera rådata och skriva transformerade data till en måltabell.
  • Skapa en Databricks-notebook-fil för att köra frågor mot transformerade data.
  • Automatisera datapipelinen med ett Azure Databricks-jobb.

Behov

Exempel: Million Song-datauppsättning

Datauppsättningen som används i det här exemplet är en delmängd av Datauppsättningen Million Song, en samling funktioner och metadata för samtida musikspår. Den här datamängden är tillgänglig i exempeldatauppsättningarna som ingår i din Azure Databricks-arbetsyta.

Steg 1: Skapa ett kluster

Om du vill utföra databearbetningen och analysen i det här exemplet skapar du ett kluster för att tillhandahålla de beräkningsresurser som behövs för att köra kommandon.

Kommentar

Eftersom det här exemplet använder en exempeldatauppsättning som lagras i DBFS och rekommenderar att du bevarar tabeller till Unity Catalog skapar du ett kluster som konfigurerats med åtkomstläge för en användare. Åtkomstläge för en användare ger fullständig åtkomst till DBFS samtidigt som åtkomst till Unity Catalog aktiveras. Se Metodtips för DBFS och Unity Catalog.

  1. Klicka på Beräkna i sidofältet.
  2. På sidan Beräkning klickar du på Skapa kluster.
  3. På sidan Nytt kluster anger du ett unikt namn för klustret.
  4. I Åtkomstläge väljer du Enskild användare.
  5. I Åtkomst till en användare eller tjänstens huvudnamn väljer du ditt användarnamn.
  6. Lämna de återstående värdena i standardtillståndet och klicka på Skapa kluster.

Mer information om Databricks-kluster finns i Beräkning.

Steg 2: Utforska källdata

Information om hur du använder Azure Databricks-gränssnittet för att utforska rådata finns i Utforska källdata för en datapipeline. Om du vill gå direkt till att mata in och förbereda data fortsätter du till Steg 3: Mata in rådata.

Steg 3: Mata in rådata

I det här steget läser du in rådata i en tabell för att göra dem tillgängliga för vidare bearbetning. För att hantera datatillgångar på Databricks-plattformen, till exempel tabeller, rekommenderar Databricks Unity Catalog. Men om du inte har behörighet att skapa den katalog och schema som krävs för att publicera tabeller till Unity Catalog kan du fortfarande utföra följande steg genom att publicera tabeller till Hive-metaarkivet.

Databricks rekommenderar att du använder automatisk inläsning för att mata in data. Automatisk inläsning identifierar och bearbetar automatiskt nya filer när de tas emot i molnobjektlagring.

Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid. Databricks rekommenderar schemainferens när du använder Auto Loader. Men som du ser i datautforskningssteget innehåller låtdata inte rubrikinformation. Eftersom rubriken inte lagras med data måste du uttryckligen definiera schemat, som du ser i nästa exempel.

  1. I sidofältet klickar du på New IconNy och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.

  2. Ange ett namn på anteckningsboken, till exempel Ingest songs data. Som standard:

    • Python är det valda språket.
    • Anteckningsboken är kopplad till det senaste klustret som du använde. I det här fallet klustret som du skapade i steg 1: Skapa ett kluster.
  3. Ange följande i den första cellen i anteckningsboken:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Om du använder Unity Catalog ersätter <table-name> du med en katalog, ett schema och ett tabellnamn för att innehålla de inmatade posterna (till exempel data_pipelines.songs_data.raw_song_data). Annars ersätter du <table-name> med namnet på en tabell som ska innehålla de inmatade posterna, raw_song_datatill exempel .

    Ersätt <checkpoint-path> med en sökväg till en katalog i DBFS för att underhålla kontrollpunktsfiler, /tmp/pipeline_get_started/_checkpoint/song_datatill exempel .

  4. Klicka på Run Menuoch välj Kör cell. Det här exemplet definierar dataschemat med hjälp av informationen från README, matar in låtdata från alla filer som finns i file_pathoch skriver data till tabellen som anges av table_name.

Steg 4: Förbereda rådata

För att förbereda rådata för analys transformerar följande steg rådata genom att filtrera bort onödiga kolumner och lägga till ett nytt fält som innehåller en tidsstämpel för att skapa den nya posten.

  1. I sidofältet klickar du på New IconNy och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.

  2. Ange ett namn på anteckningsboken. Exempel: Prepare songs data Ändra standardspråket till SQL.

  3. Ange följande i den första cellen i anteckningsboken:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Om du använder Unity Catalog ersätter <table-name> du med en katalog, ett schema och ett tabellnamn för att innehålla de filtrerade och transformerade posterna (till exempel data_pipelines.songs_data.prepared_song_data). Annars ersätter du <table-name> med namnet på en tabell som ska innehålla de filtrerade och transformerade posterna (till exempel prepared_song_data).

    Ersätt <raw-songs-table-name> med namnet på tabellen som innehåller de råa låtarna som matades in i föregående steg.

  4. Klicka på Run Menuoch välj Kör cell.

Steg 5: Fråga transformerade data

I det här steget utökar du bearbetningspipelinen genom att lägga till frågor för att analysera låtdata. Dessa frågor använder de förberedda poster som skapades i föregående steg.

  1. I sidofältet klickar du på New IconNy och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.

  2. Ange ett namn på anteckningsboken. Exempel: Analyze songs data Ändra standardspråket till SQL.

  3. Ange följande i den första cellen i anteckningsboken:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Ersätt <prepared-songs-table-name> med namnet på tabellen som innehåller förberedda data. Exempel: data_pipelines.songs_data.prepared_song_data

  4. Klicka på Down Caret menyn cellåtgärder, välj Lägg till cell nedan och ange följande i den nya cellen:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Ersätt <prepared-songs-table-name> med namnet på den förberedda tabellen som skapades i föregående steg. Exempel: data_pipelines.songs_data.prepared_song_data

  5. Om du vill köra frågorna och visa utdata klickar du på Kör alla.

Steg 6: Skapa ett Azure Databricks-jobb för att köra pipelinen

Du kan skapa ett arbetsflöde för att automatisera körningen av datainmatnings-, bearbetnings- och analysstegen med hjälp av ett Azure Databricks-jobb.

  1. Gör något av följande i arbetsytan Datavetenskap & Engineering:
    • Klicka på Jobs IconArbetsflöden i sidofältet och klicka på .Create Job Button
    • I sidofältet klickar du på New IconNytt och väljer Jobb.
  2. I dialogrutan Aktivitet på fliken Uppgifter ersätter du Lägg till ett namn för jobbet... med jobbets namn. Till exempel "Songs workflow".
  3. I Uppgiftsnamn anger du ett namn för den första aktiviteten, till exempel Ingest_songs_data.
  4. I Typ väljer du aktivitetstypen Notebook .
  5. I Källa väljer du Arbetsyta.
  6. Använd filläsaren för att hitta anteckningsboken för datainmatning, klicka på anteckningsbokens namn och klicka på Bekräfta.
  7. I Kluster väljer du Shared_job_cluster eller klustret som du skapade i Create a cluster steget.
  8. Klicka på Skapa.
  9. Klicka Add Task Button under den uppgift som du nyss skapade och välj Anteckningsbok.
  10. I Uppgiftsnamn anger du ett namn för aktiviteten, till exempel Prepare_songs_data.
  11. I Typ väljer du aktivitetstypen Notebook .
  12. I Källa väljer du Arbetsyta.
  13. Använd filläsaren för att hitta anteckningsboken för förberedelse av data, klicka på anteckningsbokens namn och klicka på Bekräfta.
  14. I Kluster väljer du Shared_job_cluster eller klustret som du skapade i Create a cluster steget.
  15. Klicka på Skapa.
  16. Klicka Add Task Button under den uppgift som du nyss skapade och välj Anteckningsbok.
  17. I Uppgiftsnamn anger du ett namn för aktiviteten, till exempel Analyze_songs_data.
  18. I Typ väljer du aktivitetstypen Notebook .
  19. I Källa väljer du Arbetsyta.
  20. Använd filläsaren för att hitta anteckningsboken för dataanalys, klicka på anteckningsbokens namn och klicka på Bekräfta.
  21. I Kluster väljer du Shared_job_cluster eller klustret som du skapade i Create a cluster steget.
  22. Klicka på Skapa.
  23. Om du vill köra arbetsflödet klickar du på Run Now Button. Om du vill visa information om körningen klickar du på länken i kolumnen Starttid för körningen i jobbkörningsvyn. Klicka på varje aktivitet om du vill visa information om aktivitetskörningen.
  24. Om du vill visa resultatet när arbetsflödet är klart klickar du på den slutliga dataanalysuppgiften. Sidan Utdata visas och visar frågeresultatet.

Steg 7: Schemalägg datapipelinejobbet

Kommentar

Om du vill demonstrera hur du använder ett Azure Databricks-jobb för att orkestrera ett schemalagt arbetsflöde separerar det här komma igång-exemplet stegen för inmatning, förberedelse och analys i separata notebook-filer, och varje notebook-fil används sedan för att skapa en uppgift i jobbet. Om all bearbetning finns i en enda notebook-fil kan du enkelt schemalägga notebook-filen direkt från användargränssnittet för Azure Databricks Notebook. Se Skapa och hantera schemalagda notebook-jobb.

Ett vanligt krav är att köra en datapipeline enligt ett schema. Så här definierar du ett schema för jobbet som kör pipelinen:

  1. Klicka på Jobs IconArbetsflöden i sidofältet.
  2. Klicka på jobbnamnet i kolumnen Namn. Sidopanelen visar jobbinformationen.
  3. Klicka på Lägg till utlösare i panelen Jobbinformation och välj Schemalagd i Utlösartyp.
  4. Ange period, starttid och tidszon. Du kan också markera kryssrutan Visa Cron-syntax för att visa och redigera schemat i Quartz Cron-syntax.
  5. Klicka på Spara.

Läs mer