Deze referentiearchitectuur toont een end-to-end pijplijn voor stroomverwerking. De pijplijn neemt gegevens op uit twee bronnen, correleert records in de twee stromen en berekent een doorstromend gemiddelde in een tijdvenster. De resultaten worden opgeslagen voor verdere analyse.
Er is een referentie-implementatie voor deze architectuur beschikbaar op GitHub.

Scenario: een taxibedrijf verzamelt gegevens over elke taxirit. Voor dit scenario gaan we ervan uit dat er twee afzonderlijke apparaten zijn die gegevens verzenden. De taxi heeft een meter die informatie verzendt over elke rit, de — duur, de afstand en de locaties voor ophalen en afhalen. Een afzonderlijk apparaat accepteert betalingen van klanten en verzendt gegevens over tarieven. Het taxibedrijf wil de gemiddelde fooi per mijl in realtime berekenen om trends te herkennen.
Architectuur
De architectuur bestaat uit de volgende onderdelen.
Gegevensbronnen. In deze architectuur zijn er twee gegevensbronnen die in realtime gegevensstromen genereren. De eerste stroom bevat ritgegevens en de tweede bevat ritgegevens. De referentiearchitectuur bevat een gesimuleerde gegevensgenerator die uit een set statische bestanden leest en de gegevens naar de Event Hubs. In een echte toepassing zouden de gegevensbronnen apparaten zijn die in de taxi's zijn geïnstalleerd.
Azure Event Hubs. Event Hubs is een service voor het opnemen van gebeurtenissen. Deze architectuur maakt gebruik van twee Event Hub-exemplaren, één voor elke gegevensbron. Elke gegevensbron verzendt een gegevensstroom naar de gekoppelde Event Hub.
Azure Stream Analytics. Stream Analytics is een engine voor gebeurtenisverwerking. Een Stream Analytics leest de gegevensstromen van de twee Event Hubs en voert de stroomverwerking uit.
Cosmos DB. De uitvoer van de Stream Analytics taak is een reeks records die als JSON-documenten worden geschreven naar een Cosmos DB-documentdatabase.
Microsoft Power BI. Power BI is een suite met hulpprogramma's voor bedrijfsanalyse voor het analyseren van gegevens voor zakelijke inzichten. In deze architectuur worden de gegevens uit de Cosmos DB. Hierdoor kunnen gebruikers de volledige set historische gegevens analyseren die zijn verzameld. U kunt de resultaten ook rechtstreeks vanuit het Stream Analytics naar Power BI voor een realtime weergave van de gegevens. Zie Realtime streaming in Power BI voor meer informatie.
Azure Monitor. Azure Monitor verzamelt metrische prestatiegegevens over de Azure-services die in de oplossing zijn geïmplementeerd. Door deze in een dashboard te visualiseren, krijgt u inzicht in de status van de oplossing.
Gegevensopname
Voor het simuleren van een gegevensbron maakt deze referentiearchitectuur gebruik van de gegevensset New York City Taxi Data [1]. Deze gegevensset bevat gegevens over taxiritten in New York City gedurende een periode van vier jaar (2010 – 2013). Het bevat twee soorten records: ritgegevens en ritgegevens. Ritgegevens omvatten reisduur, reisafstand en afhaal- en afhaallocatie. De tariefgegevens omvatten de tarieven, belasting en fooien. Algemene velden in beide recordtypen zijn medaillenummer, hacklicentie en leverancier-id. Samen identificeren deze drie velden een taxi plus een stuurprogramma op unieke manier. De gegevens worden opgeslagen in CSV-indeling.
[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois in Urbana-Moetaign. https://doi.org/10.13012/J8PN93H8
De gegevensgenerator is een .NET Core-toepassing die de records leest en verzendt naar Azure Event Hubs. De generator verzendt ritgegevens in JSON-indeling en ritgegevens in CSV-indeling.
Event Hubs partities gebruikt om de gegevens te segmenteren. Met partities kan een consument elke partitie parallel lezen. Wanneer u gegevens naar een Event Hubs, kunt u de partitiesleutel expliciet opgeven. Anders worden records toegewezen aan partities op round robin-manier.
In dit specifieke scenario moeten gegevensritten en ritgegevens eindigen op dezelfde partitie-id voor een taxi. Hierdoor kunnen Stream Analytics mate van parallelle activiteit toepassen wanneer deze de twee stromen correleert. Een record in partitie n van de ritgegevens komt overeen met een record in partitie n van de ritgegevens.

In de gegevensgenerator heeft het algemene gegevensmodel voor beide recordtypen een eigenschap die de PartitionKey samenvoeging is Medallion van , en HackLicense VendorId .
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Deze eigenschap wordt gebruikt om een expliciete partitiesleutel op te geven bij het verzenden naar Event Hubs:
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Streamverwerking
De stroomverwerkings job wordt gedefinieerd met behulp van een SQL query met verschillende afzonderlijke stappen. In de eerste twee stappen worden alleen records uit de twee invoerstromen geselecteerd.
WITH
Step1 AS (
SELECT PartitionId,
TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
VendorId,
TRY_CAST(PickupTime AS datetime) AS PickupTime,
TripDistanceInMiles
FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
SELECT PartitionId,
medallion AS Medallion,
hack_license AS HackLicense,
vendor_id AS VendorId,
TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
tip_amount AS TipAmount
FROM [TaxiFare] PARTITION BY PartitionId
),
De volgende stap voegt de twee invoerstromen samen om overeenkomende records uit elke stroom te selecteren.
Step3 AS (
SELECT tr.TripDistanceInMiles,
tf.TipAmount
FROM [Step1] tr
PARTITION BY PartitionId
JOIN [Step2] tf PARTITION BY PartitionId
ON tr.PartitionId = tf.PartitionId
AND tr.PickupTime = tf.PickupTime
AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)
Met deze query worden records in een set velden die overeenkomende records ( en ) uniek PartitionId PickupTime identificeren.
Notitie
We willen dat TaxiRide de streams en worden samengevoegd door de unieke combinatie van , en TaxiFare Medallion HackLicense VendorId PickupTime . In dit geval de dekt de velden , en , maar dit moet niet worden beschouwd als in het PartitionId Medallion algemeen het HackLicense VendorId geval.
In Stream Analytics zijn joins tijdelijk, wat betekent dat records worden samengevoegd binnen een bepaald tijdvenster. Anders moet de taak mogelijk voor onbepaalde tijd wachten op een overeenkomst. De functie DATEDIFF geeft aan hoe ver twee overeenkomende records op tijd kunnen worden gescheiden voor een overeenkomst.
De laatste stap in de taak berekent de gemiddelde fooi per mijl, gegroepeerd op een hoppingvenster van 5 minuten.
SELECT System.Timestamp AS WindowTime,
SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
INTO [TaxiDrain]
FROM [Step3] tr
GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))
Stream Analytics biedt verschillende vensterfuncties. Een hoppingvenster gaat vooruit in de tijd met een vaste periode, in dit geval 1 minuut per hop. Het resultaat is het berekenen van een bewegend gemiddelde in de afgelopen 5 minuten.
In de architectuur die hier wordt weergegeven, worden alleen de resultaten van Stream Analytics taak opgeslagen in Cosmos DB. Voor een big data scenario kunt u ook Event Hubs Capture gebruiken om de onbewerkte gebeurtenisgegevens op te slaan in Azure Blob Storage. Door de onbewerkte gegevens te bewaren, kunt u op een later tijdstip batchquery's uitvoeren op uw historische gegevens om nieuwe inzichten uit de gegevens af te leiden.
Schaalbaarheidsoverwegingen
Event Hubs
De doorvoercapaciteit van Event Hubs wordt gemeten in doorvoereenheden. U kunt een Event Hub automatisch schalen door automatisch opschalen in te stellen,waarmee de doorvoereenheden automatisch worden geschaald op basis van verkeer, tot een geconfigureerd maximum.
Stream Analytics
Voor Stream Analytics worden de rekenbronnen die aan een taak zijn toegewezen, gemeten in streaming-eenheden. Stream Analytics taken kunnen het beste worden geschaald als de taak kan worden ge parallelliseerd. Op die manier Stream Analytics taak verdelen over meerdere rekenknooppunten.
Voor Event Hubs invoer gebruikt u het PARTITION BY trefwoord om de taak Stream Analytics partitioneren. De gegevens worden onderverdeeld in subsets op basis van de Event Hubs partities.
Voor vensterfuncties en tijdelijke joins zijn extra SU's vereist. Gebruik, indien mogelijk, PARTITION BY zodat elke partitie afzonderlijk wordt verwerkt. Zie Streaming-eenheden begrijpen en aanpassen.
Als het niet mogelijk is om de volledige Stream Analytics te parallelliseren, probeert u de taak op te breken in meerdere stappen, te beginnen met een of meer parallelle stappen. Op die manier kunnen de eerste stappen parallel worden uitgevoerd. In deze referentiearchitectuur bijvoorbeeld:
- Stap 1 en 2 zijn eenvoudige
SELECTinstructies die records binnen één partitie selecteren. - Stap 3 voert een gepartitiesteerde join uit over twee invoerstromen. Deze stap maakt gebruik van het feit dat overeenkomende records dezelfde partitiesleutel delen en dus gegarandeerd dezelfde partitie-id hebben in elke invoerstroom.
- Stap 4 aggregeert alle partities. Deze stap kan niet worden ge parallelliseerd.
Gebruik het Stream Analytics taakdiagram om te zien hoeveel partities aan elke stap in de taak zijn toegewezen. In het volgende diagram ziet u het taakdiagram voor deze referentiearchitectuur:

