Use a biblioteca .NET do executor em massa para executar operações em massa no Microsoft Azure Cosmos DB

APLICA-SE A: NoSQL

Observação

A biblioteca do executor em massa descrita neste artigo é mantida para aplicativos que usam a versão 2.x do SDK do .NET. Para aplicativos novos, você pode usar o suporte em massa que está diretamente disponível com a versão 3.x do SDK do .NET e não requer nenhuma biblioteca externa.

Se estiver usando atualmente a biblioteca do executor em massa e planejando migrar para o suporte em massa do SDK mais recente, use as etapas do Guia de migração para migrar seu aplicativo.

Este tutorial fornece instruções sobre como usar a biblioteca do executor em massa do .NET para importar e atualizar documentos no contêiner do Azure Cosmos DB. Para saber mais sobre a biblioteca de executor em massa e como ela ajuda você a usar o armazenamento e a taxa de transferência em massa, confira o artigo Visão geral da biblioteca de executor em massa do Azure Cosmos DB. Neste tutorial, você verá uma amostra do aplicativo do .NET que importa documentos em massa gerados aleatoriamente para um contêiner do Azure Cosmos DB. Após a importação dos dados, a biblioteca irá lhe mostrar como você pode atualizar os dados importados em massa especificando os patches como operações a serem executadas em campos de documento específicos.

Atualmente, a biblioteca do executor em massa tem suporte apenas das contas do Azure Cosmos DB for NoSQL e da API para Gremlin. Este artigo descreve como usar a biblioteca .NET do executor em massa com contas da API para NoSQL. Para saber como usar a biblioteca do executor em massa do .NET com contas de API do Gremlin, confira o artigo Ingerir dados em massa no Azure Cosmos DB for Gremlin usando uma biblioteca de executor em massa.

Pré-requisitos

Clonar o aplicativo de exemplo

Agora, vamos mudar para o trabalho com código baixando um aplicativo de exemplo .NET do GitHub. Esse aplicativo executa operações em massa nos dados armazenados em sua conta do Azure Cosmos DB. Para clonar o aplicativo, abra um prompt de comando, vá para o diretório no qual você deseja copiá-lo e execute o seguinte comando:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

O repositório clonado contém duas amostras: BulkImportSample e BulkUpdateSample. Você pode abrir uma das duas amostras de aplicativo, atualizar as cadeias de conexão no arquivo App.config com as cadeias de conexão da sua conta do Azure Cosmos DB, compilar a solução e executá-la.

O aplicativo BulkImportSample gera documentos aleatórios e os importa em massa para a sua conta do Azure Cosmos DB. O aplicativo BulkUpdateSample atualiza os documentos importados em massa especificando os patches como operações a serem executadas em campos de documento específicos. Nas próximas seções, você examinará o código em cada um desses aplicativos de exemplo.

Importar dados em massa para uma conta do Azure Cosmos DB

  1. Vá para a pasta BulkImportSample e abra o arquivo BulkImportSample.sln.

  2. As cadeias de conexão do Microsoft Azure Cosmos DB são recuperadas do arquivo App.config, conforme mostrado no código a seguir:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    O importador em massa cria um novo banco de dados e um contêiner com o nome do banco de dados, nome do contêiner e os valores de taxa de transferência especificados no arquivo App.config.

  3. A seguir, o objeto DocumentClient é inicializado com o modo de conexão Direct TCP:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. O objeto BulkExecutor é inicializado com um valor de repetição alto para solicitações limitadas e tempo de espera. Em seguida o valor é definido como 0 para que seja aprovado pelo controle de congestionamento do BulkExecutor por toda a sua vida útil.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. O aplicativo invoca a API BulkImportAsync. A biblioteca do .NET fornece duas sobrecargas da API de importação em massa — uma que aceita uma lista de documentos JSON serializados e outra que aceita uma lista de documentos POCO desserializados. Para saber mais sobre as definições de cada um desses métodos de sobrecarga, consulte a documentação da API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    O método BulkImportAsync aceita os seguintes parâmetros:

    Parâmetro Descrição
    enableUpsert Um sinalizador para habilitar operações upsert nos documentos. Se um documento com a ID determinada já existir, ele será atualizado. Por padrão, é definido para false.
    disableAutomaticIdGeneration Um sinalizador para desabilitar a geração automática de ID. Por padrão, é definido para true.
    maxConcurrencyPerPartitionKeyRange O grau máximo de simultaneidade por intervalo de chave de partição. Defini-lo como nulo faz com que a biblioteca use um valor padrão de 20.
    maxInMemorySortingBatchSize O número máximo de documentos que são retirados do enumerador de documento, que é passado para a chamada à API em cada estágio. Para a fase de classificação na memória que ocorre antes da importação em massa. Definir esse parâmetro como nulo faz com que a biblioteca use o valor mínimo padrão (documents.count, 1000000).
    cancellationToken O token de cancelamento para sair normalmente da operação de importação em massa.

Definição de objeto de resposta de importação em massa
O resultado da chamada à API de importação em massa contém os seguintes atributos:

Parâmetro Descrição
NumberOfDocumentsImported (long) O número total de documentos que foram importados com êxito sem os documentos fornecidos para a chamada à API de importação em massa.
TotalRequestUnitsConsumed (double) O total de unidades de solicitação (RU) consumidas pela chamada de API de importação em massa.
TotalTimeTaken (TimeSpan) O tempo total gasto pela chamada à API de importação em massa para concluir a execução.
BadInputDocuments (List<object>) A lista de documentos com formato inválido que não foram importados com êxito na chamada de API de importação em massa. Corrija os documentos retornados e repita a importação. Os documentos com formato inválido incluem documentos cujo valor de ID não é uma cadeia de caracteres (null ou qualquer outro tipo de dados é considerado inválido).

Atualizar dados em massa na conta do Azure Cosmos DB

Você pode atualizar os documentos existentes usando a API BulkUpdateAsync. Nesse exemplo, você define o campo Name como um valor novo e remove o campo Description dos documentos existentes. Para o conjunto completo de operações suportadas de atualização, consulte a documentação da API.

  1. Vá para a pasta BulkUpdateSample e abra o arquivo BulkUpdateSample.sln.

  2. Defina os itens de atualização juntamente com as operações de atualização de campo correspondentes. Nesse exemplo, você usa SetUpdateOperation para atualizar o campo Name e UnsetUpdateOperation para remover o campo Description de todos os documentos. Você também pode executar outras operações, como incrementar um campo do documento por um valor específico, efetuar push de valores específicos para um campo de matriz ou remover um valor específico de um campo de matriz. Para saber mais sobre os diferentes métodos fornecidos pela API de atualização em massa, consulte a documentação da API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. O aplicativo invoca a API BulkUpdateAsync. Para saber mais sobre a definição do método BulkUpdateAsync, consulte a documentação da API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    O método BulkUpdateAsync aceita os seguintes parâmetros:

    Parâmetro Descrição
    maxConcurrencyPerPartitionKeyRange O grau máximo de simultaneidade por intervalo de chave de partição. Definir esse parâmetro como nulo faz com que a biblioteca use o valor padrão (20).
    maxInMemorySortingBatchSize O número máximo de itens de atualização extraídos do enumerador de itens de atualização que é passado para a chamada à API em cada estágio. Para a fase de classificação na memória que ocorre antes de atualização em massa. Definir esse parâmetro como nulo faz com que a biblioteca use o valor mínimo padrão (updateItems.count, 1000000).
    cancellationToken O token de cancelamento para sair normalmente da operação de atualização em massa.

Definição do objeto de resposta de atualização em massa
O resultado da chamada à API de atualização em massa contém os seguintes atributos:

Parâmetro Descrição
NumberOfDocumentsUpdated (long) O número total de documentos que foram atualizados com êxito sem os documentos fornecidos para a chamada à API de atualização em massa.
TotalRequestUnitsConsumed (double) O total de unidades de solicitação (RU) consumidas pela chamada à API de atualização em massa.
TotalTimeTaken (TimeSpan) O tempo total gasto pela chamada à API de atualização em massa para concluir a execução.

Dicas de desempenho

Leve em conta os seguintes pontos para obter um melhor desempenho ao usar a biblioteca do executor em massa:

  • Para obter o melhor desempenho, execute seu aplicativo a partir de uma máquina virtual do Azure que esteja na mesma região da região de gravação da sua conta do Azure Cosmos DB.

  • É recomendado criar uma instância de um único objeto BulkExecutor para o aplicativo inteiro dentro de uma única máquina virtual que corresponda a um contêiner específico do Azure Cosmos DB.

  • Uma única execução de API de operação em massa consome uma grande parte da CPU e de E/S da rede do computador cliente ao gerar várias tarefas internamente. Evite gerar várias tarefas simultâneas em seu processo de aplicativo que executam chamadas à API de operação em massa. Se uma única chamada à API de operação em massa em execução em uma única máquina virtual for incapaz de consumir a taxa de transferência inteira do contêiner (se a taxa de transferência do contêiner for > 1 milhão de RU/s), é preferível criar máquinas virtuais separadas para executar as chamadas à API da operação em massa simultaneamente.

  • Verifique se o método InitializeAsync() é invocado depois de instanciar um objeto BulkExecutor para buscar o mapa de partições de contêiner do Azure Cosmos DB de destino.

  • No App.Config do aplicativo, certifique-se de gcServer esteja habilitado para melhor desempenho

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • A biblioteca emite rastreamentos que podem ser coletados em um arquivo de log ou no console. Para habilitar ambas as opções, adicione o código a seguir ao arquivo App.Config do aplicativo:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Próximas etapas