Övning – Integrera en notebook-fil i Azure Synapse-pipelines

Slutförd

I den här lektionen skapar du en Azure Synapse Spark-notebook-fil för att analysera och transformera data som läses in av ett mappningsdataflöde och lagra data i en datasjö. Du skapar en parametercell som accepterar en strängparameter som definierar mappnamnet för de data som notebook-filen skriver till datasjön.

Sedan lägger du till den här notebook-filen i en Synapse-pipeline och skickar det unika pipelinekörnings-ID:t till notebook-parametern så att du senare kan korrelera pipelinekörningen med data som sparats av notebook-aktiviteten.

Slutligen använder du monitorhubben i Synapse Studio för att övervaka pipelinekörningen, hämta körnings-ID:t och leta sedan upp motsvarande filer som lagras i datasjön.

Om Apache Spark och notebook-filer

Apache Spark är ett ramverk för parallellbearbetning som stöder minnesintern bearbetning för att öka prestanda i program för stordataanalys. Apache Spark i Azure Synapse Analytics är en av Microsofts implementeringar av Apache Spark i molnet.

En Apache Spark-notebook-fil i Synapse Studio är ett webbgränssnitt där du kan skapa filer som innehåller livekod, visualiseringar och narrativ text. Notebook-filer är ett bra ställe att validera idéer och använda snabba experiment för att få insikter från dina data. Notebook-filer används också ofta i dataförberedelser, datavisualisering, maskininlärning och andra stordatascenarier.

Skapa en Synapse Spark-anteckningsbok

Anta att du har skapat ett mappningsdataflöde i Synapse Analytics för att bearbeta, ansluta och importera användarprofildata. Nu vill du hitta de fem främsta produkterna för varje användare, baserat på vilka som är både föredragna och bästa val, och har flest inköp under de senaste 12 månaderna. Sedan vill du beräkna de fem främsta produkterna totalt.

