Migrasi dari pustaka eksekutor massal ke dukungan massal di Azure Cosmos DB .NET V3 SDK

BERLAKU UNTUK: NoSQL

Artikel ini menjelaskan langkah-langkah yang diperlukan untuk memigrasikan kode aplikasi yang ada yang menggunakan pustaka eksekutor massal .NET ke fitur dukungan massal dalam versi terbaru .NET SDK.

Mengaktifkan dukungan massal

Aktifkan dukungan massal pada instans CosmosClient melalui konfigurasi AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Membuat Tugas untuk setiap operasi

Dukungan massal dalam .NET SDK berfungsi dengan memanfaatkan Pustaka Paralel Tugas dan operasi pengelompokan yang terjadi secara bersamaan.

Tidak ada satu metode pun dalam SDK yang akan mengambil daftar dokumen atau operasi Anda sebagai parameter input, melainkan, Anda perlu membuat Tugas untuk setiap operasi yang ingin Anda jalankan secara massal, lalu cukup tunggu hingga selesai.

Misalnya, jika input awal Anda adalah daftar item di mana setiap item memiliki skema berikut:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Jika Anda ingin melakukan impor massal (mirip dengan penggunaan BulkExecutor.BulkImportAsync), Anda harus melakukan panggilan bersamaan ke CreateItemAsync. Contohnya:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Jika Anda ingin melakukan pembaruan massal (mirip dengan penggunaan BulkExecutor.BulkUpdateAsync), Anda harus melakukan panggilan bersamaan ke metode ReplaceItemAsync setelah memperbarui nilai item. Contohnya:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Lalu jika Anda ingin melakukan penghapusan massal (mirip dengan penggunaan BulkExecutor.BulkDeleteAsync), Anda harus melakukan panggilan bersamaan ke DeleteItemAsync, dengan id dan partisi setiap item. Contohnya:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Menangkap status hasil tugas

Dalam contoh kode sebelumnya, kami telah membuat daftar tugas bersamaan, dan memanggil metode CaptureOperationResponse pada setiap tugas tersebut. Metode ini adalah ekstensi yang memungkinkan kita mempertahankan skema respons yang mirip BulkExecutor, dengan menangkap kesalahan dan melacak penggunaan unit permintaan.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Di mana OperationResponse dinyatakan sebagai:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Menjalankan operasi secara bersamaan

Untuk melacak cakupan seluruh daftar Tugas, kami menggunakan kelas pembantu ini:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metode ExecuteAsync ini akan menunggu hingga semua operasi selesai dan Anda dapat menggunakannya seperti:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Menangkap statistik

Kode sebelumnya menunggu hingga semua operasi selesai dan menghitung statistik yang diperlukan. Statistik ini mirip dengan yang ada pada BulkImportResponse pustaka eksekutor massal.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse berisi:

  1. Total waktu yang diambil untuk memproses daftar operasi melalui dukungan massal.
  2. Jumlah operasi yang berhasil.
  3. Total unit permintaan yang digunakan.
  4. Jika terjadi kegagalan, ini akan menampilkan daftar tuple yang berisi pengecualian dan item terkait untuk tujuan pencatatan dan identifikasi.

Mencoba ulang konfigurasi

Pustaka eksekutor massal memiliki panduan yang menyebutkan untuk mengatur MaxRetryWaitTimeInSecondsdan MaxRetryAttemptsOnThrottledRequests pada RetryOptions ke 0 untuk mendelegasikan kontrol ke pustaka.

Untuk dukungan massal di .NET SDK, tidak ada perilaku tersembunyi. Anda dapat mengonfigurasi opsi coba lagi secara langsung melalui CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests dan CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Catatan

Apabila unit permintaan yang disediakan jauh lebih rendah dari yang diharapkan berdasarkan jumlah data, Anda mungkin ingin mempertimbangkan pengaturan ini ke nilai tinggi. Operasi massal akan memakan waktu lebih lama tetapi memiliki peluang lebih tinggi untuk sepenuhnya berhasil karena percobaan ulang yang lebih tinggi.

Peningkatan performa

Seperti halnya operasi lain dengan .NET SDK, penggunaan API stream menghasilkan performa yang lebih baik dan menghindari serialisasi yang tidak perlu.

Penggunaan API stream hanya dimungkinkan jika sifat data yang Anda gunakan cocok dengan aliran byte (misalnya, aliran file). Dalam kasus tersebut, penggunaan metode CreateItemStreamAsync, ReplaceItemStreamAsync, atau DeleteItemStreamAsync dan bekerja dengan ResponseMessage (bukannya ItemResponse) akan meningkatkan throughput yang dapat dicapai.

Langkah berikutnya