Prestandajusteringsscenario: Händelseströmning med Azure Functions

Den här artikeln beskriver hur ett utvecklingsteam använde mått för att hitta flaskhalsar och förbättra prestanda för ett distribuerat system. Artikeln baseras på faktisk belastningstestning som vi gjorde för ett exempelprogram.

Den här artikeln ingår i en serie. Läs den första delen här.

Scenario:Bearbeta en händelseström med hjälp av Azure Functions.

Diagram över en arkitektur för händelseströmning

I det här scenariot skickar en drönarpark positionsdata i realtid till Azure IoT Hub. En Functions-app tar emot händelserna, transformerar data till GeoJSON-format och skriver transformerade data till Cosmos DB. Cosmos DB har inbyggt stöd för geospatiala dataoch Cosmos DB samlingar kan indexeras för effektiva rumsliga frågor. Ett klientprogram kan till exempel fråga efter alla drönare inom 1 km från en viss plats eller hitta alla drönare inom ett visst område.

Dessa bearbetningskrav är så enkla att de inte kräver en komplett strömbearbetningsmotor. I synnerhet ansluter inte bearbetningen till strömmar, aggregerar data eller bearbetar över tidsfönster. Baserat på dessa krav Azure Functions passar bra för bearbetning av meddelanden. Cosmos DB också skalas för att stödja mycket högt skrivgenomflöde.

Övervaka dataflöde

Det här scenariot är en intressant prestandautmaning. Datahastigheten per enhet är känd, men antalet enheter kan variera. I det här affärsscenariot är kraven på svarstider inte särskilt strikta. Den rapporterade positionen för en drönare behöver bara vara korrekt inom en minut. Med detta sagt måste funktionsappen hålla takten med den genomsnittliga inmatningsfrekvensen över tid.

IoT Hub lagrar meddelanden i en loggström. Inkommande meddelanden läggs till i slutet av dataströmmen. En läsare av dataströmmen – i det här fallet funktionsappen – styr sin egen frekvens av att bläddra i dataströmmen. Frikopplingen av läs- och skrivvägarna gör IoT Hub mycket effektiv, men innebär också att en långsam läsare kan hamna efter. För att identifiera det här tillståndet lade utvecklingsteamet till ett anpassat mått för att mäta meddelandets lateness. Det här måttet registrerar delta mellan när ett meddelande tas emot IoT Hub och när funktionen tar emot meddelandet för bearbetning.

var ticksUTCNow = DateTimeOffset.UtcNow;

// Track whether messages are arriving at the function late.
DateTime? firstMsgEnqueuedTicksUtc = messages[0]?.EnqueuedTimeUtc;
if (firstMsgEnqueuedTicksUtc.HasValue)
{
    CustomTelemetry.TrackMetric(
                        context,
                        "IoTHubMessagesReceivedFreshnessMsec",
                        (ticksUTCNow - firstMsgEnqueuedTicksUtc.Value).TotalMilliseconds);
}

Metoden TrackMetric skriver ett anpassat mått till Application Insights. Information om hur du TrackMetric använder inuti en Azure-funktion finns TrackMetric.

Om funktionen håller jämna resultat med mängden meddelanden bör det här måttet hålla sig i ett lågt stabilt tillstånd. En del svarstider går inte att undvika, så värdet blir aldrig noll. Men om funktionen ligger efter, börjar deltat mellan den tid då den hamnar i enqueued-sekvens och bearbetningstiden att gå upp.

Test 1: Baslinje

Det första belastningstestet visade ett omedelbart problem: Funktionsappen tog konsekvent emot HTTP 429-fel från Cosmos DB, vilket indikerar att Cosmos DB begränsningen av skrivbegäranden.

Graph av Cosmos DB begränsade begäranden

Som svar har teamet skalat Cosmos DB genom att öka antalet RU:er som allokerats för samlingen, men felen fortsätter. Detta verkade konstigt eftersom beräkningen av kuverten visade att Cosmos DB inte hade några problem med att hålla koll på mängden skrivbegäranden.

Senare samma dag skickade en av utvecklarna följande e-postmeddelande till teamet:

Jag har tittat Cosmos DB på den varma vägen. Det finns en sak som jag inte förstår. Partitionsnyckeln är deliveryId, men vi skickar inte deliveryId till Cosmos DB. Saknar jag något?

Det var ledtråden. När vi tittar på partitionens heat map visade det sig att alla dokument landade på samma partition.

Graph av Cosmos DB partitions heat map

Det du vill se i den värmekartan är en jämn fördelning över alla partitioner. I det här fallet hjälpte det inte att lägga till RU:er eftersom alla dokument skrevs till samma partition. Problemet visade sig vara ett fel i koden. Även Cosmos DB samling hade en partitionsnyckel så inkluderar inte Azure-funktionen partitionsnyckeln i dokumentet. Mer information om partitionens heat map finns i Determine the throughput distribution across partitions.

Test 2: Åtgärda partitioneringsproblem

När teamet distribuerade en kodkorrigering och körde testet på nytt stoppade Cosmos DB begränsningen. Ett tag såg allt bra ut. Vid en viss inläsning visade telemetrin dock att funktionen skrev färre dokument än den borde. I följande diagram visas meddelanden som tas emot från IoT Hub dokument som skrivs till Cosmos DB. Den gula raden är antalet meddelanden som tas emot per batch och den gröna är antalet dokument som skrivs per batch. Dessa bör vara proportionella. I stället sjunker antalet skrivåtgärder för databaser per batch avsevärt vid cirka 07:30.

