Migrowanie z biblioteki funkcji wykonawczej zbiorczej do obsługi zbiorczej w zestawie SDK platformy .NET w wersji 3 usługi Azure Cosmos DB

DOTYCZY: NoSQL

W tym artykule opisano wymagane kroki migracji kodu istniejącej aplikacji, który używa biblioteki funkcji wykonawczej zbiorczej platformy .NET do funkcji obsługi zbiorczej w najnowszej wersji zestawu .NET SDK.

Włączanie obsługi zbiorczej

Włącz obsługę zbiorczą wystąpienia CosmosClient za pomocą konfiguracji AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Tworzenie zadań dla każdej operacji

Obsługa zbiorcza w zestawie SDK platformy .NET działa przy użyciu biblioteki równoległej zadań i operacji grupowania wykonywanych współbieżnie.

W zestawie SDK nie ma jednej metody, która będzie pobierać listę dokumentów lub operacji jako parametr wejściowy, ale raczej należy utworzyć zadanie dla każdej operacji, którą chcesz wykonać zbiorczo, a następnie po prostu poczekać na ukończenie.

Jeśli na przykład początkowe dane wejściowe są listą elementów, w których każdy element ma następujący schemat:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Jeśli chcesz przeprowadzić importowanie zbiorcze (podobnie jak w przypadku funkcji BulkExecutor.BulkImportAsync), musisz mieć współbieżne wywołania do CreateItemAsyncprogramu . Przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Jeśli chcesz przeprowadzić aktualizację zbiorczą (podobnie jak w przypadku funkcji BulkExecutor.BulkUpdateAsync), musisz mieć współbieżne wywołania metody ReplaceItemAsync po zaktualizowaniu wartości elementu. Przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Jeśli chcesz usunąć zbiorczo (podobnie jak w przypadku funkcji BulkExecutor.BulkDeleteAsync), musisz mieć współbieżne wywołania do DeleteItemAsyncelementu z kluczem id partycji i dla każdego elementu. Przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Przechwyć stan wyniku zadania

W poprzednich przykładach kodu utworzyliśmy współbieżną listę zadań i wywołaliśmy metodę CaptureOperationResponse dla każdego z tych zadań. Ta metoda to rozszerzenie, które pozwala nam zachować podobny schemat odpowiedzi jako BulkExecutor, przechwytując wszelkie błędy i śledząc użycie jednostek żądań.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Gdzie element OperationResponse jest zadeklarowany jako:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Wykonywanie operacji współbieżnie

Aby śledzić zakres całej listy zadań, użyjemy tej klasy pomocniczej:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoda ExecuteAsync będzie czekać na ukończenie wszystkich operacji i można jej użyć w następujący sposób:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Przechwytywanie statystyk

Poprzedni kod czeka, aż wszystkie operacje zostaną ukończone i obliczy wymagane statystyki. Te statystyki są podobne do tych biblioteki funkcji wykonawczej zbiorczej BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Zawiera BulkOperationResponse :

  1. Łączny czas potrzebny na przetworzenie listy operacji za pośrednictwem obsługi zbiorczej.
  2. Liczba zakończonych powodzeniem operacji.
  3. Łączna liczba wykorzystanych jednostek żądania.
  4. Jeśli wystąpią błędy, zostanie wyświetlona lista krotki, które zawierają wyjątek i skojarzony element na potrzeby rejestrowania i identyfikacji.

Ponów próbę konfiguracji

Biblioteka funkcji wykonawczej zbiorczej zawierała wskazówki , które zostały wymienione w celu ustawienia MaxRetryWaitTimeInSeconds parametru i MaxRetryAttemptsOnThrottledRequestsfunkcji RetryOptions w celu 0 delegowania kontroli do biblioteki.

W przypadku zbiorczej obsługi zestawu .NET SDK nie ma ukrytego zachowania. Opcje ponawiania można skonfigurować bezpośrednio za pomocą poleceń CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests i CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Uwaga

W przypadkach, gdy aprowizowane jednostki żądania są znacznie niższe niż oczekiwano na podstawie ilości danych, warto rozważyć ustawienie tych jednostek na wysokie wartości. Operacja zbiorcza potrwa dłużej, ale ma większe szanse na całkowite powodzenie z powodu wyższych ponownych prób.

Usprawnienia wydajności

Podobnie jak w przypadku innych operacji z zestawem SDK platformy .NET, użycie interfejsów API strumienia zapewnia lepszą wydajność i pozwala uniknąć niepotrzebnej serializacji.

Korzystanie z interfejsów API strumienia jest możliwe tylko wtedy, gdy charakter używanych danych jest zgodny ze strumieniem bajtów (na przykład strumieniami plików). W takich przypadkach użycie CreateItemStreamAsyncmetod , ReplaceItemStreamAsynclub DeleteItemStreamAsync i praca z ResponseMessage (zamiast ItemResponse) zwiększa przepływność, którą można osiągnąć.

Następne kroki