Dostrajanie wydajności — przesyłanie strumieniowe zdarzeń

Azure Functions
Azure IoT Hub
Azure Cosmos DB

W tym artykule opisano sposób, w jaki zespół programistyczny używał metryk do znajdowania wąskich gardeł i poprawiania wydajności systemu rozproszonego. Artykuł jest oparty na rzeczywistym testowaniu obciążenia, które wykonaliśmy dla przykładowej aplikacji.

Ten artykuł jest częścią serii. Przeczytaj pierwszą część tutaj.

Scenariusz: przetwarzanie strumienia zdarzeń przy użyciu Azure Functions.

Diagram architektury przesyłania strumieniowego zdarzeń

W tym scenariuszu flota dronów wysyła dane pozycji w czasie rzeczywistym do Azure IoT Hub. Aplikacja usługi Functions odbiera zdarzenia, przekształca dane w format GeoJSON i zapisuje przekształcone dane w usłudze Azure Cosmos DB. Usługa Azure Cosmos DB ma natywną obsługę danych geoprzestrzennych, a kolekcje usługi Azure Cosmos DB można indeksować pod kątem wydajnych zapytań przestrzennych. Na przykład aplikacja kliencka może wykonywać zapytania dotyczące wszystkich dronów w odległości 1 km od danej lokalizacji lub znajdować wszystkie drony w określonym obszarze.

Te wymagania dotyczące przetwarzania są na tyle proste, że nie wymagają pełnego aparatu przetwarzania strumieniowego. W szczególności przetwarzanie nie łączy strumieni, agregacji danych ani przetwarzania w oknach czasowych. Na podstawie tych wymagań Azure Functions jest dobrym rozwiązaniem do przetwarzania komunikatów. Usługa Azure Cosmos DB może również skalować w celu obsługi bardzo dużej przepływności zapisu.

Monitorowanie przepływności

W tym scenariuszu przedstawiono interesujące wyzwanie dotyczące wydajności. Szybkość danych na urządzenie jest znana, ale liczba urządzeń może się wahać. W tym scenariuszu biznesowym wymagania dotyczące opóźnienia nie są szczególnie rygorystyczne. Zgłoszona pozycja drona musi być dokładna tylko w ciągu minuty. Oznacza to, że aplikacja funkcji musi nadążyć za średnim współczynnikiem pozyskiwania w czasie.

IoT Hub przechowuje komunikaty w strumieniu dziennika. Komunikaty przychodzące są dołączane do końca strumienia. Czytelnik strumienia — w tym przypadku aplikacja funkcji — kontroluje własną szybkość przechodzenia strumienia. Oddzielenie ścieżek odczytu i zapisu sprawia, że IoT Hub bardzo wydajne, ale także oznacza, że powolny czytnik może spaść w tyle. Aby wykryć ten warunek, zespół deweloperów dodał niestandardową metrykę, aby zmierzyć opóźnienie komunikatu. Ta metryka rejestruje różnicę między tym, kiedy komunikat pojawia się IoT Hub, a gdy funkcja odbiera komunikat do przetwarzania.

var ticksUTCNow = DateTimeOffset.UtcNow;

// Track whether messages are arriving at the function late.
DateTime? firstMsgEnqueuedTicksUtc = messages[0]?.EnqueuedTimeUtc;
if (firstMsgEnqueuedTicksUtc.HasValue)
{
    CustomTelemetry.TrackMetric(
                        context,
                        "IoTHubMessagesReceivedFreshnessMsec",
                        (ticksUTCNow - firstMsgEnqueuedTicksUtc.Value).TotalMilliseconds);
}

Metoda TrackMetric zapisuje niestandardową metrykę w usłudze Application Insights. Aby uzyskać informacje na temat używania TrackMetric funkcji platformy Azure, zobacz Niestandardowe dane telemetryczne w funkcji języka C#.

Jeśli funkcja nadąża za ilością komunikatów, ta metryka powinna pozostać w niskim stanie stałym. Pewne opóźnienie jest nieuniknione, więc wartość nigdy nie będzie równa zero. Ale jeśli funkcja spadnie w tyle, różnica między czasem w kolejce i czasem przetwarzania rozpocznie się w górę.

Test 1. Punkt odniesienia

Pierwszy test obciążeniowy wykazał natychmiastowy problem: aplikacja funkcji stale odbierała błędy HTTP 429 z usługi Azure Cosmos DB, wskazując, że usługa Azure Cosmos DB ograniczała żądania zapisu.

Wykres żądań ograniczonych przez usługę Azure Cosmos DB

W odpowiedzi zespół przeskalował usługę Azure Cosmos DB, zwiększając liczbę jednostek RU przydzielonych do kolekcji, ale błędy były kontynuowane. Wydawało się to dziwne, ponieważ ich obliczenia z tyłu koperty wykazały, że usługa Azure Cosmos DB nie powinna mieć problemu z ilością żądań zapisu.

Później tego dnia jeden z deweloperów wysłał do zespołu następujący adres e-mail:

Spojrzałem na usługę Azure Cosmos DB pod kątem ciepłej ścieżki. Jest jedna rzecz, której nie rozumiem. Klucz partycji to deliveryId, ale nie wysyłamy identyfikatora deliveryId do usługi Azure Cosmos DB. Czy brakuje mi czegoś?

To była wskazówka. Patrząc na mapę cieplną partycji, okazało się, że wszystkie dokumenty wylądowały na tej samej partycji.

Wykres mapy cieplnej partycji usługi Azure Cosmos DB

To, co chcesz zobaczyć na mapie cieplnej, jest równomiernym rozkładem we wszystkich partycjach. W takim przypadku, ponieważ każdy dokument został zapisany w tej samej partycji, dodanie jednostek RU nie pomogło. Problem okazał się usterką w kodzie. Mimo że kolekcja usługi Azure Cosmos DB miała klucz partycji, funkcja platformy Azure nie zawierała klucza partycji w dokumencie. Aby uzyskać więcej informacji na temat mapy cieplnej partycji, zobacz Określanie rozkładu przepływności między partycjami.

Test 2. Rozwiązywanie problemu z partycjonowaniem

Po wdrożeniu przez zespół poprawki kodu i ponownym uruchomieniu testu usługa Azure Cosmos DB przestała ograniczać przepływności. Przez pewien czas wszystko wyglądało dobrze. Jednak w pewnym obciążeniu dane telemetryczne wykazały, że funkcja zapisuje mniej dokumentów, które powinny. Poniższy wykres przedstawia komunikaty odbierane z IoT Hub a dokumenty zapisywane w usłudze Azure Cosmos DB. Żółty wiersz to liczba odebranych komunikatów na partię, a zielony to liczba dokumentów zapisanych w partii. Powinny one być proporcjonalne. Zamiast tego liczba operacji zapisu bazy danych na partię znacznie spada o około 07:30.

Wykres porzuconych komunikatów

Następny wykres pokazuje opóźnienie między nadejściem komunikatu IoT Hub z urządzenia, a kiedy aplikacja funkcji przetwarza ten komunikat. Widać, że w tym samym momencie w czasie wzrost opóźnienia dramatycznie, poziom off i spadki.

Wykres opóźnienia komunikatu

Przyczyną szczytowej wartości jest 5 minut, a następnie spadek do zera jest to, że aplikacja funkcji odrzuca komunikaty, które są opóźnione o ponad 5 minut:

foreach (var message in messages)
{
    // Drop stale messages,
    if (message.EnqueuedTimeUtc < cutoffTime)
    {
        log.Info($"Dropping late message batch. Enqueued time = {message.EnqueuedTimeUtc}, Cutoff = {cutoffTime}");
        droppedMessages++;
        continue;
    }
}

Można to zobaczyć na wykresie, gdy metryka opóźnienia spadnie z powrotem do zera. W międzyczasie dane zostały utracone, ponieważ funkcja wyrzucała komunikaty.

Co się stało? W przypadku tego konkretnego testu obciążeniowego kolekcja usługi Azure Cosmos DB miała oszczędzenie jednostek RU, więc wąskie gardło nie znajdowało się w bazie danych. Zamiast tego problem był w pętli przetwarzania komunikatów. Po prostu funkcja nie zapisywała dokumentów wystarczająco szybko, aby nadążyć za przychodzącym woluminem komunikatów. Z biegiem czasu spadł dalej i dalej w tyle.

Test 3. Zapisy równoległe

