Strömbearbetning med Azure Stream Analytics

Azure Cosmos DB
Azure Event Hubs
Azure Monitor
Azure Stream Analytics

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutpunkt till slutpunkt. Pipelinen matar in data från två källor, korrelerar poster i de två strömmarna och beräknar ett rullande medelvärde över ett tidsfönster. Resultaten lagras för ytterligare analys.

GitHub-logotyp En referensimplementering för den här arkitekturen finns på GitHub.

Arkitektur

Diagram som visar referensarkitektur för att skapa en pipeline för dataströmbearbetning med Azure Stream Analytics.

Ladda ned en Visio-fil med den här arkitekturen.

Arbetsflöde

Arkitekturen består av följande komponenter:

Datakällor. I den här arkitekturen finns det två datakällor som genererar dataströmmar i realtid. Den första strömmen innehåller färdinformation och den andra innehåller information om priser. Referensarkitekturen innehåller en simulerad datagenerator som läser från en uppsättning statiska filer och skickar data till Event Hubs. I ett verkligt program skulle datakällorna vara enheter installerade i taxibilarna.

Azure Event Hubs. Event Hubs är en händelseinmatningstjänst . Den här arkitekturen använder två händelsehubbinstanser, en för varje datakälla. Varje datakälla skickar en dataström till den associerade händelsehubben.

Azure Stream Analytics. Stream Analytics är en motor för händelsebearbetning. Ett Stream Analytics-jobb läser dataströmmarna från de två händelsehubbarna och utför dataströmbearbetning.

Azure Cosmos DB. Utdata från Stream Analytics-jobbet är en serie poster som skrivs som JSON-dokument till en Azure Cosmos DB-dokumentdatabas.

Microsoft Power BI. Power BI är en uppsättning affärsanalysverktyg för att analysera data för affärsinsikter. I den här arkitekturen läser den in data från Azure Cosmos DB. På så sätt kan användarna analysera den fullständiga uppsättningen historiska data som har samlats in. Du kan också strömma resultatet direkt från Stream Analytics till Power BI för en realtidsvy av data. Mer information finns i Realtidsströmning i Power BI.

Azure Monitor. Azure Monitor samlar in prestandamått om de Azure-tjänster som distribueras i lösningen. Genom att visualisera dessa på en instrumentpanel kan du få insikter om lösningens hälsotillstånd.

Information om scenario

Scenario: Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot antar vi att det finns två separata enheter som skickar data. Taxin har en mätare som skickar information om varje resa – varaktighet, avstånd och upphämtnings- och avlämningsplatser. En separat enhet accepterar betalningar från kunder och skickar data om priser. Taxibolaget vill beräkna den genomsnittliga dricksen per körd mil i realtid för att upptäcka trender.

Potentiella användningsfall

Den här lösningen är optimerad för detaljhandelsscenariot.

Datainsamling

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen New York City Taxi Data[1]. Den här datamängden innehåller data om taxiresor i New York City under en fyraårsperiod (2010–2013). Den innehåller två typer av poster: kördata och prisdata. Ride-data inkluderar resans varaktighet, reseavstånd och upphämtnings- och avlämningsplats. Prisdata inkluderar belopp för biljettpriser, skatter och tips. Vanliga fält i båda posttyperna är medaljongnummer, hacklicens och leverantörs-ID. Tillsammans identifierar dessa tre fält unikt en taxi plus en förare. Data lagras i CSV-format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois vid Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Azure Event Hubs. Generatorn skickar kördata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Med partitioner kan en konsument läsa varje partition parallellt. När du skickar data till Event Hubs kan du uttryckligen ange partitionsnyckeln. I annat fall tilldelas poster till partitioner i resursallokering.

I det här scenariot bör kördata och prisdata få samma partitions-ID för en viss taxi. Detta gör det möjligt för Stream Analytics att tillämpa en grad av parallellitet när det korrelerar de två strömmarna. En post i partition n av kördata matchar en post i partition n av prisdata.

Diagram över dataströmbearbetning med Azure Stream Analytics och Event Hubs

I datageneratorn har den gemensamma datamodellen för båda posttyperna en PartitionKey egenskap som är sammanlänkningen av Medallion, HackLicenseoch VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen används för att tillhandahålla en explicit partitionsnyckel när du skickar till Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Dataströmbearbetning

Dataströmbearbetningsjobbet definieras med hjälp av en SQL-fråga med flera olika steg. De första två stegen väljer helt enkelt poster från de två indataströmmarna.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

