Použití knihovny Bulk Executor .NET k provádění hromadných operací ve službě Azure Cosmos DB

PLATÍ PRO: NoSQL

Poznámka:

Knihovna Bulk Executor popsaná v tomto článku se udržuje pro aplikace, které používají verzi sady .NET SDK 2.x. Pro nové aplikace můžete použít hromadnou podporu , která je přímo dostupná se sadou .NET SDK verze 3.x a nevyžaduje žádnou externí knihovnu.

Pokud aktuálně používáte knihovnu Bulk Executor a plánujete migrovat na hromadnou podporu v novější sadě SDK, postupujte podle kroků v průvodci migrací k migraci aplikace.

Tento kurz obsahuje pokyny k použití knihovny Bulk Executor .NET k importu a aktualizaci dokumentů do kontejneru Azure Cosmos DB. Informace o knihovně Bulk Executor a o tom, jak vám pomůže používat obrovskou propustnost a úložiště, najdete v přehledu knihovny bulk Executor služby Azure Cosmos DB. V tomto kurzu se zobrazí ukázková aplikace .NET, ve které se hromadně importují náhodně generované dokumenty do kontejneru Azure Cosmos DB. Po importu dat vám knihovna ukáže, jak můžete importovaná data hromadně aktualizovat zadáním oprav jako operací, které se mají provést u konkrétních polí dokumentu.

Knihovna bulk executoru je v současné době podporována pouze účty Azure Cosmos DB for NoSQL a API pro účty Gremlin. Tento článek popisuje, jak používat knihovnu Bulk Executor .NET s účty API pro NoSQL. Pokud chcete zjistit, jak používat knihovnu Bulk Executor .NET s rozhraním API pro účty Gremlin, přečtěte si, jak hromadně přijímat data ve službě Azure Cosmos DB pro Gremlin pomocí knihovny Bulk Executor.

Předpoklady

Klonování ukázkové aplikace

Teď přejdeme na práci s kódem stažením ukázkové aplikace .NET z GitHubu. Tato aplikace provádí hromadné operace s daty uloženými v účtu služby Azure Cosmos DB. Pokud chcete aplikaci naklonovat, otevřete příkazový řádek, přejděte do adresáře, do kterého ji chcete zkopírovat, a spusťte následující příkaz:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Klonované úložiště obsahuje dvě ukázky: BulkImportSample a BulkUpdateSample. Můžete otevřít některou z ukázkových aplikací, aktualizovat připojovací řetězec s v souboru App.config pomocí připojovací řetězec účtu služby Azure Cosmos DB, sestavit řešení a spustit ho.

Aplikace BulkImportSample generuje náhodné dokumenty a hromadně je naimportuje do účtu služby Azure Cosmos DB. Aplikace BulkUpdateSample hromadně aktualizuje importované dokumenty zadáním oprav jako operací, které se mají provést u konkrétních polí dokumentu. V dalších částech si projdete kód v každé z těchto ukázkových aplikací.

Hromadný import dat do účtu služby Azure Cosmos DB

  1. Přejděte do složky BulkImportSample a otevřete soubor BulkImportSample.sln .

  2. Připojovací řetězec služby Azure Cosmos DB se načtou ze souboru App.config, jak je znázorněno v následujícím kódu:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Hromadný import vytvoří novou databázi a kontejner s názvem databáze, názvem kontejneru a hodnotami propustnosti zadanými v souboru App.config.

  3. Dále se objekt DocumentClient inicializuje pomocí režimu přímého připojení TCP:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Objekt BulkExecutor je inicializován s vysokou hodnotou opakování pro dobu čekání a omezené požadavky. Pak jsou nastaveny na hodnotu 0, aby předaly řízení zahlcení bulkExecutoru po dobu jeho životnosti.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Aplikace vyvolá rozhraní BulkImportAsync API. Knihovna .NET poskytuje dvě přetížení rozhraní API hromadného importu – jedno, které přijímá seznam serializovaných dokumentů JSON a druhý, který přijímá seznam deserializovaných dokumentů POCO. Další informace o definicích každé z těchto přetížených metod najdete v dokumentaci k rozhraní API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metoda BulkImportAsync přijímá následující parametry:

    Parametr Popis
    enableUpsert Příznak pro povolení operací upsertu v dokumentech. Pokud dokument s daným ID už existuje, aktualizuje se. Ve výchozím nastavení je nastavená na false.
    disableAutomaticIdGeneration Příznak, který zakáže automatické generování ID. Ve výchozím nastavení je nastavená na hodnotu true.
    maxConcurrencyPerPartitionKeyRange Maximální stupeň souběžnosti na rozsah klíčů oddílu. Nastavení na hodnotu null způsobí, že knihovna použije výchozí hodnotu 20.
    maxInMemorySortingBatchSize Maximální počet dokumentů načítaných z enumerátoru dokumentu, který se předává volání rozhraní API v každé fázi. Pro fázi řazení v paměti, ke které dochází před hromadným importem. Nastavení tohoto parametru na hodnotu null způsobí, že knihovna použije výchozí minimální hodnotu (documents.count, 1000000).
    Cancellationtoken Token zrušení pro řádné ukončení operace hromadného importu.

Definice objektu odpovědi hromadného importu
Výsledek volání rozhraní API hromadného importu obsahuje následující atributy:

Parametr Popis
NumberOfDocumentsImported (long) Celkový počet dokumentů, které byly úspěšně importovány z celkového počtu dokumentů zadaných do volání rozhraní API hromadného importu.
TotalRequestUnitsConsumed (double) Celkový počet jednotek žádostí spotřebovaných voláním rozhraní API hromadného importu
TotalTimeTaken (TimeSpan) Celkový čas potřebný voláním rozhraní API hromadného importu k dokončení provádění.
BadInputDocuments (List<– objekt>) Seznam dokumentů ve špatném formátu, které nebyly úspěšně importovány ve volání rozhraní API hromadného importu. Opravte vrácené dokumenty a zkuste import zopakovat. Dokumenty s chybným formátováním zahrnují dokumenty, jejichž hodnota ID není řetězec (hodnota null nebo jiný datový typ je považován za neplatný).

Hromadná aktualizace dat v účtu služby Azure Cosmos DB

Existující dokumenty můžete aktualizovat pomocí rozhraní BulkUpdateAsync API. V tomto příkladu Name nastavíte pole na novou hodnotu a odeberete Description pole z existujících dokumentů. Úplnou sadu podporovaných operací aktualizací najdete v dokumentaci k rozhraní API.

  1. Přejděte do složky BulkUpdateSample a otevřete soubor BulkUpdateSample.sln .

  2. Definujte položky aktualizace spolu s odpovídajícími operacemi aktualizace polí. V tomto příkladu pomocí SetUpdateOperation aktualizujete Name pole a UnsetUpdateOperation odeberete Description pole ze všech dokumentů. Můžete také provádět jiné operace, jako je zvýšení pole dokumentu o určitou hodnotu, nasdílení konkrétních hodnot do pole pole nebo odebrání konkrétní hodnoty z pole pole. Informace o různých metodách poskytovaných rozhraním API hromadné aktualizace najdete v dokumentaci k rozhraní API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Aplikace vyvolá rozhraní BulkUpdateAsync API. Informace o definici metody BulkUpdateAsync najdete v dokumentaci k rozhraní API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metoda BulkUpdateAsync přijímá následující parametry:

    Parametr Popis
    maxConcurrencyPerPartitionKeyRange Maximální stupeň souběžnosti na rozsah klíčů oddílu. Nastavením tohoto parametru na hodnotu null knihovna použije výchozí hodnotu(20).
    maxInMemorySortingBatchSize Maximální počet aktualizačních položek načítaných z enumerátoru aktualizačních položek předaných volání rozhraní API v každé fázi. Pro fázi řazení v paměti, ke které dochází před hromadnou aktualizací. Nastavení tohoto parametru na hodnotu null způsobí, že knihovna použije výchozí minimální hodnotu (updateItems.count, 10000000).
    Cancellationtoken Token zrušení pro řádné ukončení operace hromadné aktualizace.

Definice objektu hromadné aktualizace odpovědi
Výsledek volání rozhraní API hromadné aktualizace obsahuje následující atributy:

Parametr Popis
NumberOfDocumentsUpdated (dlouhý) Početdokumentůch
TotalRequestUnitsConsumed (double) Celkový počet jednotek žádostí spotřebovaných voláním rozhraní API hromadné aktualizace
TotalTimeTaken (TimeSpan) Celková doba potřebná voláním rozhraní API hromadné aktualizace k dokončení provádění.

Tipy týkající se výkonu

Pokud používáte knihovnu Bulk Executor, zvažte následující body pro lepší výkon:

  • Pokud chcete dosáhnout nejlepšího výkonu, spusťte aplikaci z virtuálního počítače Azure, který je ve stejné oblasti jako oblast zápisu účtu služby Azure Cosmos DB.

  • Doporučujeme vytvořit instanci jednoho objektu BulkExecutor pro celou aplikaci v rámci jednoho virtuálního počítače, který odpovídá určitému kontejneru Azure Cosmos DB.

  • Při interním vytváření více úloh využívá jedno rozhraní API pro hromadnou operaci velké množství procesoru a vstupně-výstupních operací klientského počítače. Vyhněte se vytváření více souběžných úloh v rámci procesu aplikace, které provádějí volání rozhraní API hromadné operace. Pokud jedno volání rozhraní API hromadné operace spuštěné na jednom virtuálním počítači nemůže spotřebovat propustnost celého kontejneru (pokud propustnost > kontejneru 1 milion RU/s), je vhodnější vytvořit samostatné virtuální počítače pro souběžné spouštění volání rozhraní API hromadné operace.

  • Zajistěte, InitializeAsync() aby byla metoda vyvolána po vytvoření instance objektu BulkExecutor pro načtení mapování oddílů cílového kontejneru Azure Cosmos DB.

  • V souboru App.Config vaší aplikace se ujistěte, že je pro lepší výkon povolený gcServer .

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Knihovna generuje trasování, která se dají shromažďovat buď do souboru protokolu, nebo do konzoly. Pokud chcete obojí povolit, přidejte do souboru App.Config vaší aplikace následující kód:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Další kroky