I den här övningen skapar du en Synapse Spark-notebook-fil för att göra dessa beräkningar.

  1. Öppna Synapse Analytics Studio (https://web.azuresynapse.net/) och gå till datahubben .

    The Data menu item is highlighted.

  2. Välj fliken Länkad (1) och expandera det primära datasjölagringskontot (2) under Azure Data Lake Storage Gen2. Välj wwi-02-containern (3) och öppna mappen top-products (4). Högerklicka på valfri Parquet-fil (5), välj menyalternativet Ny anteckningsbok (6) och välj sedan Läs in till DataFrame (7). Om du inte ser mappen väljer du Refresh.

    The Parquet file and new notebook option are highlighted.

  3. Kontrollera att notebook-filen är ansluten till Spark-poolen.

    The attach to Spark pool menu item is highlighted.

  4. Ersätt Parquet-filnamnet med *.parquet(1) för att markera alla Parquet-filer i top-products mappen. Sökvägen bör till exempel likna: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    The filename is highlighted.

  5. Välj Kör alla i notebook-verktygsfältet för att köra anteckningsboken.

    The cell results are displayed.

    Kommentar

    Första gången du kör en notebook-fil i en Spark-pool skapar Synapse en ny session. Det kan ta cirka 3–5 minuter.

    Kommentar

    Om du bara vill köra cellen hovra över cellen och markera ikonen Kör cell till vänster om cellen eller markera cellen och ange sedan Ctrl+Retur.

  6. Skapa en ny cell under genom att + välja knappen och välja cellobjektet Kod. Knappen + finns under notebook-cellen till vänster. Du kan också expandera menyn + Cell i verktygsfältet Notebook och välja cellobjektet Kod.

    The Add Code menu option is highlighted.

  7. Kör följande kommando i den nya cellen för att fylla i en ny dataram med namnet topPurchases, skapa en ny temporär vy med namnet top_purchasesoch visa de första 100 raderna:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Utdata bör se ut ungefär så här:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Kör följande kommando i en ny cell för att skapa en ny tillfällig vy med hjälp av SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Kommentar

    Det finns inga utdata för den här frågan.

    Frågan använder den top_purchases tillfälliga vyn som källa och använder en row_number() over metod för att tillämpa ett radnummer för posterna för varje användare där ItemsPurchasedLast12Months är störst. where Satsen filtrerar resultatet så att vi bara hämtar upp till fem produkter där båda IsTopProduct och IsPreferredProduct är inställda på true. Detta ger oss de fem mest köpta produkterna för varje användare där dessa produkter också identifieras som deras favoritprodukter, enligt deras användarprofil som lagras i Azure Cosmos DB.

  9. Kör följande kommando i en ny cell för att skapa och visa en ny DataFrame som lagrar resultatet av den top_5_products tillfälliga vy som du skapade i föregående cell:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Du bör se utdata som liknar följande, som visar de fem bästa produkterna per användare:

    The top five preferred products are displayed per user.

  10. Beräkna de fem främsta produkterna totalt sett, baserat på de produkter som både kunderna föredrar och som köpt mest. Det gör du genom att köra följande kommando i en ny cell:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    I den här cellen grupperade vi de fem bästa produkterna efter produkt-ID, summerade det totala antalet artiklar som köpts under de senaste 12 månaderna, sorterade det värdet i fallande ordning och returnerade de fem främsta resultaten. Ditt resultat bör likna följande:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Skapa en parametercell

Azure Synapse-pipelines letar efter parametercellen och behandlar den här cellen som standardvärden för de parametrar som skickades in vid körningen. Körningsmotorn lägger till en ny cell under parametercellen med indataparametrar för att skriva över standardvärdena. När en parametercell inte har angetts infogas den inmatade cellen överst i anteckningsboken.

  1. Vi ska köra den här notebook-filen från en pipeline. Vi vill skicka in en parameter som anger ett runId variabelvärde som ska användas för att namnge Parquet-filen. Kör följande kommando i en ny cell:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Vi använder biblioteket uuid som medföljer Spark för att generera ett slumpmässigt GUID. Vi vill åsidosätta variabeln runId med en parameter som skickas i pipelinen. För att göra detta måste vi växla detta som en parametercell.

  2. Välj åtgärdernas ellipser (...) i cellens övre högra hörn (1) och välj sedan Växla parametercell (2)..

    The menu item is highlighted.

    När du har växlat det här alternativet visas taggen Parametrar i cellen.

    The cell is configured to accept parameters.

  3. Klistra in följande kod i en ny cell för att använda variabeln runId som Parquet-filnamn i /top5-products/ sökvägen i det primära datasjökontot. Ersätt YOUR_DATALAKE_NAME i sökvägen med namnet på ditt primära data lake-konto. För att hitta detta bläddrar du upp till Cell 1 överst på sidan (1). Kopiera datasjölagringskontot från sökvägen (2). Klistra in det här värdet som ersättning för YOUR_DATALAKE_NAME i sökvägen (3) i den nya cellen och kör sedan kommandot i cellen.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    The path is updated with the name of the primary data lake account.

  4. Kontrollera att filen har skrivits till datasjön. Gå till datahubben och välj fliken Länkad (1). Expandera det primära datasjölagringskontot och välj sedan wwi-02-containern(2). Gå till mappen top5-products (3). Du bör se en mapp för Parquet-filen i katalogen med ett GUID som filnamn (4)..

    The parquet file is highlighted.

    Parquet-skrivmetoden på dataramen i notebook-cellen skapade den här katalogen eftersom den inte fanns tidigare.

Lägga till notebook-filen i en Synapse-pipeline

När du refererar tillbaka till Dataflöde som vi beskrev i början av övningen antar vi att du vill köra den här notebook-filen efter att Dataflöde körs som en del av orkestreringsprocessen. Det gör du genom att lägga till den här notebook-filen i en pipeline som en ny notebook-aktivitet.

  1. Gå tillbaka till notebook-filen. Välj Egenskaper(1) i det övre högra hörnet av anteckningsboken och ange Calculate Top 5 Products sedan som Namn (2).

    The properties blade is displayed.

  2. Välj Lägg till i pipeline(1) längst upp till höger i anteckningsboken och välj sedan Befintlig pipeline (2)..

    The add to pipeline button is highlighted.

  3. Välj pipelinen Skriv användarprofildata till ASA (1) och välj sedan Lägg till *(2).

    The pipeline is selected.

  4. Synapse Studio lägger till notebook-aktiviteten i pipelinen. Ordna om notebook-aktiviteten så att den finns till höger om dataflödesaktiviteten. Välj dataflödesaktiviteten och dra en grön ruta för pipelineanslutning för lyckad aktivitet till notebook-aktiviteten.

    The green arrow is highlighted.

    Pilen Lyckad aktivitet instruerar pipelinen att köra notebook-aktiviteten när dataflödesaktiviteten har körts.

  5. Välj aktiviteten Notebook (1) och välj sedan fliken Inställningar (2), expandera Basparametrar (3) och välj sedan + Ny (4). Ange runId i fältet Namn (5). Välj Sträng som typ (6). För Värdet väljer du Lägg till dynamiskt innehåll (7).

    The settings are displayed.

  6. Välj Pipeline-körnings-ID under Systemvariabler (1). Detta lägger @pipeline().RunId till i rutan för dynamiskt innehåll (2). Välj Slutför (3) för att stänga dialogrutan.

    The dynamic content form is displayed.

    Värdet för pipelinekörnings-ID är ett unikt GUID som tilldelats varje pipelinekörning. Vi använder det här värdet för namnet på Parquet-filen genom att skicka in det här värdet som runId notebook-parameter. Vi kan sedan titta igenom pipelinekörningshistoriken och hitta den specifika Parquet-fil som skapats för varje pipelinekörning.

  7. Välj Publicera alla och sedan Publicera för att spara ändringarna.

    Publish all is highlighted.

  8. När publiceringen är klar väljer du Lägg till utlösare (1) och sedan Utlösare nu (2) för att köra den uppdaterade pipelinen.

    The trigger menu item is highlighted.

  9. Välj OK för att köra utlösaren.

    The OK button is highlighted.

Övervaka pipelinekörningen

Med monitorhubben kan du övervaka aktuella och historiska aktiviteter för SQL, Apache Spark och Pipelines.

  1. Gå till monitorhubben.

    The Monitor hub menu item is selected.

  2. Välj Pipelinekörningar (1) och vänta tills pipelinekörningen har slutförts (2).. Du kan behöva uppdatera (3) vyn.

    The pipeline run succeeded.

  3. Välj namnet på pipelinen för att visa pipelinens aktivitetskörningar.

    The pipeline name is selected.

  4. Observera både dataflödesaktiviteten och den nya notebook-aktiviteten (1). Anteckna pipelinekörnings-ID-värdet (2). Vi jämför detta med Parquet-filnamnet som genereras av notebook-filen. Välj namnet på Calculate Top 5 Products Notebook (Beräkna de 5 främsta produkternas anteckningsbok) för att visa dess information (3)..

    The pipeline run details are displayed.

  5. Här ser vi information om notebook-körning. Du kan välja Uppspelning(1) för att se en uppspelning av förloppet genom jobben (2). Längst ned kan du visa diagnostik och loggar med olika filteralternativ (3).. Till höger kan vi visa körningsinformationen, till exempel varaktighet, Livy-ID, Spark-poolinformation och så vidare. Välj länken Visa information för ett jobb för att visa dess information (5).

    The run details are displayed.

  6. Användargränssnittet för Spark-programmet öppnas på en ny flik där vi kan se steginformationen. Expandera DAG Visualisering för att visa sceninformationen.

    The Spark stage details are displayed.

  7. Gå tillbaka till datahubben .

    Data hub.

  8. Välj fliken Länkad (1) och välj sedan wwi-02-containern(2) på lagringskontot för den primära datasjön, gå till mappen top5-products (3) och kontrollera att det finns en mapp för Parquet-filen vars namn matchar pipelinekörnings-ID:t.

    The file is highlighted.

    Som du ser har vi en fil vars namn matchar pipelinekörnings-ID :t som vi tidigare noterade:

    The Pipeline run ID is highlighted.

    Dessa värden matchar eftersom vi skickade i pipelinekörnings-ID:t till parametern runId för notebook-aktiviteten.