Scenario voor het afstemmen van prestaties: Gebeurtenisstreaming met Azure Functions

In dit artikel wordt beschreven hoe een ontwikkelingsteam metrische gegevens gebruikt om knelpunten te vinden en de prestaties van een gedistribueerd systeem te verbeteren. Het artikel is gebaseerd op de werkelijke belastingstests die we hebben uitgevoerd voor een voorbeeldtoepassing.

Dit artikel maakt deel uit van een reeks. Lees het eerste deel hier.

Scenario: Een stroom gebeurtenissen verwerken met behulp van Azure Functions.

Diagram van een architectuur voor gebeurtenisstreaming

In dit scenario verzendt een vloot drones positiegegevens in realtime naar Azure IoT Hub. Een Functions-app ontvangt de gebeurtenissen, transformeert de gegevens in GeoJSON-indeling en schrijft de getransformeerde gegevens naar Cosmos DB. Cosmos DB biedt native ondersteuning voor georuimtelijkegegevens en Cosmos DB verzamelingen kunnen worden geïndexeerd voor efficiënte ruimtelijke query's. Een clienttoepassing kan bijvoorbeeld een query uitvoeren voor alle drones binnen 1 km van een bepaalde locatie of alle drones binnen een bepaald gebied vinden.

Deze verwerkingsvereisten zijn eenvoudig genoeg, zodat ze geen volledige stroomverwerkingsen engine nodig hebben. In het bijzonder worden tijdens de verwerking geen stromen samengevoegd, gegevens samengevoegd of verwerkt in tijdvensters. Op basis van deze vereisten is Azure Functions geschikt voor het verwerken van de berichten. Cosmos DB kunnen ook worden geschaald om een zeer hoge schrijfdoorvoer te ondersteunen.

Doorvoer bewaken

Dit scenario vormt een interessante prestatie-uitdaging. De gegevenssnelheid per apparaat is bekend, maar het aantal apparaten kan fluctueren. Voor dit bedrijfsscenario zijn de latentievereisten niet erg streng. De gerapporteerde positie van een drone hoeft slechts binnen een minuut nauwkeurig te zijn. Dat gezegd hebbende, moet de functie-app het gemiddelde opnamepercentage gedurende een periode bij houden.

IoT Hub slaat berichten op in een logboekstroom. Binnenkomende berichten worden toegevoegd aan de staart van de stroom. Als lezer van de stream bepaalt de functie-app in dit geval de eigen snelheid waarmee de — — stroom wordt doorlopen. Deze ontkoppeling van de lees- en schrijfpaden maakt IoT Hub efficiënt, maar betekent ook dat een trage lezer achter kan komen te zitten. Om deze voorwaarde te detecteren, heeft het ontwikkelteam een aangepaste metrische waarde toegevoegd om de vertraging van berichten te meten. Deze metrische gegevens registreert de verschillen tussen wanneer een bericht bij IoT Hub binnenkomt en wanneer de functie het bericht ontvangt voor verwerking.

var ticksUTCNow = DateTimeOffset.UtcNow;

// Track whether messages are arriving at the function late.
DateTime? firstMsgEnqueuedTicksUtc = messages[0]?.EnqueuedTimeUtc;
if (firstMsgEnqueuedTicksUtc.HasValue)
{
    CustomTelemetry.TrackMetric(
                        context,
                        "IoTHubMessagesReceivedFreshnessMsec",
                        (ticksUTCNow - firstMsgEnqueuedTicksUtc.Value).TotalMilliseconds);
}

De TrackMetric methode schrijft aangepaste metrische gegevens naar Application Insights. Zie Aangepaste telemetrie in C#-functie voor meer informatie over het gebruik van in een TrackMetric Azure-functie.

Als de functie het aantal berichten bijhoudt, moet deze metrische waarde een lage stabiele status hebben. Enige latentie is onvermijdelijk, dus de waarde zal nooit nul zijn. Maar als de functie achterop raakt, neemt de delta tussen de enqueuedtijd en de verwerkingstijd toe.

