Végpontok közötti adatfolyam létrehozása a Databricksben

Ez a cikk bemutatja, hogyan hozhat létre és helyezhet üzembe egy végpontok közötti adatfeldolgozási folyamatot, beleértve a nyers adatok betöltését, az adatok átalakítását és a feldolgozott adatok elemzéseinek futtatását.

Feljegyzés

Bár ez a cikk bemutatja, hogyan hozhat létre teljes adatfolyamot a Databricks-jegyzetfüzetek és egy Azure Databricks-feladat használatával a munkafolyamatok vezényléséhez, a Databricks a Delta Live Tables deklaratív felületét javasolja megbízható, karbantartható és tesztelhető adatfeldolgozási folyamatok létrehozásához.

Mi az az adatfolyam?

Az adatfolyam végrehajtja az adatok forrásrendszerekből való áthelyezéséhez, az adatok követelmények alapján történő átalakításához és az adatok célrendszerben való tárolásához szükséges lépéseket. Az adatfolyam tartalmazza azokat a folyamatokat, amelyek ahhoz szükségesek, hogy a nyers adatokat előkészített adatokká alakítják, amelyeket a felhasználók felhasználhatnak. Egy adatfolyam például előkészítheti az adatokat, hogy az adatelemzők és adattudósok elemzéssel és jelentéskészítéssel nyerhessenek ki értéket az adatokból.

Az adatfolyamatok gyakori példája a kinyerési, átalakítási és betöltési (ETL-) munkafolyamat. Az ETL-feldolgozás során az adatok a forrásrendszerekből lesznek betöltve, és egy átmeneti területre kerülnek, a követelmények (az adatminőség biztosítása, a rekordok deduplikálása stb.) alapján alakulnak át, majd egy célrendszerbe, például adattárházba vagy data lake-be írnak.

Az adatfolyam lépései

Az Azure Databricksen futó adatfolyamok létrehozásának megkezdéséhez a cikkben szereplő példa egy adatfeldolgozási munkafolyamat létrehozását ismerteti:

  • Az Azure Databricks funkcióival felfedezhet egy nyers adathalmazt.
  • Hozzon létre egy Databricks-jegyzetfüzetet a nyers forrásadatok betöltéséhez és a nyers adatok céltáblába való írásához.
  • Hozzon létre egy Databricks-jegyzetfüzetet a nyers forrásadatok átalakításához és az átalakított adatok céltáblába írásához.
  • Hozzon létre egy Databricks-jegyzetfüzetet az átalakított adatok lekérdezéséhez.
  • Automatizálja az adatfolyamot egy Azure Databricks-feladattal.

Követelmények

  • Bejelentkezett az Azure Databricksbe és a Adattudomány > Mérnöki munkaterületre.
  • Jogosult fürt létrehozására vagy fürthöz való hozzáférésre.
  • (Nem kötelező) Ha táblákat szeretne közzétenni a Unity Catalogban, létre kell hoznia egy katalógust és sémát a Unity Catalogban.

Példa: Millió dal adatkészlet

Az ebben a példában használt adatkészlet a Million Song Dataset egy részhalmaza, amely a kortárs zeneszámok funkcióinak és metaadatainak gyűjteménye. Ez az adatkészlet az Azure Databricks-munkaterületen található mintaadatkészletekben érhető el.

1. lépés: Fürt létrehozása

A példában szereplő adatfeldolgozás és elemzés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok futtatásához szükséges számítási erőforrásokat.

Feljegyzés

Mivel ez a példa egy DBFS-ben tárolt mintaadatkészletet használ, és azt javasolja, hogy a táblák megmaradjanak a Unity Catalogban, létre kell hoznia egy fürtöt, amely egyfelhasználós hozzáférési móddal van konfigurálva. Az egyfelhasználós hozzáférési mód teljes hozzáférést biztosít a DBFS-hez, ugyanakkor lehetővé teszi a Unity Katalógushoz való hozzáférést is. Tekintse meg a DBFS és a Unity Katalógus ajánlott eljárásait.

  1. Kattintson a Számítás gombra az oldalsávon.
  2. A Számítás lapon kattintson a Fürt létrehozása elemre.
  3. Az Új fürt lapon adja meg a fürt egyedi nevét.
  4. Access módban válassza az Egy felhasználó lehetőséget.
  5. Az egyfelhasználós vagy szolgáltatásnév-hozzáférésben válassza ki a felhasználónevét.
  6. Hagyja meg a fennmaradó értékeket az alapértelmezett állapotban, és kattintson a Fürt létrehozása gombra.

A Databricks-fürtökkel kapcsolatos további információkért lásd: Compute.

2. lépés: A forrásadatok megismerése

