Migrace z knihovny bulk executoru na hromadnou podporu v sadě .NET SDK služby Azure Cosmos DB v3
PLATÍ PRO: NoSQL
Tento článek popisuje požadované kroky k migraci kódu existující aplikace, která používá knihovnu bulk executor .NET , na funkci hromadné podpory v nejnovější verzi sady .NET SDK.
Povolení hromadné podpory
Povolte hromadnou CosmosClient
podporu pro instanci prostřednictvím konfigurace AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Vytvoření úkolů pro každou operaci
Hromadná podpora v sadě .NET SDK funguje tak, že využívá paralelní knihovnu úloh a seskupuje operace, které probíhají souběžně.
V sadě SDK není žádná jediná metoda, která vezme seznam dokumentů nebo operací jako vstupní parametr, ale musíte vytvořit úlohu pro každou operaci, kterou chcete provést hromadně, a pak jednoduše počkat na jejich dokončení.
Pokud je vaším počátečním vstupem například seznam položek, ve kterých má každá položka následující schéma:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Pokud chcete provést hromadný import (podobně jako pomocí BulkExecutor.BulkImportAsync), musíte mít souběžná volání nástroje CreateItemAsync
. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Pokud chcete provést hromadnou aktualizaci (podobně jako při použití BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání ReplaceItemAsync
metody po aktualizaci hodnoty položky. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
A pokud chcete provést hromadné odstranění (podobně jako pomocí BulkExecutor.BulkDeleteAsync), musíte mít souběžná volání DeleteItemAsync
, s klíčem oddílu id
a každé položky. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Zaznamenat stav výsledku úkolu
V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a volali metodu CaptureOperationResponse
pro každý z těchto úkolů. Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědi jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek požadavků.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse
Kde je deklarován jako:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Souběžné spouštění operací
Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metoda ExecuteAsync
počká na dokončení všech operací a můžete ji použít takto:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Zachytávání statistik
Předchozí kód počká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky se podobají statistikám bulkImportResponse knihovny bulk executoru.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Obsahuje BulkOperationResponse
:
- Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
- Počet úspěšných operací.
- Celkový počet spotřebovaných jednotek žádostí.
- Pokud dojde k selháním, zobrazí se seznam kolekcí členů, které obsahují výjimku a přidruženou položku pro účely protokolování a identifikace.
Konfigurace opakování
Knihovna bulk executor obsahovala pokyny , které se zmínily o nastavení MaxRetryWaitTimeInSeconds
a MaxRetryAttemptsOnThrottledRequests
retryOptions na delegování 0
řízení do knihovny.
Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Poznámka
V případech, kdy je počet zřízených jednotek žádostí mnohem nižší, než se očekávalo na základě objemu dat, můžete zvážit jejich nastavení na vysoké hodnoty. Hromadná operace bude trvat déle, ale díky vyšším opakovaným pokusům má vyšší šanci, že bude úspěšná.
Vylepšení výkonu
Stejně jako u jiných operací se sadou .NET SDK, použití rozhraní API streamu vede k lepšímu výkonu a vyhne se zbytečné serializaci.
Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá povaze datového proudu bajtů (například datových proudů souborů). V takových případech se pomocí CreateItemStreamAsync
metod , ReplaceItemStreamAsync
nebo DeleteItemStreamAsync
a práce s ResponseMessage
(místo ItemResponse
) zvýší propustnost, které je možné dosáhnout.
Další kroky
- Další informace o vydaných verzích sady .NET SDK najdete v článku o sadě Sdk služby Azure Cosmos DB .
- Získejte kompletní zdrojový kód migrace z GitHubu.
- Další hromadné ukázky na GitHubu
- Pokoušíte se naplánovat kapacitu pro migraci do služby Azure Cosmos DB?
- Pokud víte jen o počtu virtuálních jader a serverů v existujícím databázovém clusteru, přečtěte si o odhadu jednotek žádostí pomocí virtuálních jader nebo virtuálních procesorů.
- Pokud znáte typické frekvence požadavků pro aktuální úlohy databáze, přečtěte si o odhadu jednotek žádostí pomocí Plánovače kapacity služby Azure Cosmos DB.