Test 1: Basislijn

De eerste belastingstest toont een direct probleem: De functie-app heeft consistent HTTP 429-fouten ontvangen van Cosmos DB, wat aangeeft dat Cosmos DB de schrijfaanvragen heeft beperking.

Graph van Cosmos DB aanvragen

Als antwoord schaalde het team Cosmos DB door het aantal TOEGEWEZEN RUS's voor de verzameling te verhogen, maar de fouten werden voortgezet. Dit vond het vreemd, omdat hun berekening van de back-of-envelop aangetoond dat Cosmos DB geen probleem mag hebben met het bijhouden van het volume aan schrijfaanvragen.

Later die dag heeft een van de ontwikkelaars de volgende e-mail naar het team verzonden:

Ik heb gekeken Cosmos DB naar het warme pad. Er is één ding dat ik niet begrijp. De partitiesleutel is deliveryId, maar we verzenden geen deliveryId naar Cosmos DB. Ontbreekt er iets?

Dat was de aanwijzing. Als we naar de partitie heatmap kijken, blijkt dat alle documenten op dezelfde partitie zijn geland.

Graph van Cosmos DB partitie-heatmap

Wat u in de heatmap wilt zien, is een gelijkmatige verdeling over alle partities. Omdat in dit geval elk document naar dezelfde partitie werd geschreven, heeft het toevoegen van DEE's niet geholpen. Het probleem bleek een bug in de code te zijn. Hoewel de Cosmos DB een partitiesleutel had, bevat de Azure-functie de partitiesleutel niet daadwerkelijk in het document. Zie De doorvoerdistributie over partities bepalen voor meer informatie over de partitie heatmap.

Test 2: Partitioneringsprobleem oplossen

Toen het team een codefix heeft geïmplementeerd en de test opnieuw heeft uitgevoerd, is Cosmos DB beperking gestopt. Al een tijdje zag alles er goed uit. Maar bij een bepaalde belasting heeft telemetrie aangetoond dat de functie minder documenten schrijft dan nodig is. De volgende grafiek toont berichten die worden ontvangen van IoT Hub in vergelijking met documenten die naar Cosmos DB. De gele lijn is het aantal berichten dat per batch wordt ontvangen en de groene lijn is het aantal documenten dat per batch is geschreven. Deze moeten proportioneel zijn. In plaats daarvan daalt het aantal schrijfbewerkingen per batch aanzienlijk om ongeveer 07:30.

Graph van uitgevallen berichten

In het volgende diagram ziet u de latentie tussen het moment waarop een bericht op IoT Hub apparaat binnenkomt en wanneer de functie-app dat bericht verwerkt. U kunt zien dat de vertraging op hetzelfde moment aanzienlijk toeneemt, afneemt en afneemt.

Graph de laatste tijd van berichten

De reden dat de waarde op 5 minuten piekt en vervolgens daalt naar nul, is omdat de functie-app berichten die meer dan 5 minuten te laat zijn, worden verwijderd:

foreach (var message in messages)
{
    // Drop stale messages,
    if (message.EnqueuedTimeUtc < cutoffTime)
    {
        log.Info($"Dropping late message batch. Enqueued time = {message.EnqueuedTimeUtc}, Cutoff = {cutoffTime}");
        droppedMessages++;
        continue;
    }

U kunt dit in de grafiek zien wanneer de metrische gegevens over de lateheid weer terug zijn op nul. In de tussentijd zijn gegevens verloren gegaan, omdat de functie berichten heeft weggehaald.

Wat is er gebeurd? Voor deze specifieke belastingstest had de Cosmos DB te weinig RUs, waardoor het knelpunt zich niet in de database voordeed. In plaats van dat het probleem zich voordeed in de berichtverwerkingslus. Eenvoudig gezegd, de functie heeft documenten niet snel genoeg geschreven om het inkomende volume aan berichten bij te houden. Na een periode is het steeds verder achterop komen te liggen.

Test 3: Parallelle schrijf schrijfresultaten

Als de tijd voor het verwerken van een bericht het knelpunt is, is één oplossing om meer berichten parallel te verwerken. In dit scenario geldt het volgende:

  • Verhoog het aantal IoT Hub partities. Aan IoT Hub partitie wordt één functie-exemplaar tegelijk toegewezen, dus we verwachten dat de doorvoer lineair wordt geschaald met het aantal partities.
  • Het schrijven van documenten in de functie parallelliseren.

Om de tweede optie te verkennen, heeft het team de functie gewijzigd om parallelle schrijffuncties te ondersteunen. De oorspronkelijke versie van de functie heeft de Cosmos DB uitvoerbinding gebruikt. Met de geoptimaliseerde versie wordt de Cosmos DB client aanroepen en worden de schrijf schrijft parallel uitgevoerd met behulp van Task.WhenAll:

private async Task<(long documentsUpserted,
                    long droppedMessages,
                    long cosmosDbTotalMilliseconds)>
                ProcessMessagesFromEventHub(
                    int taskCount,
                    int numberOfDocumentsToUpsertPerTask,
                    EventData[] messages,
                    TraceWriter log)
{
    DateTimeOffset cutoffTime = DateTimeOffset.UtcNow.AddMinutes(-5);

    var tasks = new List<Task>();

    for (var i = 0; i < taskCount; i++)
    {
        var docsToUpsert = messages
                            .Skip(i * numberOfDocumentsToUpsertPerTask)
                            .Take(numberOfDocumentsToUpsertPerTask);
        // client will attempt to create connections to the data
        // nodes on Cosmos db's clusters on a range of port numbers
        tasks.Add(UpsertDocuments(i, docsToUpsert, cutoffTime, log));
    }

    await Task.WhenAll(tasks);

    return (this.UpsertedDocuments,
            this.DroppedMessages,
            this.CosmosDbTotalMilliseconds);
}

Houd er rekening mee dat racevoorwaarden mogelijk zijn met benadering. Stel dat twee berichten van dezelfde drone in dezelfde batch berichten binnenkomen. Door ze parallel te schrijven, kan het eerdere bericht het latere bericht overschrijven. Voor dit specifieke scenario kan de toepassing het verlies van een incidenteel bericht tolereren. Drones verzenden elke vijf seconden nieuwe positiegegevens, zodat de gegevens in Cosmos DB voortdurend worden bijgewerkt. In andere scenario's kan het echter belangrijk zijn om berichten strikt op volgorde te verwerken.

Na de implementatie van deze codewijziging kon de toepassing meer dan 2500 aanvragen per seconde opnemen met behulp van een IoT Hub met 32 partities.

Overweging aan de clientzijde

De algehele clientervaring kan worden verminderd door agressieve parallellisatie aan de serverzijde. Overweeg gebruik te maken Azure Cosmos DB bulkuitvoeringsbibliotheek (niet weergegeven in deze implementatie), waardoor de rekenresources aan de clientzijde aanzienlijk worden verminderd die nodig zijn om de doorvoer te verzadigen die aan een Cosmos DB container is toegewezen. Een toepassing met één thread die gegevens schrijft met behulp van de API voor bulkimport, realiseert een bijna tien keer hogere schrijfdoorvoer in vergelijking met een toepassing met meerdere threads die gegevens parallel schrijft terwijl de CPU van de clientmachine wordt vernietigd.

Samenvatting

Voor dit scenario zijn de volgende knelpunten geïdentificeerd:

  • Hot write-partitie, vanwege een ontbrekende partitiesleutelwaarde in de documenten die worden geschreven.
  • Het schrijven van documenten in serie per IoT Hub partitie.

Om deze problemen vast te stellen, vertrouwde het ontwikkelteam op de volgende metrische gegevens:

  • Beperkt aantal aanvragen in Cosmos DB.
  • Partitie heatmap — Maximum aantal verbruikte RUs per partitie.
  • Ontvangen berichten versus gemaakte documenten.
  • Laatheid van bericht.

Volgende stappen

Antipatroon voor prestaties controleren