Ha tudni szeretné, hogyan használhatja az Azure Databricks-felületet a nyers forrásadatok megismerésére, tekintse meg az adatfolyam forrásadatait. Ha közvetlenül az adatok betöltésére és előkészítésére szeretne lépni, folytassa a 3. lépésben: A nyers adatok betöltése.

3. lépés: A nyers adatok betöltése

Ebben a lépésben betölti a nyers adatokat egy táblába, hogy azokat további feldolgozás céljából elérhetővé tegye. A Databricks platform adategységeinek( például táblák) kezeléséhez a Databricks a Unity Catalog szolgáltatást javasolja. Ha azonban nincs engedélye arra, hogy létrehozza a szükséges katalógust és sémát a táblák Unity Catalogban való közzétételéhez, akkor is elvégezheti a következő lépéseket a táblák Hive-metaadattárban való közzétételével.

Az adatok betöltéséhez a Databricks az Automatikus betöltő használatát javasolja. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.

Az Automatikus betöltőt úgy konfigurálhatja, hogy automatikusan észlelje a betöltött adatok sémáját, így anélkül inicializálhatja a táblákat, hogy explicit módon deklarálja az adatsémát, és új oszlopok bevezetésekor fejleszti a táblázatsémát. Ez szükségtelenné teszi a sémamódosítások manuális nyomon követését és alkalmazását az idő függvényében. A Databricks sémakövetkeztetést javasol az Automatikus betöltő használatakor. Az adatfeltárási lépésben látható módon azonban a dalok adatai nem tartalmaznak fejlécadatokat. Mivel a fejléc nem az adatokkal van tárolva, explicit módon kell definiálnia a sémát, ahogyan az a következő példában is látható.

  1. Az oldalsávon kattintson az Új gombraNew Icon, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.

  2. Adja meg például a jegyzetfüzet Ingest songs datanevét. Alapértelmezés szerint:

    • A Python a kiválasztott nyelv.
    • A jegyzetfüzet az utolsó használt fürthöz van csatolva. Ebben az esetben az 1. lépésben létrehozott fürt: Fürt létrehozása.
  3. Írja be a következőt a jegyzetfüzet első cellájába:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    A Unity Catalog használata esetén cserélje le <table-name> a katalógust, a sémát és a táblanevet a betöltött rekordok (például data_pipelines.songs_data.raw_song_data) nevére. Ellenkező esetben cserélje le <table-name> egy tábla nevére, hogy az tartalmazza a betöltött rekordokat, például raw_song_data: .

    Cserélje le <checkpoint-path> egy könyvtár elérési útjára a DBFS-ben az ellenőrzőpontfájlok karbantartásához, például /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Kattintson Run Menua gombra, és válassza a Cella futtatása parancsot. Ez a példa az adatsémát a benne található file_pathösszes fájlból READMEbetölti, és a megadott table_nametáblába írja az adatokat.

4. lépés: A nyers adatok előkészítése

A nyers adatok elemzésre való előkészítéséhez az alábbi lépések átalakítják a nyers zeneszámok adatait a szükségtelen oszlopok szűrésével és egy új, időbélyeget tartalmazó mező hozzáadásával az új rekord létrehozásához.

  1. Az oldalsávon kattintson az Új gombraNew Icon, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.

  2. Adja meg a jegyzetfüzet nevét. Például: Prepare songs data. Módosítsa az alapértelmezett nyelvet SQL-re.

  3. Írja be a következőt a jegyzetfüzet első cellájába:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    A Unity Catalog használata esetén cserélje le <table-name> a katalógus, a séma és a tábla nevét a szűrt és átalakított rekordok (például data_pipelines.songs_data.prepared_song_data) nevére. Ellenkező esetben cserélje le <table-name> egy tábla nevére a szűrt és átalakított rekordokat (például prepared_song_data).

    Cserélje le <raw-songs-table-name> az előző lépésben betöltött nyers zeneszámok rekordjait tartalmazó táblázat nevére.

  4. Kattintson Run Menua gombra, és válassza a Cella futtatása parancsot.

5. lépés: Az átalakított adatok lekérdezése

Ebben a lépésben kibővítheti a feldolgozási folyamatot lekérdezések hozzáadásával a dalok adatainak elemzéséhez. Ezek a lekérdezések az előző lépésben létrehozott előkészített rekordokat használják.

  1. Az oldalsávon kattintson az Új gombraNew Icon, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.

  2. Adja meg a jegyzetfüzet nevét. Például: Analyze songs data. Módosítsa az alapértelmezett nyelvet SQL-re.

  3. Írja be a következőt a jegyzetfüzet első cellájába:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Cserélje le <prepared-songs-table-name> az előkészített adatokat tartalmazó tábla nevére. Például: data_pipelines.songs_data.prepared_song_data.

  4. Kattintson Down Caret a cellaműveletek menüre, válassza az Alábbi cella hozzáadása lehetőséget, és írja be a következőt az új cellába:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Cserélje le <prepared-songs-table-name> az előző lépésben létrehozott előkészített tábla nevére. Például: data_pipelines.songs_data.prepared_song_data.

  5. A lekérdezések futtatásához és a kimenet megtekintéséhez kattintson az Összes futtatása elemre.

