Migrace z knihovny bulk executoru na hromadnou podporu v sadě .NET SDK služby Azure Cosmos DB v3

PLATÍ PRO: NoSQL

Tento článek popisuje požadované kroky k migraci kódu existující aplikace, která používá knihovnu bulk executor .NET , na funkci hromadné podpory v nejnovější verzi sady .NET SDK.

Povolení hromadné podpory

Povolte hromadnou CosmosClient podporu pro instanci prostřednictvím konfigurace AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Vytvoření úkolů pro každou operaci

Hromadná podpora v sadě .NET SDK funguje tak, že využívá paralelní knihovnu úloh a seskupuje operace, které probíhají souběžně.

V sadě SDK není žádná jediná metoda, která vezme seznam dokumentů nebo operací jako vstupní parametr, ale musíte vytvořit úlohu pro každou operaci, kterou chcete provést hromadně, a pak jednoduše počkat na jejich dokončení.

Pokud je vaším počátečním vstupem například seznam položek, ve kterých má každá položka následující schéma:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Pokud chcete provést hromadný import (podobně jako pomocí BulkExecutor.BulkImportAsync), musíte mít souběžná volání nástroje CreateItemAsync. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Pokud chcete provést hromadnou aktualizaci (podobně jako při použití BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání ReplaceItemAsync metody po aktualizaci hodnoty položky. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

A pokud chcete provést hromadné odstranění (podobně jako pomocí BulkExecutor.BulkDeleteAsync), musíte mít souběžná volání DeleteItemAsync, s klíčem oddílu id a každé položky. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Zaznamenat stav výsledku úkolu

V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a volali metodu CaptureOperationResponse pro každý z těchto úkolů. Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědi jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek požadavků.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse Kde je deklarován jako:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Souběžné spouštění operací

Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoda ExecuteAsync počká na dokončení všech operací a můžete ji použít takto:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Zachytávání statistik

Předchozí kód počká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky se podobají statistikám bulkImportResponse knihovny bulk executoru.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Obsahuje BulkOperationResponse :

  1. Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
  2. Počet úspěšných operací.
  3. Celkový počet spotřebovaných jednotek žádostí.
  4. Pokud dojde k selháním, zobrazí se seznam kolekcí členů, které obsahují výjimku a přidruženou položku pro účely protokolování a identifikace.

Konfigurace opakování

Knihovna bulk executor obsahovala pokyny , které se zmínily o nastavení MaxRetryWaitTimeInSeconds a MaxRetryAttemptsOnThrottledRequestsretryOptions na delegování 0 řízení do knihovny.

Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Poznámka

V případech, kdy je počet zřízených jednotek žádostí mnohem nižší, než se očekávalo na základě objemu dat, můžete zvážit jejich nastavení na vysoké hodnoty. Hromadná operace bude trvat déle, ale díky vyšším opakovaným pokusům má vyšší šanci, že bude úspěšná.

Vylepšení výkonu

Stejně jako u jiných operací se sadou .NET SDK, použití rozhraní API streamu vede k lepšímu výkonu a vyhne se zbytečné serializaci.

Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá povaze datového proudu bajtů (například datových proudů souborů). V takových případech se pomocí CreateItemStreamAsyncmetod , ReplaceItemStreamAsyncnebo DeleteItemStreamAsync a práce s ResponseMessage (místo ItemResponse) zvýší propustnost, které je možné dosáhnout.

Další kroky