Jeśli czas przetwarzania komunikatu jest wąskim gardłem, jednym z rozwiązań jest przetwarzanie większej liczby komunikatów równolegle. W tym scenariuszu:

  • Zwiększ liczbę partycji IoT Hub. Każda partycja IoT Hub jest przypisywana do jednego wystąpienia funkcji w danym momencie, więc oczekujemy, że przepływność będzie skalowana liniowo z liczbą partycji.
  • Równoległe zapisywanie dokumentu w funkcji.

Aby zapoznać się z drugą opcją, zespół zmodyfikował funkcję w celu obsługi zapisów równoległych. Oryginalna wersja funkcji użyła powiązania wyjściowego usługi Azure Cosmos DB. Zoptymalizowana wersja wywołuje klienta usługi Azure Cosmos DB bezpośrednio i wykonuje operacje zapisu równolegle przy użyciu funkcji Task.WhenAll:

private async Task<(long documentsUpserted,
                    long droppedMessages,
                    long cosmosDbTotalMilliseconds)>
                ProcessMessagesFromEventHub(
                    int taskCount,
                    int numberOfDocumentsToUpsertPerTask,
                    EventData[] messages,
                    TraceWriter log)
{
    DateTimeOffset cutoffTime = DateTimeOffset.UtcNow.AddMinutes(-5);

    var tasks = new List<Task>();

    for (var i = 0; i < taskCount; i++)
    {
        var docsToUpsert = messages
                            .Skip(i * numberOfDocumentsToUpsertPerTask)
                            .Take(numberOfDocumentsToUpsertPerTask);
        // client will attempt to create connections to the data
        // nodes on Azure Cosmos DB clusters on a range of port numbers
        tasks.Add(UpsertDocuments(i, docsToUpsert, cutoffTime, log));
    }

    await Task.WhenAll(tasks);

    return (this.UpsertedDocuments,
            this.DroppedMessages,
            this.CosmosDbTotalMilliseconds);
}

Należy pamiętać, że warunki wyścigu są możliwe z podejściem. Załóżmy, że dwa komunikaty z tego samego drona docierają do tej samej partii komunikatów. Pisząc je równolegle, wcześniejsza wiadomość może zastąpić późniejszą wiadomość. W tym konkretnym scenariuszu aplikacja może tolerować utratę okazjonalnego komunikatu. Drony wysyłają nowe dane pozycji co 5 sekund, więc dane w usłudze Azure Cosmos DB są stale aktualizowane. Jednak w innych scenariuszach ważne może być przetwarzanie komunikatów ściśle w kolejności.

Po wdrożeniu tej zmiany kodu aplikacja mogła pozyskać ponad 2500 żądań na sekundę przy użyciu IoT Hub z 32 partycjami.

Zagadnienia dotyczące po stronie klienta

Ogólne środowisko klienta może zostać zmniejszone przez agresywną równoległą obsługę po stronie serwera. Rozważ użycie biblioteki funkcji wykonawczej zbiorczej usługi Azure Cosmos DB (nie pokazanej w tej implementacji), co znacznie zmniejsza zasoby obliczeniowe po stronie klienta potrzebne do saturacji przepływności przydzielonej do kontenera usługi Azure Cosmos DB. Pojedyncza wątek aplikacja, która zapisuje dane przy użyciu interfejsu API importu zbiorczego, osiąga prawie dziesięć razy większą przepływność zapisu w porównaniu z wielowątkową aplikacją, która zapisuje dane równolegle podczas saturowania procesora CPU komputera klienckiego.

Podsumowanie

W tym scenariuszu zidentyfikowano następujące wąskie gardła:

  • Partycja zapisu gorąca ze względu na brak wartości klucza partycji w zapisywanych dokumentach.
  • Zapisywanie dokumentów w numerze seryjnym na partycję IoT Hub.

Aby zdiagnozować te problemy, zespół programistyczny oparł się na następujących metrykach:

  • Żądania ograniczone w usłudze Azure Cosmos DB.
  • Mapa cieplna partycji — maksymalna liczba zużytych jednostek RU na partycję.
  • Komunikaty odebrane w porównaniu z utworzonymi dokumentami.
  • Opóźnienie wiadomości.

Następne kroki

Przegląd antywzorzeców wydajności