6. lépés: Azure Databricks-feladat létrehozása a folyamat futtatásához

Létrehozhat egy munkafolyamatot, amely automatizálja az adatbetöltési, feldolgozási és elemzési lépések futtatását egy Azure Databricks-feladat használatával.

  1. A Adattudomány > Mérnöki munkaterületen tegye az alábbiak egyikét:
    • Kattintson Jobs Icona Munkafolyamatok elemre az oldalsávon, és kattintson a gombra Create Job Button.
    • Az oldalsávon kattintson az Új gombraNew Icon, és válassza a Feladat lehetőséget.
  2. A Feladatok lap Feladat párbeszédpanelén cserélje le a Feladat neve hozzáadása... elemet a feladat nevére. Például a "Songs munkafolyamat".
  3. A Tevékenység név mezőbe írja be az első tevékenység nevét, például Ingest_songs_data: .
  4. A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
  5. A Forrás területen válassza a Munkaterület lehetőséget.
  6. A fájlböngészővel keresse meg az adatbetöltési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, és kattintson a Megerősítés gombra.
  7. A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt Create a cluster .
  8. Kattintson a Létrehozás gombra.
  9. Kattintson Add Task Buttonaz imént létrehozott feladat alá, és válassza a Jegyzetfüzet lehetőséget.
  10. A Tevékenység név mezőbe írja be például Prepare_songs_dataa tevékenység nevét.
  11. A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
  12. A Forrás területen válassza a Munkaterület lehetőséget.
  13. A fájlböngészővel keresse meg az adatelőkészítési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, és kattintson a Megerősítés gombra.
  14. A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt Create a cluster .
  15. Kattintson a Létrehozás gombra.
  16. Kattintson Add Task Buttonaz imént létrehozott feladat alá, és válassza a Jegyzetfüzet lehetőséget.
  17. A Tevékenység név mezőbe írja be például Analyze_songs_dataa tevékenység nevét.
  18. A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
  19. A Forrás területen válassza a Munkaterület lehetőséget.
  20. A fájlböngészővel keresse meg az adatelemzési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, majd a Megerősítés gombra.
  21. A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt Create a cluster .
  22. Kattintson a Létrehozás gombra.
  23. A munkafolyamat futtatásához kattintson a gombra Run Now Button. A futtatás részleteinek megtekintéséhez kattintson a feladatfuttatási nézetben a futtatás Kezdési idő oszlopában található hivatkozásra. Kattintson az egyes tevékenységekre a feladatfuttatás részleteinek megtekintéséhez.
  24. Ha meg szeretné tekinteni az eredményeket a munkafolyamat befejezésekor, kattintson a végső adatelemzési feladatra. Megjelenik a Kimenet lap, és megjeleníti a lekérdezés eredményeit.

7. lépés: Az adatfolyam-feladat ütemezése

Feljegyzés

Az Azure Databricks-feladat ütemezett munkafolyamatok vezénylésére való használatának bemutatásához ez az első lépéseket bemutató példa külön jegyzetfüzetekre bontja a betöltési, előkészítési és elemzési lépéseket, majd minden egyes jegyzetfüzetet használ egy feladat létrehozására a feladatban. Ha az összes feldolgozás egyetlen jegyzetfüzetben található, egyszerűen ütemezheti a jegyzetfüzetet közvetlenül az Azure Databricks notebook felhasználói felületéről. Lásd: Ütemezett jegyzetfüzet-feladatok létrehozása és kezelése.

Gyakori követelmény egy adatfolyam ütemezett futtatása. A folyamatot futtató feladat ütemezésének meghatározása:

  1. Kattintson Jobs Icona Munkafolyamatok elemre az oldalsávon.
  2. A Név oszlopban kattintson a feladat nevére. Az oldalpanelen megjelennek a Feladat részletei.
  3. Kattintson az Eseményindító hozzáadása elemre a Feladat részletei panelen, és válassza az Ütemezett eseményindító típusa lehetőséget.
  4. Adja meg az időszakot, a kezdési időt és az időzónát. Ha szeretné, jelölje be a Cron szintaxis megjelenítése jelölőnégyzetet az ütemezés kvarc cron szintaxisban való megjelenítéséhez és szerkesztéséhez.
  5. Kattintson a Mentés gombra.

További információ