Cosmos DB
De doorvoercapaciteit voor Cosmos DB wordt gemeten in aanvraageenheden (RU's). Als u een Cosmos DB container van meer dan 10.000 RU wilt schalen, moet u een partitiesleutel opgeven wanneer u de container maakt en de partitiesleutel in elk document opnemen.
In deze referentiearchitectuur worden nieuwe documenten slechts één keer per minuut gemaakt (het hoppingvensterinterval), waardoor de doorvoervereisten vrij laag zijn. Daarom is het niet nodig om in dit scenario een partitiesleutel toe te wijzen.
Bewakingsoverwegingen
Bij elke oplossing voor stroomverwerking is het belangrijk om de prestaties en status van het systeem te bewaken. Azure Monitor verzamelt metrische gegevens en diagnostische logboeken voor de Azure-services die in de architectuur worden gebruikt. Azure Monitor is ingebouwd in het Azure-platform en vereist geen aanvullende code in uw toepassing.
Elk van de volgende waarschuwingssignalen geeft aan dat u de relevante Azure-resource moet uitschalen:
- Event Hubs beperkt aanvragen of ligt dicht bij het dagelijkse berichtquotum.
- De Stream Analytics gebruikt consistent meer dan 80% van de toegewezen streaming-eenheden (SU's).
- Cosmos DB worden aanvragen beperkt.
De referentiearchitectuur bevat een aangepast dashboard dat wordt geïmplementeerd op de Azure Portal. Nadat u de architectuur hebt geïmplementeerd, kunt u het dashboard weergeven door de Azure Portal te openen en te selecteren in de lijst TaxiRidesDashboard met dashboards. Zie Programmatisch Azure-dashboards maken voor meer informatie over het maken en implementeren van aangepaste dashboardsin de Azure Portal.
In de volgende afbeelding ziet u het dashboard nadat Stream Analytics taak ongeveer een uur is uitgevoerd.

In het deelvenster linksbeneden ziet u dat het SU-verbruik voor de Stream Analytics taak gedurende de eerste 15 minuten wordt uitgeschakeld. Dit is een typisch patroon omdat de taak een stabiele status bereikt.
U ziet Event Hubs beperkingsaanvragen zijn, zoals wordt weergegeven in het rechterbovenpaneel. Een aanvraag die af en toe wordt beperkt, is geen probleem, omdat de Event Hubs client-SDK automatisch opnieuw een aanvraag indient wanneer er een beperkingsfout wordt ontvangen. Als er echter consistente beperkingsfouten optreden, betekent dit dat de Event Hub meer doorvoereenheden nodig heeft. In het volgende diagram ziet u een testuitvoer met behulp Event Hubs functie automatisch opschalen, waarmee de doorvoereenheden automatisch worden geschaald wanneer dat nodig is.

Automatisch opbouwen is ingeschakeld om ongeveer 06:35 uur. U kunt de p-daling in beperkt aanvragen zien, omdat Event Hubs automatisch omhoog wordt geschaald tot 3 doorvoereenheden.
Interessant is dat dit het neveneffect heeft gehad van het verhogen van het SU-gebruik in de Stream Analytics taak. Door bandbreedtebeperking werd Event Hubs de opnamesnelheid voor de Stream Analytics taak kunstmatig verkleind. Het is eigenlijk gebruikelijk dat het oplossen van een prestatieknelpunt een ander knelpunt aan het licht komt. In dit geval is het probleem opgelost door extra su-Stream Analytics toe te Stream Analytics taak.
Kostenoverwegingen
Gebruik de Azure-prijscalculator om een schatting van de kosten te maken. Hier zijn enkele overwegingen voor services die worden gebruikt in deze referentiearchitectuur.
Azure Stream Analytics
Azure Stream Analytics is geprijsd op het aantal streaming-eenheden ($ 0,11/uur) dat is vereist voor het verwerken van de gegevens in de service.
Stream Analytics kunnen duur zijn als u de gegevens niet in realtime of in kleine hoeveelheden gegevens verwerkt. Voor deze gebruiksgevallen kunt u Azure Functions of Logic Apps gebruiken om gegevens van Azure Event Hubs naar een gegevensopslag te verplaatsen.
Azure Event Hubs en Azure Cosmos DB
Zie Kostenoverwegingen voor Azure Event Hubs en Cosmos DB Stream-verwerking met Azure Databricks referentiearchitectuur voor kostenoverwegingen.
De oplossing implementeren
Als u de referentie-implementatie wilt implementeren en uitvoeren, volgt u de stappen in GitHub readme.
DevOps overwegingen
Maak afzonderlijke resourcegroepen voor productie-, ontwikkelings- en testomgevingen. Met afzonderlijke resourcegroepen kunt u eenvoudiger implementaties beheren, testimplementaties verwijderen en toegangsrechten verlenen.
Gebruik Azure Resource Manager om de Azure-resources te implementeren volgens het IaC-proces (Infrastructure as Code). Met sjablonen is het eenvoudiger om implementaties te automatiseren met Behulp van Azure DevOps Servicesof andere CI/CD-oplossingen.
Plaats elke workload in een afzonderlijke implementatiesjabloon en sla de resources op in broncodebeheersystemen. U kunt de sjablonen samen of afzonderlijk implementeren als onderdeel van een CI/CD-proces, waardoor het automatiseringsproces eenvoudiger wordt.
In deze architectuur worden Azure Event Hubs, Log Analytics en Cosmos DB geïdentificeerd als één workload. Deze resources zijn opgenomen in één ARM-sjabloon.
U kunt uw workloads faseren. Implementeer in verschillende fasen en voer validatiecontroles uit in elke fase voordat u naar de volgende fase gaat. Op die manier kunt u updates naar uw productieomgevingen pushen op een zeer gecontroleerde manier en onverwachte implementatieproblemen minimaliseren.
Overweeg het gebruik Azure Monitor om de prestaties van uw pijplijn voor stroomverwerking te analyseren. Zie Bewakingsgegevens voor Azure Databricks.
Zie de sectie DevOps in Microsoft Azure Well-Architected Framework voor meer informatie.
Gerelateerde resources
U kunt de volgende Azure-voorbeeldscenario's bekijken waarin specifieke oplossingen worden gedemonstreerd met behulp van een aantal van dezelfde technologieën: