Strömbearbetning med Azure Stream Analytics

Cosmos DB
Event Hubs
Monitor
Stream Analytics

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutet till slut. Pipelinen matar in data från två källor, korrelerar poster i de två strömmarna och beräknar ett rullande medelvärde över ett tidsfönster. Resultaten lagras för ytterligare analys.

GitHub en referensimplementering för den här arkitekturen finns på GitHub.

Referensarkitektur för att skapa en dataströmbearbetningspipeline med Azure Stream Analytics

Scenario:Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot antar vi att det finns två separata enheter som skickar data. Taxin har en mätare som skickar information om varje resa – längd, avstånd och upphämtning och avlämning. En separat enhet accepterar betalningar från kunder och skickar data om biljetten. Taxiföretaget vill beräkna det genomsnittliga tipset per mil som körs i realtid för att upptäcka trender.

Arkitektur

Arkitekturen består av följande komponenter.

Datakällor. I den här arkitekturen finns det två datakällor som genererar dataströmmar i realtid. Den första strömmen innehåller färdinformation och den andra innehåller prisinformation. Referensarkitekturen innehåller en simulerad datagenerator som läser från en uppsättning statiska filer och push-överföra data till Event Hubs. I ett riktigt program skulle datakällorna vara enheter som installerats i taxibilarna.

Azure Event Hubs. Event Hubs är en tjänst för händelseinmatning. Den här arkitekturen använder två händelsehubbinstanser, en för varje datakälla. Varje datakälla skickar en dataström till den associerade händelsehubben.

Azure Stream Analytics. Stream Analytics är en motor för händelsebearbetning. Ett Stream Analytics läser dataströmmarna från de två händelsehubben och utför bearbetning av dataströmmar.

Cosmos DB. Utdata från Stream Analytics är en serie poster som skrivs som JSON-dokument till en Cosmos DB dokumentdatabas.

Microsoft Power BI. Power BI är en uppsättning affärsanalysverktyg för att analysera data för affärsinsikter. I den här arkitekturen läser den in data från Cosmos DB. På så sätt kan användarna analysera hela uppsättningen historiska data som har samlats in. Du kan också strömma resultaten direkt från Stream Analytics till Power BI för en realtidsvy av data. Mer information finns i Realtidsströmning i Power BI.

Azure Monitor. Azure Monitor samlar in prestandamått om De Azure-tjänster som distribueras i lösningen. Genom att visualisera dessa på en instrumentpanel kan du få insikter om lösningens hälsotillstånd.

Datainhämtning

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen New York City Taxi Data[1]. Den här datamängden innehåller data om taxiresor i New York City under en fyraårsperiod (2010–2013). Den innehåller två typer av poster: färddata och prisdata. Resedata omfattar resans varaktighet, resans avstånd och upphämtnings- och avlämningsplatsen. Prisdata omfattar pris-, skatte- och tipsbelopp. Vanliga fält i båda posttyperna är medaljnummer, hacklicens och leverantörs-ID. Tillsammans identifierar dessa tre fält en taxi plus en drivrutin. Data lagras i CSV-format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois på Urbana-Championaign. https://doi.org/10.13012/J8PN93H8

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Azure Event Hubs. Generatorn skickar färddata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Partitioner gör att en konsument kan läsa varje partition parallellt. När du skickar data till Event Hubs kan du uttryckligen ange partitionsnyckeln. Annars tilldelas poster till partitioner med resursallokering.

I det här scenariot bör färddata och prisdata få samma partitions-ID för en viss taxibil. Detta gör Stream Analytics kan tillämpa en grad av parallellitet när den korrelerar de två strömmarna. En post i partition n av färddata kommer att matcha en post i partition n i prisdata.

Diagram över dataströmbearbetning med Azure Stream Analytics och Event Hubs

I datageneratorn har den gemensamma datamodellen för båda posttyperna en egenskap som är PartitionKey sammanfogningen av Medallion , HackLicense och VendorId .

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen används för att tillhandahålla en explicit partitionsnyckel när den skickas till Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Strömbearbetning

Dataströmbearbetningsjobbet definieras med en SQL fråga med flera olika steg. I de två första stegen väljer du helt enkelt poster från de två indataströmmarna.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

Nästa steg sammanfogar de två indataströmmarna för att välja matchande poster från varje dataström.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Den här frågan sammanfogar poster i en uppsättning fält som unikt identifierar matchande poster ( PartitionId och PickupTime ).

Anteckning

Vi vill att TaxiRideTaxiFare strömmarna och ska sammanfogas med den unika kombinationen Medallion av , och HackLicenseVendorIdPickupTime . I det här PartitionId fallet omfattar fälten , och , men detta bör inte ses som MedallionHackLicenseVendorId normalt.

I Stream Analytics är kopplingar temporala,vilket innebär att poster ansluts inom en viss tidsperiod. Annars kan jobbet behöva vänta på en matchning på obestämd tid. Funktionen DATEDIFF anger hur långt två matchande poster kan separeras i tid för en matchning.

Det sista steget i jobbet beräknar det genomsnittliga tipset per mil, grupperat efter ett hoppande fönster på 5 minuter.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics innehåller flera fönsterfunktioner. Ett hoppande fönster flyttas framåt i tiden med en fast period, i det här fallet 1 minut per hopp. Resultatet är att beräkna ett glidande medelvärde under de senaste 5 minuterna.

I arkitekturen som visas här sparas endast resultatet av Stream Analytics jobb till Cosmos DB. I ett scenario med stordata bör du överväga att Event Hubs Capture för att spara rådata för händelser i Azure Blob Storage. Genom att behålla rådata kan du köra batchfrågor över dina historiska data vid ett senare tillfälle för att härleda nya insikter från data.

Skalbarhetsöverväganden

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i genomflödesenheter. Du kan autoskala en händelsehubb genom att aktivera automatisk blåsning ,som automatiskt skalar dataflödesenheterna baserat på trafik, upp till ett konfigurerat maxbelopp.

Stream Analytics

Till Stream Analytics databehandlingsresurser som allokerats till ett jobb mäts i strömningsenheter. Stream Analytics skalas bäst om jobbet kan parallelliseras. På så sätt Stream Analytics kan distribuera jobbet över flera beräkningsnoder.

Använd Event Hubs för att PARTITION BY partitionera Stream Analytics jobb. Data delas upp i delmängder baserat på de Event Hubs partitionerna.

Fönsterfunktioner och temporala kopplingar kräver ytterligare SU. Använd när det är PARTITION BY möjligt så att varje partition bearbetas separat. Mer information finns i Förstå och justera strömningsenheter.

Om det inte går att parallellisera hela Stream Analytics kan du försöka dela upp jobbet i flera steg och börja med ett eller flera parallella steg. På så sätt kan de första stegen köras parallellt. I den här referensarkitekturen kan du till exempel:

  • Steg 1 och 2 är enkla SELECT instruktioner som väljer poster inom en enda partition.
  • Steg 3 utför en partitionerad koppling över två indataströmmar. Det här steget drar nytta av det faktum att matchande poster delar samma partitionsnyckel och därför garanterat har samma partitions-ID i varje indataström.
  • Steg 4 aggregerar över alla partitioner. Det här steget kan inte parallelliseras.

Använd Stream Analytics för att se hur många partitioner som har tilldelats varje steg i jobbet. Följande diagram visar jobbdiagrammet för den här referensarkitekturen:

Jobbdiagram

Cosmos DB

Dataflödeskapaciteten för Cosmos DB mäts i enheter för förfrågningsbegäran (RU). För att kunna skala en Cosmos DB container efter 10 000 RU måste du ange en partitionsnyckel när du skapar containern och inkludera partitionsnyckeln i varje dokument.

I den här referensarkitekturen skapas nya dokument bara en gång per minut (hoppande fönsterintervall), så dataflödeskraven är ganska låga. Därför behöver du inte tilldela en partitionsnyckel i det här scenariot.

Övervakningsöverväganden

Med alla dataströmbearbetningslösningar är det viktigt att övervaka systemets prestanda och hälsa. Azure Monitor mått och diagnostikloggar för de Azure-tjänster som används i arkitekturen. Azure Monitor är inbyggt i Azure-plattformen och kräver ingen ytterligare kod i ditt program.

