Bulk Executor library からAzure Cosmos DB .NET V3 SDK のBulkサポートに移行する

適用対象: NoSQL

この記事は、.NET Bulk Executorライブラリを使用する既存のアプリケーションのコードを、最新バージョンの .NET SDK の一括サポート機能に移行するために必要な手順について説明します。

Bulkサポートを有効にする

CosmosClientインスタンスで、AllowBulkExecution 構成を使用し、Bulkサポートを有効にする:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

各操作のタスクを作成する

.NET SDKにおけるBulkサポートは、 タスク並列ライブラリ、および同時に発動するグループ化操作を活用することによって作動する。

SDK にはドキュメントまたは操作のリストを入力パラメーターとして受け取る単一のメソッドがありませんが、一括で実行する操作ごとにタスクを作成する必要があります。その後、それらが完了するのを待ちます。

例えば最初の入力が、各項目が以下のスキーマを取るアイテムのリストであるとします:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

一括インポートを実行 (BulkExecutor.BulkImportAsync の使用と同様) する場合は、CreateItemAsync の同時呼び出しが必要です。 次に例を示します。

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

一括アップデートBulkExecutor.BulkImportAsyncの使用と同様)を実行したい場合は、項目の値を更新してからReplaceItemAsyncメソッドを同時に起動する必要があります。 次に例を示します。

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

一括削除BulkExecutor.BulkDeleteAsyncの使用と同様)を実行したい場合は、DeleteItemAsyncidおよび各項目のパーテーションキーと同時に起動する必要があります。 次に例を示します。

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

タスクの結果の状態をCaptureします

以前のコード例では、タスクの同時実行リストを作成し、各タスクで CaptureOperationResponse メソッドを呼び出しました。 このメソッドは、エラーや リクエストユニットの使用の追跡を行うことによって、BulkExecutorとしての要求した応答スキーマを維持することを可能にする拡張機能です。

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponseが以下のように示されているところは:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

同時に操作を実行する

タスクのリスト全体のスコープを追跡するには、このヘルパー クラスを使用します。

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync メソッドはすべての操作が完了するまで待ちます。ユーザーは次のようにこのメソッドを使用できます。

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

統計をCaptureする

前のコードは、全ての操作が完了するまで待機し、それから必要な統計の計算を行います。 これらの統計は、一括実行プログラムのBulkImportResponseのものと類似しています。

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponseには以下のものが含まれます:

  1. 一括サポートを通して操作の一覧を処理するのにかかった時間の合計。
  2. 正常に処理された操作の数。
  3. 消費された要求ユニットの合計。
  4. エラーが生じた場合は、例外を含む組み合わせの一覧およびロギングと識別目的のための関連項目が表示されます。

コンフィグレーションを再試行する

一括実行ライブラリには、MaxRetryWaitTimeInSecondsおよび RetryOptionsMaxRetryAttemptsOnThrottledRequestsを、ライブラリにコントロールを委任するために0に設定するガイダンスがありました。

.NET SDKの一括サポートに関しては、非表示の動作はありません。 再試行オプションをCosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests および CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequestsを通して直接コンフィギュアする事が出来ます。

Note

データの量に基づき、供給された要求ユニットが予想よりはるかに低いとされた場合には、これらの値を高い値に設定することを検討してください。 一括操作は長時間かかりますが、再試行の回数が多いため、より高い可能性で完全に成功します。

パフォーマンスの向上

.NET SDKの他の操作と同様に、stream APIsを使用することでよりパフォーマンスが向上させ、不要なシリアル化を回避することにつながります。

stream APIsの使用は、ご使用のデータの性質がbytesのストリームのものと合致する場合においてのみ可能です(例:ファイルストリーム)。 そのような場合には、CreateItemStreamAsyncまたはReplaceItemStreamAsyncDeleteItemStreamAsyncメソッドを使用し、ResponseMessage (ItemResponseではなく)を操作することで、達成出来るスループットが向上します。

次のステップ