Migrar da biblioteca de executores em massa para o suporte em massa do SDK do Azure Cosmos DB .NET v3

APLICA-SE A: NoSQL

Este artigo descreve as etapas necessárias para migrar o código de um aplicativo existente que usa a biblioteca de executor em massa do .NET para o recurso de suporte em massa na versão mais recente do SDK do .NET.

Habilitar suporte em massa

Habilita o suporte em massa CosmosClient na instância por meio da configuração AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Criar tarefas para cada operação

O suporte em massa no SDK do .NET funciona utilizando a Biblioteca Paralela de Tarefas e as operações de agrupamento que ocorrem simultaneamente.

Não há nenhum método único no SDK que levará sua lista de documentos ou operações como um parâmetro de entrada, mas, em vez disso, você precisa criar uma Tarefa para cada operação que deseja executar em massa e, em seguida, simplesmente aguardar a conclusão.

Por exemplo, se a entrada inicial for uma lista de itens em que cada item tem o seguinte esquema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Se você quiser fazer a importação em massa (semelhante ao uso de BulkExecutor.BulkImportAsync), precisará ter chamadas simultâneas para CreateItemAsync. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Se você quiser fazer a atualização em massa (semelhante ao uso de BulkExecutor.BulkUpdateAsync), precisará ter chamadas simultâneas para o método ReplaceItemAsync depois de atualizar o valor do item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

E se você quiser fazer a exclusão em massa (semelhante ao uso de BulkExecutor.BulkDeleteAsync), precisará ter chamadas simultâneas para DeleteItemAsync, com a chave de partição e de cada id item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Estado do resultado da tarefa de captura

Nos exemplos de código anteriores, criamos uma lista simultânea de tarefas e chamamos o método CaptureOperationResponse em cada uma dessas tarefas. Esse método é uma extensão que nos permite manter um esquema de resposta semelhante ao BulkExecutor, capturando erros e acompanhando o uso das unidades de solicitação.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Em que OperationResponse é declarado como:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Executar operações simultaneamente

Para acompanhar o escopo de toda a lista de tarefas, usamos essa classe auxiliar:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

O método ExecuteAsync aguardará até que todas as operações sejam concluídas e você poderá usá-las da seguinte forma:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Estatísticas de captura

O código anterior aguarda até que todas as operações sejam concluídas e calcula as estatísticas necessárias. Essas estatísticas são semelhantes às do BulkImportResponseda biblioteca de executor em massa.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

O BulkOperationResponse contém:

  1. O tempo total necessário para processar a lista de operações por meio do suporte em massa.
  2. O número de operações bem-sucedidas.
  3. O total de unidades de solicitação consumidas.
  4. Se houver falhas, ele exibirá uma lista de tuplas que contêm a exceção e o item associado para fins de log e de identificação.

Repetir configuração

A biblioteca de executores em massa tinha diretrizes que mencionaram para definir o MaxRetryWaitTimeInSeconds e o MaxRetryAttemptsOnThrottledRequests de retryoptions para 0 delegar controle à biblioteca.

Para suporte em massa no SDK do .NET, não há nenhum comportamento oculto. Você pode configurar as opções de repetição diretamente por meio de CosmosClientOptions. MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions. MaxRetryWaitTimeOnRateLimitedRequests.

Observação

Nos casos em que as unidades de solicitação provisionadas são muito menores do que o esperado com base na quantidade de dados, talvez você queira considerar defini-los como valores altos. A operação em massa levará mais tempo, mas terá uma chance maior de ser concluída com sucesso devido às mais altas tentativas.

Aprimoramentos de desempenho

Assim como acontece com outras operações com o SDK do .NET, o uso das APIs de fluxo resulta em um melhor desempenho e evita qualquer serialização desnecessária.

O uso de APIs de fluxo só é possível se a natureza dos dados que você usa corresponder à de um fluxo de bytes (por exemplo, fluxos de arquivo). Nesses casos, usar os métodos CreateItemStreamAsync, ReplaceItemStreamAsync , ou DeleteItemStreamAsync e trabalhar com ResponseMessage (em vez de ItemResponse ) aumenta a taxa de transferência que pode ser obtida.

Próximas etapas