Någon av följande varningssignaler visar att du bör skala ut den relevanta Azure-resursen:

  • Event Hubs begränsar begäranden eller ligger nära den dagliga meddelandekvoten.
  • Det Stream Analytics jobbet använder mer än 80 % av de allokerade direktuppspelningsenheterna (SU).
  • Cosmos DB börjar begränsa begäranden.

Referensarkitekturen innehåller en anpassad instrumentpanel som distribueras till Azure Portal. När du har distribuerat arkitekturen kan du visa instrumentpanelen genom att öppna Azure Portal och välja från listan över instrumentpaneler. Mer information om hur du skapar och distribuerar anpassade instrumentpaneler i Azure Portal finns i Skapa Azure-instrumentpaneler programmässigt.

Följande bild visar instrumentpanelen efter att Stream Analytics körts i ungefär en timme.

Skärmbild av instrumentpanelen taxibilar

Panelen till vänster visar att SU-förbrukningen för Stream Analytics ökar under de första 15 minuterna och sedan planar ut. Det här är ett typiskt mönster när jobbet når ett stabilt tillstånd.

Observera att Event Hubs är begränsningsbegäranden som visas i den övre högra panelen. En tillfällig begränsad begäran är inte ett problem, eftersom Event Hubs klient-SDK automatiskt försöker igen när den får ett begränsningsfel. Men om du ser konsekventa begränsningsfel innebär det att händelsehubben behöver fler genomflödesenheter. I följande diagram visas en testkörning med funktionen Event Hubs automatisk blåsning, som automatiskt skalar ut genomflödesenheterna efter behov.

Skärmbild av Event Hubs automatisk skalning

Automatisk blåsning aktiverades vid cirka 06:35-markeringen. Du kan se en p-minskning i begränsade begäranden, Event Hubs skalas upp till 3 genomflödesenheter automatiskt.

Intressant nog hade detta bieffekten av att öka SU-användningen i Stream Analytics jobbet. Genom begränsning minskade Event Hubs artificiellt inmatningshastigheten för Stream Analytics jobb. Det är faktiskt vanligt att en lösning av en prestandaflaskhals avslöjar en annan. I det här fallet löstes problemet genom att Stream Analytics ytterligare SU för Stream Analytics-jobbet.

Kostnadsöverväganden

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Här är några saker att tänka på när det gäller tjänster som används i den här referensarkitekturen.

Azure Stream Analytics

Azure Stream Analytics priset per antal strömningsenheter (0,11 USD/timme) som krävs för att bearbeta data till tjänsten.

Stream Analytics kan vara dyrt om du inte bearbetar data i realtid eller små mängder data. För dessa användningsfall bör du överväga Azure Functions eller Logic Apps att flytta data från Azure Event Hubs till ett datalager.

Azure Event Hubs och Azure Cosmos DB

Kostnadsöverväganden för Azure Event Hubs och Cosmos DB finns i Kostnadsöverväganden i Referensarkitektur för dataströmbearbetning Azure Databricks dataströmmar.

Distribuera lösningen

För att distribuera och köra referensimplementering följer du stegen i GitHub readme.

DevOps-överväganden

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager för att distribuera Azure-resurser efter IaC-processen (infrastruktur som kod). Med mallar är det enklare att automatisera distributioner med Hjälp av Azure DevOps Serviceseller andra CI/CD-lösningar.

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystem. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process, vilket gör automatiseringsprocessen enklare.

    I den här Azure Event Hubs identifieras Azure Event Hubs, Log Analytics och Cosmos DB som en enskild arbetsbelastning. Dessa resurser ingår i en enda ARM-mall.

  • Överväg att mellanlagra dina arbetsbelastningar. Distribuera till olika faser och kör valideringskontroller i varje fas innan du går vidare till nästa steg. På så sätt kan du skicka uppdateringar till produktionsmiljöerna på ett mycket kontrollerat sätt och minimera oväntade distributionsproblem.

  • Överväg att Azure Monitor för att analysera prestandan för din dataströmbearbetningspipeline. Mer information finns i Monitoring Azure Databricks.

Mer information finns i DevOps-avsnittet i Microsoft Azure Well-Architected Framework.

Du kanske vill granska följande Azure-exempelscenarier som demonstrerar specifika lösningar med hjälp av några av samma tekniker: