Utiliser la bibliothèque d’exécuteur en bloc .NET pour effectuer des opérations en bloc dans Azure Cosmos DBUse the bulk executor .NET library to perform bulk operations in Azure Cosmos DB

Ce tutoriel fournit des instructions sur l’utilisation de la bibliothèque de l’exécuteur en bloc .NET pour importer et mettre à jour des documents vers un conteneur Azure Cosmos.This tutorial provides instructions on using the bulk executor .NET library to import and update documents to an Azure Cosmos container. Pour en savoir plus sur la bibliothèque de l’exécuteur en bloc et sur la façon dont elle vous aide à profiter d’un débit et d’un stockage conséquents, consultez l’article Vue d’ensemble de la bibliothèque de l’exécuteur en bloc.To learn about the bulk executor library and how it helps you leverage massive throughput and storage, see the bulk executor library overview article. Dans ce tutoriel, vous voyez un exemple d’application .NET qui importe en bloc des documents, générés de manière aléatoire, dans un conteneur Azure Cosmos.In this tutorial, you will see a sample .NET application that bulk imports randomly generated documents into an Azure Cosmos container. Après l’importation, il illustre comment mettre à jour en bloc les données importées en spécifiant des correctifs comme opérations à effectuer sur des champs de documents spécifiques.After importing, it shows you how you can bulk update the imported data by specifying patches as operations to perform on specific document fields.

Actuellement, la bibliothèque de l’exécuteur en bloc est prise en charge uniquement par les comptes d’API Gremlin et d’API SQL Azure Cosmos DB.Currently, bulk executor library is supported by the Azure Cosmos DB SQL API and Gremlin API accounts only. Cet article décrit comment utiliser la bibliothèque .NET de l’exécuteur en bloc avec des comptes d’API SQL.This article describes how to use the bulk executor .NET library with SQL API accounts. Pour en savoir plus sur l’utilisation de la bibliothèque .NET de l’exécuteur en bloc avec les comptes d’API Gremlin, consultez Effectuer des opérations en bloc dans l’API Gremlin Azure Cosmos DB.To learn about using the bulk executor .NET library with Gremlin API accounts, see perform bulk operations in the Azure Cosmos DB Gremlin API.

PrérequisPrerequisites

Clonage de l’exemple d’applicationClone the sample application

Nous allons maintenant passer à l’utilisation de code en téléchargeant un exemple d’application .NET à partir de GitHub.Now let's switch to working with code by downloading a sample .NET application from GitHub. Cette application effectue des opérations en bloc sur les données stockées dans votre compte Azure Cosmos.This application performs bulk operations on the data stored in your Azure Cosmos account. Pour cloner l’application, ouvrez une invite de commandes, accédez au répertoire où vous souhaitez la copier, puis exécutez la commande suivante :To clone the application, open a command prompt, navigate to the directory where you want to copy it and run the following command:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Le dépôt cloné contient deux exemples : « BulkImportSample » et « BulkUpdateSample ».The cloned repository contains two samples "BulkImportSample" and "BulkUpdateSample". Vous pouvez ouvrir l’un ou l’autre exemple d’application, mettre à jour les chaînes de connexion dans le fichier App.config avec les chaînes de connexion de votre compte Azure Cosmos DB, générer la solution et l’exécuter.You can open either of the sample applications, update the connection strings in App.config file with your Azure Cosmos DB account’s connection strings, build the solution, and run it.

L’application « BulkImportSample » génère des documents aléatoires et les importe en bloc dans votre compte Azure Cosmos.The "BulkImportSample" application generates random documents and bulk imports them to your Azure Cosmos account. L’application « BulkUpdateSample » met à jour en bloc les documents importés en spécifiant des correctifs comme opérations à effectuer sur des champs de documents spécifiques.The "BulkUpdateSample" application bulk updates the imported documents by specifying patches as operations to perform on specific document fields. Dans les sections suivantes, nous allons examiner le code de chacun de ces exemples d’applications.In the next sections, you will review the code in each of these sample apps.

Importer des données en bloc dans un compte Azure CosmosBulk import data to an Azure Cosmos account

  1. Accédez au dossier « BulkImportSample » et ouvrez le fichier « BulkImportSample.sln ».Navigate to the "BulkImportSample" folder and open the "BulkImportSample.sln" file.

  2. Les chaînes de connexion d’Azure Cosmos DB sont récupérées à partir du fichier App.config, comme indiqué dans le code suivant :The Azure Cosmos DB’s connection strings are retrieved from the App.config file as shown in the following code:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    L’importateur en bloc crée une base de données et un conteneur avec le nom de la base de données, le nom du conteneur et les valeurs de débit spécifiées dans le fichier App.config.The bulk importer creates a new database and a container with the database name, container name, and the throughput values specified in the App.config file.

  3. Ensuite, l’objet DocumentClient est initialisé avec le mode de connexion TCP direct :Next the DocumentClient object is initialized with Direct TCP connection mode:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. L’objet BulkExecutor est initialisé avec une valeur de nouvelle tentative élevée pour la durée d’attente et les requêtes limitées.The BulkExecutor object is initialized with a high retry value for wait time and throttled requests. Ensuite, elles sont définies sur 0 pour passer le contrôle de congestion à BulkExecutor pendant sa durée de vie.And then they are set to 0 to pass congestion control to BulkExecutor for its lifetime.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. L’application appelle l’API BulkImportAsync.The application invokes the BulkImportAsync API. La bibliothèque .NET fournit deux surcharges de l’API d’importation en bloc : une qui accepte une liste de documents JSON sérialisés et une autre qui accepte une liste de documents POCO désérialisés.The .NET library provides two overloads of the bulk import API - one that accepts a list of serialized JSON documents and the other that accepts a list of deserialized POCO documents. Pour en savoir plus sur les définitions de chacune de ces méthodes surchargées, consultez la documentation de l’API.To learn more about the definitions of each of these overloaded methods, refer to the API documentation.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    La méthode BulkImportAsync accepte les paramètres suivants :BulkImportAsync method accepts the following parameters:

    ParamètreParameter DescriptionDescription
    enableUpsertenableUpsert Indicateur permettant d’activer les opérations upsert sur les documents.A flag to enable upsert operations on the documents. Si un document avec l’ID donné existe déjà, il est mis à jour.If a document with the given ID already exists, it's updated. Par défaut, cet indicateur a la valeur false.By default, it is set to false.
    disableAutomaticIdGenerationdisableAutomaticIdGeneration Indicateur permettant de désactiver la génération automatique d’ID.A flag to disable automatic generation of ID. Par défaut, il est défini sur true.By default, it is set to true.
    maxConcurrencyPerPartitionKeyRangemaxConcurrencyPerPartitionKeyRange Degré maximal de concurrence par plage de clés de partition. Si vous lui affectez la valeur null, la bibliothèque utilise la valeur par défaut 20.The maximum degree of concurrency per partition key range, setting to null will cause library to use a default value of 20.
    maxInMemorySortingBatchSizemaxInMemorySortingBatchSize Nombre maximal de documents qui sont extraits à partir de l’énumérateur de documents, qui est passé à l’appel d’API à chaque étape.The maximum number of documents that are pulled from the document enumerator, which is passed to the API call in each stage. Pour la phase de tri en mémoire qui se produit avant l’importation en bloc, si vous affectez à ce paramètre la valeur null, la bibliothèque utilise la valeur minimale par défaut (documents.count, 1000000).For in-memory sorting phase that happens before bulk importing, setting this parameter to null will cause library to use default minimum value (documents.count, 1000000).
    cancellationTokencancellationToken Jeton d’annulation permettant de quitter normalement l’opération d’importation en bloc.The cancellation token to gracefully exit the bulk import operation.

    Définition de l’objet de réponse d’importation en bloc Le résultat de l’appel d’API d’importation en bloc contient les attributs suivants :Bulk import response object definition The result of the bulk import API call contains the following attributes:

    ParamètreParameter DescriptionDescription
    NumberOfDocumentsImported (long)NumberOfDocumentsImported (long) Nombre total de documents qui ont été importés avec succès, sur tous les documents fournis à l’appel d’API d’importation en bloc.The total number of documents that were successfully imported out of the total documents supplied to the bulk import API call.
    TotalRequestUnitsConsumed (double)TotalRequestUnitsConsumed (double) Nombre total d’unités de requête (RU) consommées par l’appel d’API d’importation en bloc.The total request units (RU) consumed by the bulk import API call.
    TotalTimeTaken (TimeSpan)TotalTimeTaken (TimeSpan) Temps total nécessaire à l’exécution de l’appel d’API d’importation en bloc.The total time taken by the bulk import API call to complete the execution.
    BadInputDocuments (List<object>)BadInputDocuments (List<object>) Liste de documents au format incorrect qui n’ont pas été importés dans l’appel d’API d’importation en bloc.The list of bad-format documents that were not successfully imported in the bulk import API call. Corrigez les documents retournés et retentez l’importation.Fix the documents returned and retry import. Les documents au format incorrect incluent les documents dont la valeur d’ID n’est pas une chaîne (null ou tout autre type de données est considéré comme non valide).Bad-formatted documents include documents whose ID value is not a string (null or any other datatype is considered invalid).

Mettre à jour des données en bloc dans votre compte Azure CosmosBulk update data in your Azure Cosmos account

Vous pouvez mettre à jour des documents existants à l’aide de l’API BulkUpdateAsync.You can update existing documents by using the BulkUpdateAsync API. Dans cet exemple, vous allez affecter une nouvelle valeur au champ Name et supprimer le champ Description des documents existants.In this example, you will set the Name field to a new value and remove the Description field from the existing documents. Pour connaître l’ensemble complet d’opérations de mise à jour prises en charge, consultez la documentation de l’API.For the full set of supported update operations, refer to the API documentation.

  1. Accédez au dossier « BulkUpdateSample » et ouvrez le fichier « BulkUpdateSample.sln ».Navigate to the "BulkUpdateSample" folder and open the "BulkUpdateSample.sln" file.

  2. Définissez les éléments de mise à jour ainsi que les opérations de mise à jour de champs correspondantes.Define the update items along with the corresponding field update operations. Dans cet exemple, vous allez utiliser SetUpdateOperation pour mettre à jour le champ Name et UnsetUpdateOperation pour supprimer le champ Description de tous les documents.In this example, you will use SetUpdateOperation to update the Name field and UnsetUpdateOperation to remove the Description field from all the documents. Vous pouvez également effectuer d’autres opérations comme incrémenter un champ de document d’une valeur spécifique, envoyer des valeurs spécifiques dans un champ de tableau ou supprimer une valeur spécifique d’un champ de tableau.You can also perform other operations like increment a document field by a specific value, push specific values into an array field, or remove a specific value from an array field. Pour en savoir plus sur les différentes méthodes fournies par l’API de mise à jour en bloc, consultez la documentation de l’API.To learn about different methods provided by the bulk update API, refer to the API documentation.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. L’application appelle l’API BulkUpdateAsync.The application invokes the BulkUpdateAsync API. Pour en savoir plus sur la définition de la méthode BulkUpdateAsync, consultez la documentation de l’API.To learn about the definition of the BulkUpdateAsync method, refer to the API documentation.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    La méthode BulkUpdateAsync accepte les paramètres suivants :BulkUpdateAsync method accepts the following parameters:

    ParamètreParameter DescriptionDescription
    maxConcurrencyPerPartitionKeyRangemaxConcurrencyPerPartitionKeyRange Degré maximal de concurrence par plage de clés de partition. Si vous affectez à ce paramètre la valeur null, la bibliothèque utilise la valeur par défaut (20).The maximum degree of concurrency per partition key range, setting this parameter to null will make the library to use the default value(20).
    maxInMemorySortingBatchSizemaxInMemorySortingBatchSize Nombre maximal d’éléments de mise à jour tirés (pull) de l’énumérateur d’éléments de mise à jour transmis à l’appel d’API à chaque étape.The maximum number of update items pulled from the update items enumerator passed to the API call in each stage. Pour la phase de tri en mémoire qui se produit avant la mise à jour en bloc, si vous affectez à ce paramètre la valeur null, la bibliothèque utilise la valeur minimale par défaut (updateItems.count, 1000000).For the in-memory sorting phase that happens before bulk updating, setting this parameter to null will cause the library to use the default minimum value(updateItems.count, 1000000).
    cancellationTokencancellationToken Jeton d’annulation permettant de quitter normalement l’opération de mise à jour en bloc.The cancellation token to gracefully exit the bulk update operation.

    Définition de l’objet de réponse de mise à jour en bloc Le résultat de l’appel d’API de mise à jour en bloc contient les attributs suivants :Bulk update response object definition The result of the bulk update API call contains the following attributes:

    ParamètreParameter DescriptionDescription
    NumberOfDocumentsUpdated (long)NumberOfDocumentsUpdated (long) Nombre de documents qui ont été mis à jour avec succès, sur tous les documents fournis à l’appel d’API de mise à jour en bloc.The number of documents that were successfully updated out of the total documents supplied to the bulk update API call.
    TotalRequestUnitsConsumed (double)TotalRequestUnitsConsumed (double) Nombre total d’unités de requête (RU) consommées par l’appel d’API de mise à jour en bloc.The total request units (RUs) consumed by the bulk update API call.
    TotalTimeTaken (TimeSpan)TotalTimeTaken (TimeSpan) Temps total nécessaire à l’exécution de l’appel d’API de mise à jour en bloc.The total time taken by the bulk update API call to complete the execution.

Conseils sur les performancesPerformance tips

Pour bénéficier de meilleures performances lors de l’utilisation de la bibliothèque de l’exécuteur en bloc, considérez les points suivants :Consider the following points for better performance when using the bulk executor library:

  • Pour des performances optimales, exécutez votre application à partir d’une machine virtuelle Azure qui se trouve dans la région d’écriture de votre compte Azure Cosmos.For best performance, run your application from an Azure virtual machine that is in the same region as your Azure Cosmos account's write region.

  • Nous vous recommandons d’instancier un objet BulkExecutor unique pour l’ensemble de l’application au sein d’une seule machine virtuelle qui correspond à un conteneur Azure Cosmos spécifique.It is recommended that you instantiate a single BulkExecutor object for the whole application within a single virtual machine that corresponds to a specific Azure Cosmos container.

  • L’exécution d’une API d’opération en bloc consomme une grande partie des E/S réseau et du processeur de l’ordinateur client (en raison de la génération de plusieurs tâches en interne).Since a single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO (This happens by spawning multiple tasks internally). Évitez de générer au sein du processus de votre application plusieurs tâches simultanées qui exécutent des appels d’API d’opération en bloc.Avoid spawning multiple concurrent tasks within your application process that execute bulk operation API calls. Si un appel d’API d’opération en bloc en cours d’exécution sur une seule machine virtuelle ne peut pas consommer le débit complet du conteneur (si le débit de votre conteneur est supérieur à 1 million RU/s), il est préférable de créer des machines virtuelles distinctes pour exécuter simultanément les appels d’API d’opérations en bloc.If a single bulk operation API call that is running on a single virtual machine is unable to consume the entire container's throughput (if your container's throughput > 1 million RU/s), it's preferred to create separate virtual machines to concurrently execute the bulk operation API calls.

  • Vérifiez que la méthode InitializeAsync() est appelée après l’instanciation d’un objet BulkExecutor pour extraire le mappage de partition du conteneur Cosmos cible.Ensure the InitializeAsync() method is invoked after instantiating a BulkExecutor object to fetch the target Cosmos container's partition map.

  • Pour obtenir de meilleures performances, vérifiez que gcServer est activé dans le fichier App.Config de votre application.In your application's App.Config, ensure gcServer is enabled for better performance

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • La bibliothèque émet des traces qui peuvent être collectées dans un fichier journal ou sur la console.The library emits traces that can be collected either into a log file or on the console. Pour activer ces deux méthodes de collecte, ajoutez le code suivant au fichier App.Config de votre application.To enable both, add the following code to your application's App.Config file.

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Étapes suivantesNext steps