Övning – Integrera en notebook-fil i Azure Synapse-pipelines
I den här lektionen skapar du en Azure Synapse Spark-notebook-fil för att analysera och transformera data som läses in av ett mappningsdataflöde och lagra data i en datasjö. Du skapar en parametercell som accepterar en strängparameter som definierar mappnamnet för de data som notebook-filen skriver till datasjön.
Sedan lägger du till den här notebook-filen i en Synapse-pipeline och skickar det unika pipelinekörnings-ID:t till notebook-parametern så att du senare kan korrelera pipelinekörningen med data som sparats av notebook-aktiviteten.
Slutligen använder du monitorhubben i Synapse Studio för att övervaka pipelinekörningen, hämta körnings-ID:t och leta sedan upp motsvarande filer som lagras i datasjön.
Om Apache Spark och notebook-filer
Apache Spark är ett ramverk för parallellbearbetning som stöder minnesintern bearbetning för att öka prestanda i program för stordataanalys. Apache Spark i Azure Synapse Analytics är en av Microsofts implementeringar av Apache Spark i molnet.
En Apache Spark-notebook-fil i Synapse Studio är ett webbgränssnitt där du kan skapa filer som innehåller livekod, visualiseringar och narrativ text. Notebook-filer är ett bra ställe att validera idéer och använda snabba experiment för att få insikter från dina data. Notebook-filer används också ofta i dataförberedelser, datavisualisering, maskininlärning och andra stordatascenarier.
Skapa en Synapse Spark-anteckningsbok
Anta att du har skapat ett mappningsdataflöde i Synapse Analytics för att bearbeta, ansluta och importera användarprofildata. Nu vill du hitta de fem främsta produkterna för varje användare, baserat på vilka som är både föredragna och bästa val, och har flest inköp under de senaste 12 månaderna. Sedan vill du beräkna de fem främsta produkterna totalt.
I den här övningen skapar du en Synapse Spark-notebook-fil för att göra dessa beräkningar.
Öppna Synapse Analytics Studio (https://web.azuresynapse.net/) och gå till datahubben .
Välj fliken Länkad (1) och expandera det primära datasjölagringskontot (2) under Azure Data Lake Storage Gen2. Välj wwi-02-containern (3) och öppna mappen top-products (4). Högerklicka på valfri Parquet-fil (5), välj menyalternativet Ny anteckningsbok (6) och välj sedan Läs in till DataFrame (7). Om du inte ser mappen väljer du
Refresh
.Kontrollera att notebook-filen är ansluten till Spark-poolen.
Ersätt Parquet-filnamnet med
*.parquet
(1) för att markera alla Parquet-filer itop-products
mappen. Sökvägen bör till exempel likna:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Välj Kör alla i notebook-verktygsfältet för att köra anteckningsboken.
Kommentar
Första gången du kör en notebook-fil i en Spark-pool skapar Synapse en ny session. Det kan ta cirka 3–5 minuter.
Kommentar
Om du bara vill köra cellen hovra över cellen och markera ikonen Kör cell till vänster om cellen eller markera cellen och ange sedan Ctrl+Retur.
Skapa en ny cell under genom att + välja knappen och välja cellobjektet Kod. Knappen + finns under notebook-cellen till vänster. Du kan också expandera menyn + Cell i verktygsfältet Notebook och välja cellobjektet Kod.
Kör följande kommando i den nya cellen för att fylla i en ny dataram med namnet
topPurchases
, skapa en ny temporär vy med namnettop_purchases
och visa de första 100 raderna:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Utdata bör se ut ungefär så här:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Kör följande kommando i en ny cell för att skapa en ny tillfällig vy med hjälp av SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Kommentar
Det finns inga utdata för den här frågan.
Frågan använder den
top_purchases
tillfälliga vyn som källa och använder enrow_number() over
metod för att tillämpa ett radnummer för posterna för varje användare därItemsPurchasedLast12Months
är störst.where
Satsen filtrerar resultatet så att vi bara hämtar upp till fem produkter där bådaIsTopProduct
ochIsPreferredProduct
är inställda på true. Detta ger oss de fem mest köpta produkterna för varje användare där dessa produkter också identifieras som deras favoritprodukter, enligt deras användarprofil som lagras i Azure Cosmos DB.Kör följande kommando i en ny cell för att skapa och visa en ny DataFrame som lagrar resultatet av den
top_5_products
tillfälliga vy som du skapade i föregående cell:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Du bör se utdata som liknar följande, som visar de fem bästa produkterna per användare:
Beräkna de fem främsta produkterna totalt sett, baserat på de produkter som både kunderna föredrar och som köpt mest. Det gör du genom att köra följande kommando i en ny cell:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
I den här cellen grupperade vi de fem bästa produkterna efter produkt-ID, summerade det totala antalet artiklar som köpts under de senaste 12 månaderna, sorterade det värdet i fallande ordning och returnerade de fem främsta resultaten. Ditt resultat bör likna följande:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Skapa en parametercell
Azure Synapse-pipelines letar efter parametercellen och behandlar den här cellen som standardvärden för de parametrar som skickades in vid körningen. Körningsmotorn lägger till en ny cell under parametercellen med indataparametrar för att skriva över standardvärdena. När en parametercell inte har angetts infogas den inmatade cellen överst i anteckningsboken.
Vi ska köra den här notebook-filen från en pipeline. Vi vill skicka in en parameter som anger ett
runId
variabelvärde som ska användas för att namnge Parquet-filen. Kör följande kommando i en ny cell:import uuid # Generate random GUID runId = uuid.uuid4()
Vi använder biblioteket
uuid
som medföljer Spark för att generera ett slumpmässigt GUID. Vi vill åsidosätta variabelnrunId
med en parameter som skickas i pipelinen. För att göra detta måste vi växla detta som en parametercell.Välj åtgärdernas ellipser (...) i cellens övre högra hörn (1) och välj sedan Växla parametercell (2)..
När du har växlat det här alternativet visas taggen Parametrar i cellen.
Klistra in följande kod i en ny cell för att använda variabeln
runId
som Parquet-filnamn i/top5-products/
sökvägen i det primära datasjökontot. ErsättYOUR_DATALAKE_NAME
i sökvägen med namnet på ditt primära data lake-konto. För att hitta detta bläddrar du upp till Cell 1 överst på sidan (1). Kopiera datasjölagringskontot från sökvägen (2). Klistra in det här värdet som ersättning förYOUR_DATALAKE_NAME
i sökvägen (3) i den nya cellen och kör sedan kommandot i cellen.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Kontrollera att filen har skrivits till datasjön. Gå till datahubben och välj fliken Länkad (1). Expandera det primära datasjölagringskontot och välj sedan wwi-02-containern(2). Gå till mappen top5-products (3). Du bör se en mapp för Parquet-filen i katalogen med ett GUID som filnamn (4)..
Parquet-skrivmetoden på dataramen i notebook-cellen skapade den här katalogen eftersom den inte fanns tidigare.
Lägga till notebook-filen i en Synapse-pipeline
När du refererar tillbaka till Dataflöde som vi beskrev i början av övningen antar vi att du vill köra den här notebook-filen efter att Dataflöde körs som en del av orkestreringsprocessen. Det gör du genom att lägga till den här notebook-filen i en pipeline som en ny notebook-aktivitet.
Gå tillbaka till notebook-filen. Välj Egenskaper(1) i det övre högra hörnet av anteckningsboken och ange
Calculate Top 5 Products
sedan som Namn (2).Välj Lägg till i pipeline(1) längst upp till höger i anteckningsboken och välj sedan Befintlig pipeline (2)..
Välj pipelinen Skriv användarprofildata till ASA (1) och välj sedan Lägg till *(2).
Synapse Studio lägger till notebook-aktiviteten i pipelinen. Ordna om notebook-aktiviteten så att den finns till höger om dataflödesaktiviteten. Välj dataflödesaktiviteten och dra en grön ruta för pipelineanslutning för lyckad aktivitet till notebook-aktiviteten.
Pilen Lyckad aktivitet instruerar pipelinen att köra notebook-aktiviteten när dataflödesaktiviteten har körts.
Välj aktiviteten Notebook (1) och välj sedan fliken Inställningar (2), expandera Basparametrar (3) och välj sedan + Ny (4). Ange
runId
i fältet Namn (5). Välj Sträng som typ (6). För Värdet väljer du Lägg till dynamiskt innehåll (7).Välj Pipeline-körnings-ID under Systemvariabler (1). Detta lägger
@pipeline().RunId
till i rutan för dynamiskt innehåll (2). Välj Slutför (3) för att stänga dialogrutan.Värdet för pipelinekörnings-ID är ett unikt GUID som tilldelats varje pipelinekörning. Vi använder det här värdet för namnet på Parquet-filen genom att skicka in det här värdet som
runId
notebook-parameter. Vi kan sedan titta igenom pipelinekörningshistoriken och hitta den specifika Parquet-fil som skapats för varje pipelinekörning.Välj Publicera alla och sedan Publicera för att spara ändringarna.
När publiceringen är klar väljer du Lägg till utlösare (1) och sedan Utlösare nu (2) för att köra den uppdaterade pipelinen.
Välj OK för att köra utlösaren.
Övervaka pipelinekörningen
Med monitorhubben kan du övervaka aktuella och historiska aktiviteter för SQL, Apache Spark och Pipelines.
Gå till monitorhubben.
Välj Pipelinekörningar (1) och vänta tills pipelinekörningen har slutförts (2).. Du kan behöva uppdatera (3) vyn.
Välj namnet på pipelinen för att visa pipelinens aktivitetskörningar.
Observera både dataflödesaktiviteten och den nya notebook-aktiviteten (1). Anteckna pipelinekörnings-ID-värdet (2). Vi jämför detta med Parquet-filnamnet som genereras av notebook-filen. Välj namnet på Calculate Top 5 Products Notebook (Beräkna de 5 främsta produkternas anteckningsbok) för att visa dess information (3)..
Här ser vi information om notebook-körning. Du kan välja Uppspelning(1) för att se en uppspelning av förloppet genom jobben (2). Längst ned kan du visa diagnostik och loggar med olika filteralternativ (3).. Till höger kan vi visa körningsinformationen, till exempel varaktighet, Livy-ID, Spark-poolinformation och så vidare. Välj länken Visa information för ett jobb för att visa dess information (5).
Användargränssnittet för Spark-programmet öppnas på en ny flik där vi kan se steginformationen. Expandera DAG Visualisering för att visa sceninformationen.
Gå tillbaka till datahubben .
Välj fliken Länkad (1) och välj sedan wwi-02-containern(2) på lagringskontot för den primära datasjön, gå till mappen top5-products (3) och kontrollera att det finns en mapp för Parquet-filen vars namn matchar pipelinekörnings-ID:t.
Som du ser har vi en fil vars namn matchar pipelinekörnings-ID :t som vi tidigare noterade:
Dessa värden matchar eftersom vi skickade i pipelinekörnings-ID:t till parametern
runId
för notebook-aktiviteten.