I nästa steg kopplas de två indataströmmarna till att välja matchande poster från varje ström.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Den här frågan kopplar poster till en uppsättning fält som unikt identifierar matchande poster (PartitionId och PickupTime).

Kommentar

Vi vill att strömmarna TaxiRide och TaxiFare ska kopplas av den unika kombinationen av Medallion, HackLicenseVendorId och PickupTime. I det här fallet PartitionId omfattar fälten Medallion, HackLicense och VendorId , men detta bör inte tas som vanligt.

I Stream Analytics är kopplingarna tidsmässiga, vilket innebär att poster kopplas inom en viss tidsperiod. Annars kan jobbet behöva vänta på obestämd tid för en matchning. Funktionen DATEDIFF anger hur långt två matchande poster kan avgränsas i tid för en matchning.

Det sista steget i jobbet beräknar det genomsnittliga tipset per mil, grupperat efter ett hoppfönster på 5 minuter.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics innehåller flera fönsterfunktioner. Ett hoppfönster flyttas framåt i tiden med en fast period, i det här fallet 1 minut per hopp. Resultatet är att beräkna ett glidande medelvärde under de senaste 5 minuterna.

I arkitekturen som visas här sparas bara resultatet av Stream Analytics-jobbet i Azure Cosmos DB. För ett stordatascenario bör du även överväga att använda Event Hubs Capture för att spara rådata i Azure Blob Storage. Om du behåller rådata kan du köra batchfrågor över dina historiska data vid senare tillfälle för att härleda nya insikter från data.

Att tänka på

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som kan användas för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.

Skalbarhet

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i dataflödesenheter. Du kan autoskala en händelsehubb genom att aktivera automatisk blåsning, vilket automatiskt skalar dataflödesenheterna baserat på trafik, upp till ett konfigurerat maxvärde.

Stream Analytics

För Stream Analytics mäts de databehandlingsresurser som allokerats till ett jobb i strömningsenheter. Stream Analytics-jobb skalas bäst om jobbet kan parallelliseras. På så sätt kan Stream Analytics distribuera jobbet över flera beräkningsnoder.

För Event Hubs-indata använder du nyckelordet PARTITION BY för att partitionering av Stream Analytics-jobbet. Data delas in i delmängder baserat på Event Hubs-partitionerna.

Fönsterfunktioner och tidsmässiga kopplingar kräver ytterligare SU. Använd när det är möjligt PARTITION BY så att varje partition bearbetas separat. Mer information finns i Förstå och justera strömningsenheter.

Om det inte går att parallellisera hela Stream Analytics-jobbet kan du försöka dela upp jobbet i flera steg, med början i ett eller flera parallella steg. På så sätt kan de första stegen köras parallellt. I den här referensarkitekturen kan du till exempel:

  • Steg 1 och 2 är enkla SELECT instruktioner som väljer poster inom en enda partition.
  • Steg 3 utför en partitionerad koppling mellan två indataströmmar. Det här steget utnyttjar det faktum att matchande poster delar samma partitionsnyckel och därför garanteras att ha samma partitions-ID i varje indataström.
  • Steg 4 aggregeras över alla partitioner. Det här steget kan inte parallelliseras.

Använd Stream Analytics-jobbdiagrammet för att se hur många partitioner som tilldelas till varje steg i jobbet. Följande diagram visar jobbdiagrammet för den här referensarkitekturen:

Diagram som visar Stream Analytics-jobb.

Azure Cosmos DB

Dataflödeskapaciteten för Azure Cosmos DB mäts i RU (Request Units ). För att kunna skala en Azure Cosmos DB-container efter 10 000 RU måste du ange en partitionsnyckel när du skapar containern och inkludera partitionsnyckeln i varje dokument.

I den här referensarkitekturen skapas nya dokument bara en gång per minut (det hoppande fönstrets intervall), så dataflödeskraven är ganska låga. Därför behöver du inte tilldela en partitionsnyckel i det här scenariot.

Övervakning

Med alla dataströmbearbetningslösningar är det viktigt att övervaka systemets prestanda och hälsa. Azure Monitor samlar in mått och diagnostikloggar för De Azure-tjänster som används i arkitekturen. Azure Monitor är inbyggt i Azure-plattformen och kräver ingen ytterligare kod i ditt program.