Graph bort ignorerade meddelanden

I nästa diagram visas svarstiden mellan när ett meddelande tas emot IoT Hub från en enhet och när funktionsappen bearbetar meddelandet. Du kan se att samtidigt ökar latenstiden dramatiskt, planar ut och minskar.

Graph av meddelandets lateness

Anledningen till att värdet når sin topp vid 5 minuter och sedan sjunker till noll är att funktionsappen tar bort meddelanden som är mer än 5 minuter försenade:

foreach (var message in messages)
{
    // Drop stale messages,
    if (message.EnqueuedTimeUtc < cutoffTime)
    {
        log.Info($"Dropping late message batch. Enqueued time = {message.EnqueuedTimeUtc}, Cutoff = {cutoffTime}");
        droppedMessages++;
        continue;
    }

Du kan se detta i diagrammet när måttet lateness sjunker tillbaka till noll. Under tiden har data gått förlorade eftersom funktionen slängde bort meddelanden.

Vad hände? För det här specifika belastningstestet hade Cosmos DB-samlingen RU:er att spara, så flaskhalsen fanns inte i databasen. Problemet låg i stället i meddelandebearbetningsloopen. Enkelt uttryckt skrev funktionen inte dokument tillräckligt snabbt för att hålla koll på den inkommande mängden meddelanden. Med tiden har den fallit längre och längre efter.

Test 3: Parallella skrivningar

Om tiden det tar att bearbeta ett meddelande är flaskhalsen är en lösning att bearbeta fler meddelanden parallellt. I det här scenariot:

  • Öka antalet IoT Hub partitioner. Varje IoT Hub tilldelas en funktionsinstans i taget, så vi förväntar oss att dataflödet skalas linjärt med antalet partitioner.
  • Parallellisera dokument skriver i funktionen.

För att utforska det andra alternativet ändrade teamet funktionen för att stödja parallella skrivningar. Den ursprungliga versionen av funktionen använde Cosmos DB utdatabindningen. Den optimerade versionen anropar Cosmos DB klienten direkt och utför skrivningar parallellt med Hjälp av Task.WhenAll:

private async Task<(long documentsUpserted,
                    long droppedMessages,
                    long cosmosDbTotalMilliseconds)>
                ProcessMessagesFromEventHub(
                    int taskCount,
                    int numberOfDocumentsToUpsertPerTask,
                    EventData[] messages,
                    TraceWriter log)
{
    DateTimeOffset cutoffTime = DateTimeOffset.UtcNow.AddMinutes(-5);

    var tasks = new List<Task>();

    for (var i = 0; i < taskCount; i++)
    {
        var docsToUpsert = messages
                            .Skip(i * numberOfDocumentsToUpsertPerTask)
                            .Take(numberOfDocumentsToUpsertPerTask);
        // client will attempt to create connections to the data
        // nodes on Cosmos db's clusters on a range of port numbers
        tasks.Add(UpsertDocuments(i, docsToUpsert, cutoffTime, log));
    }

    await Task.WhenAll(tasks);

    return (this.UpsertedDocuments,
            this.DroppedMessages,
            this.CosmosDbTotalMilliseconds);
}

Observera att rasvillkor är möjliga med metoden . Anta att två meddelanden från samma drönare anländer i samma meddelandebatch. Genom att skriva dem parallellt kan det tidigare meddelandet skriva över det senare meddelandet. I det här scenariot kan programmet tolerera att ett enstaka meddelande går förlorat. Drönare skickar nya positionsdata var 5:e sekund, så att Cosmos DB uppdateras kontinuerligt. I andra scenarier kan det dock vara viktigt att bearbeta meddelanden strikt i ordning.

När du har distribuerat den här kodändringen kunde programmet mata in fler än 2 500 begäranden per sekund med hjälp av en IoT Hub med 32 partitioner.

Överväganden på klientsidan

Den övergripande klientupplevelsen kan minskas av aggressiv parallellisering på serversidan. Överväg att Azure Cosmos DB massutförande bibliotek (visas inte i den här implementeringen), vilket avsevärt minskar de beräkningsresurser på klientsidan som behövs för att mätta dataflödet som allokerats till en Cosmos DB container. Ett enda trådat program som skriver data med hjälp av massimport-API:et uppnår nästan tio gånger större skrivdataflöde jämfört med ett flertrådat program som skriver data parallellt medan klientdatorns processor blir mättad.

Sammanfattning

I det här scenariot identifierades följande flaskhalsar:

  • Partition för snabbskrivning på grund av att partitionsnyckelvärdet saknas i dokumenten som skrivs.
  • Skriva dokument i serie per IoT Hub partition.

För att diagnostisera dessa problem förlitade sig utvecklingsteamet på följande mått:

  • Begränsade begäranden i Cosmos DB.
  • Partitions heat map – Maximalt antal förbrukade RU:er per partition.
  • Mottagna meddelanden jämfört med dokument som skapats.
  • Meddelandesendhet.

Nästa steg

Granska prestanda antimönster