Någon av följande varningssignaler indikerar att du bör skala ut relevant Azure-resurs:

  • Event Hubs begränsar begäranden eller ligger nära den dagliga meddelandekvoten.
  • Stream Analytics-jobbet använder konsekvent mer än 80 % av allokerade streamingenheter (SU).
  • Azure Cosmos DB börjar begränsa begäranden.

Referensarkitekturen innehåller en anpassad instrumentpanel som distribueras till Azure-portalen. När du har distribuerat arkitekturen kan du visa instrumentpanelen genom att öppna Azure-portalen och välja TaxiRidesDashboard från listan över instrumentpaneler. Mer information om hur du skapar och distribuerar anpassade instrumentpaneler i Azure-portalen finns i Skapa Azure-instrumentpaneler programmatiskt.

Följande bild visar instrumentpanelen efter att Stream Analytics-jobbet kördes i ungefär en timme.

Skärmbild av instrumentpanelen Taxi Rides

Panelen längst ned till vänster visar att SU-förbrukningen för Stream Analytics-jobbet ökar under de första 15 minuterna och sedan planar ut. Det här är ett typiskt mönster när jobbet når ett stabilt tillstånd.

Observera att Event Hubs begränsar begäranden, som visas i den övre högra panelen. En tillfällig begränsningsbegäran är inte ett problem eftersom Event Hubs-klient-SDK:t automatiskt försöker igen när den får ett begränsningsfel. Men om du ser konsekventa begränsningsfel innebär det att händelsehubben behöver fler dataflödesenheter. I följande diagram visas en testkörning med funktionen För automatisk blåsning av Event Hubs, som automatiskt skalar ut dataflödesenheterna efter behov.

Skärmbild av automatisk skalning av Event Hubs.

Automatisk blåsning aktiverades vid ungefär 06:35-märket. Du kan se p-minskningen av begränsade begäranden, eftersom Event Hubs automatiskt skalade upp till 3 dataflödesenheter.

Intressant nog hade detta sidoeffekten att öka SU-användningen i Stream Analytics-jobbet. Genom att begränsa minskade Event Hubs artificiellt inmatningshastigheten för Stream Analytics-jobbet. Det är faktiskt vanligt att lösa en prestandaflaskhals avslöjar en annan. I det här fallet löste allokeringen av ytterligare SU för Stream Analytics-jobbet problemet.

Kostnadsoptimering

Kostnadsoptimering handlar om att titta på sätt att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i Översikt över kostnadsoptimeringspelare.

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Här följer några överväganden för tjänster som används i den här referensarkitekturen.

Azure Stream Analytics

Azure Stream Analytics prissätts med det antal strömningsenheter (0,11 USD/timme) som krävs för att bearbeta data till tjänsten.

Stream Analytics kan vara dyrt om du inte bearbetar data i realtid eller små mängder data. För dessa användningsfall bör du överväga att använda Azure Functions eller Logic Apps för att flytta data från Azure Event Hubs till ett datalager.

Azure Event Hubs och Azure Cosmos DB

Kostnadsöverväganden om Azure Event Hubs och Azure Cosmos DB finns i Kostnadsöverväganden i referensarkitekturen Stream-bearbetning med Azure Databricks .

DevOps

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager-mallen för att distribuera Azure-resurserna efter IaC-processen (infrastruktur som kod). Med mallar är det enklare att automatisera distributioner med Hjälp av Azure DevOps Services eller andra CI/CD-lösningar.

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystemen. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process, vilket gör automatiseringsprocessen enklare.

    I den här arkitekturen identifieras Azure Event Hubs, Log Analytics och Azure Cosmos DB som en enda arbetsbelastning. Dessa resurser ingår i en enda ARM-mall.

  • Överväg att mellanlagring av dina arbetsbelastningar. Distribuera till olika steg och kör valideringskontroller i varje steg innan du går vidare till nästa steg. På så sätt kan du push-överföra uppdateringar till dina produktionsmiljöer på ett mycket kontrollerat sätt och minimera oväntade distributionsproblem.

  • Överväg att använda Azure Monitor för att analysera prestanda för din pipeline för dataströmbearbetning. Mer information finns i Övervaka Azure Databricks.

Mer information finns i grundpelare för driftskvalitet i Microsoft Azure Well-Architected Framework.

Distribuera det här scenariot

Följ stegen i GitHub-readme för att distribuera och köra referensimplementeringen.

Du kanske vill granska följande Azure-exempelscenario som visar en specifik lösning med hjälp av några av samma tekniker: