Używanie biblioteki funkcji wykonawczej operacji zbiorczych platformy .NET do wykonywania operacji zbiorczych w usłudze Azure Cosmos DB

DOTYCZY: NoSQL

Uwaga

Biblioteka funkcji wykonawczej operacji zbiorczych opisana w tym artykule jest obsługiwana dla aplikacji korzystających z zestawu .NET SDK 2.x. W przypadku nowych aplikacji można użyć obsługi zbiorczej dostępnej bezpośrednio w zestawie SDK platformy .NET w wersji 3.x i nie wymaga żadnej biblioteki zewnętrznej.

Jeśli obecnie używasz biblioteki funkcji wykonawczej operacji zbiorczych i planujesz przeprowadzić migrację zbiorczą do nowszego zestawu SDK, wykonaj kroki opisane w przewodniku migracji, aby przeprowadzić migrację aplikacji.

Ten samouczek zawiera instrukcje dotyczące używania biblioteki funkcji wykonawczej operacji zbiorczych platformy .NET do importowania i aktualizowania dokumentów do kontenera usługi Azure Cosmos DB. Aby dowiedzieć się więcej o bibliotece funkcji wykonawczej operacji zbiorczych i o tym, jak ułatwia korzystanie z ogromnej przepływności i magazynu, zobacz Omówienie biblioteki funkcji wykonawczej operacji zbiorczych usługi Azure Cosmos DB. W tym samouczku zostanie wyświetlona przykładowa aplikacja .NET, w której importowane zbiorczo dokumenty są importowane losowo do kontenera usługi Azure Cosmos DB. Po zaimportowaniu danych biblioteka pokazuje, jak można zbiorczo zaktualizować zaimportowane dane, określając poprawki jako operacje wykonywane na określonych polach dokumentu.

Obecnie biblioteka funkcji wykonawczej operacji zbiorczych jest obsługiwana tylko przez usługę Azure Cosmos DB for NoSQL i interfejs API dla kont języka Gremlin. W tym artykule opisano sposób używania biblioteki funkcji wykonawczej operacji zbiorczych platformy .NET z interfejsem API dla kont NoSQL. Aby dowiedzieć się, jak używać biblioteki funkcji wykonawczej operacji zbiorczych platformy .NET z interfejsem API dla kont języka Gremlin, zobacz Zbiorcze pozyskiwanie danych w usłudze Azure Cosmos DB for Gremlin przy użyciu biblioteki funkcji wykonawczej zbiorczej.

Wymagania wstępne

Klonowanie przykładowej aplikacji

Teraz przejdźmy do pracy z kodem, pobierając przykładową aplikację .NET z usługi GitHub. Ta aplikacja wykonuje operacje zbiorcze na danych przechowywanych na koncie usługi Azure Cosmos DB. Aby sklonować aplikację, otwórz wiersz polecenia, przejdź do katalogu, w którym chcesz go skopiować, i uruchom następujące polecenie:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Sklonowane repozytorium zawiera dwa przykłady: BulkImportSample i BulkUpdateSample. Możesz otworzyć jedną z przykładowych aplikacji, zaktualizować parametry połączenia w pliku App.config przy użyciu parametry połączenia konta usługi Azure Cosmos DB, skompilować rozwiązanie i uruchomić je.

Aplikacja BulkImportSample generuje losowe dokumenty i importuje je zbiorczo na konto usługi Azure Cosmos DB. Aplikacja BulkUpdateSample zbiorczo aktualizuje zaimportowane dokumenty, określając poprawki jako operacje wykonywane w określonych polach dokumentu. W następnych sekcjach zapoznasz się z kodem w każdej z tych przykładowych aplikacji.

Zbiorcze importowanie danych do konta usługi Azure Cosmos DB

  1. Przejdź do folderu BulkImportSample i otwórz plik BulkImportSample.sln .

  2. Parametry połączenia usługi Azure Cosmos DB są pobierane z pliku App.config, jak pokazano w poniższym kodzie:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Importer zbiorczy tworzy nową bazę danych i kontener z nazwą bazy danych, nazwą kontenera i wartościami przepływności określonymi w pliku App.config.

  3. Następnie obiekt DocumentClient jest inicjowany przy użyciu trybu bezpośredniego połączenia TCP:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Obiekt BulkExecutor jest inicjowany z wysoką wartością ponawiania dla czasu oczekiwania i ograniczonych żądań. Następnie są one ustawione na 0, aby przekazać kontrolę przeciążenia do BulkExecutor przez jego okres istnienia.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Aplikacja wywołuje interfejs API BulkImportAsync . Biblioteka .NET udostępnia dwa przeciążenia interfejsu API importu zbiorczego — jeden, który akceptuje listę serializowanych dokumentów JSON, a drugi, który akceptuje listę deserializacji dokumentów POCO. Aby dowiedzieć się więcej na temat definicji każdej z tych przeciążonych metod, zapoznaj się z dokumentacją interfejsu API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metoda BulkImportAsync akceptuje następujące parametry:

    Parametr Opis
    enableUpsert Flaga umożliwiająca włączanie operacji upsert w dokumentach. Jeśli dokument o danym identyfikatorze już istnieje, zostanie zaktualizowany. Domyślnie jest ustawiona wartość false.
    disableAutomaticIdGeneration Flaga wyłączania automatycznego generowania identyfikatora. Domyślnie jest ustawiona wartość true.
    maxConcurrencyPerPartitionKeyRange Maksymalny stopień współbieżności na zakres kluczy partycji. Ustawienie wartości null powoduje, że biblioteka będzie używać wartości domyślnej 20.
    maxInMemorySortingBatchSize Maksymalna liczba dokumentów ściągniętych z modułu wyliczającego dokumentu, która jest przekazywana do wywołania interfejsu API na każdym etapie. W przypadku fazy sortowania w pamięci, która odbywa się przed importowaniem zbiorczym. Ustawienie tego parametru na wartość null powoduje, że biblioteka używa domyślnej wartości minimalnej (documents.count, 1000000).
    Cancellationtoken Token anulowania, aby bezpiecznie zakończyć operację importowania zbiorczego.

Definicja obiektu odpowiedzi importu zbiorczego
Wynik wywołania interfejsu API importu zbiorczego zawiera następujące atrybuty:

Parametr Opis
NumberOfDocuments Zaimportowane (długie) Całkowita liczba dokumentów, które zostały pomyślnie zaimportowane z łącznej liczby dokumentów dostarczonych do wywołania interfejsu API importu zbiorczego.
TotalRequestUnitsConsumed (dwukrotnie) Łączna liczba jednostek żądań (RU) używanych przez wywołanie interfejsu API importu zbiorczego.
TotalTimeTaken (TimeSpan) Łączny czas potrzebny na wywołanie interfejsu API importu zbiorczego w celu ukończenia wykonywania.
BadInputDocuments (obiekt> List<) Lista dokumentów w nieprawidłowym formacie, które nie zostały pomyślnie zaimportowane w wywołaniu interfejsu API importu zbiorczego. Napraw zwrócone dokumenty i ponów próbę zaimportowania. Nieprawidłowe sformatowane dokumenty obejmują dokumenty, których wartość identyfikatora nie jest ciągiem (wartość null lub inny typ danych jest uznawany za nieprawidłowy).

Zbiorcze aktualizowanie danych na koncie usługi Azure Cosmos DB

Istniejące dokumenty można zaktualizować przy użyciu interfejsu API BulkUpdateAsync . W tym przykładzie Name ustawisz pole na nową wartość i usuniesz Description pole z istniejących dokumentów. Pełny zestaw obsługiwanych operacji aktualizacji można znaleźć w dokumentacji interfejsu API.

  1. Przejdź do folderu BulkUpdateSample i otwórz plik BulkUpdateSample.sln .

  2. Zdefiniuj elementy aktualizacji wraz z odpowiednimi operacjami aktualizacji pól. W tym przykładzie użyjesz polecenia SetUpdateOperation, aby zaktualizować Name pole i polecenie UnsetUpdateOperation w celu usunięcia Description pola ze wszystkich dokumentów. Można również wykonywać inne operacje, takie jak zwiększanie pola dokumentu przez określoną wartość, wypychanie określonych wartości do pola tablicy lub usuwanie określonej wartości z pola tablicy. Aby dowiedzieć się więcej o różnych metodach udostępnianych przez interfejs API aktualizacji zbiorczej, zapoznaj się z dokumentacją interfejsu API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Aplikacja wywołuje interfejs API BulkUpdateAsync . Aby dowiedzieć się więcej o definicji metody BulkUpdateAsync, zapoznaj się z dokumentacją interfejsu API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metoda BulkUpdateAsync akceptuje następujące parametry:

    Parametr Opis
    maxConcurrencyPerPartitionKeyRange Maksymalny stopień współbieżności na zakres kluczy partycji. Ustawienie tego parametru na wartość null powoduje, że biblioteka używa wartości domyślnej (20).
    maxInMemorySortingBatchSize Maksymalna liczba elementów aktualizacji ściągniętych z modułu wyliczającego aktualizacji przekazana do wywołania interfejsu API na każdym etapie. W przypadku fazy sortowania w pamięci, która odbywa się przed aktualizacją zbiorczą. Ustawienie tego parametru na wartość null powoduje, że biblioteka używa domyślnej wartości minimalnej (updateItems.count, 1000000).
    Cancellationtoken Token anulowania, aby bezpiecznie zakończyć operację aktualizacji zbiorczej.

Definicja obiektu odpowiedzi aktualizacji zbiorczej
Wynik wywołania interfejsu API aktualizacji zbiorczej zawiera następujące atrybuty:

Parametr Opis
NumberOfDocumentsUpdated (długa) Liczba dokumentów, które zostały pomyślnie zaktualizowane poza łączną liczbę dokumentów dostarczonych do wywołania interfejsu API aktualizacji zbiorczej.
TotalRequestUnitsConsumed (dwukrotnie) Łączna liczba jednostek żądań używanych przez wywołanie interfejsu API aktualizacji zbiorczej.
TotalTimeTaken (TimeSpan) Łączny czas potrzebny przez wywołanie interfejsu API aktualizacji zbiorczej w celu ukończenia wykonywania.

Wskazówki dotyczące wydajności

Podczas korzystania z biblioteki funkcji wykonawczej operacji zbiorczych należy wziąć pod uwagę następujące kwestie:

  • Aby uzyskać najlepszą wydajność, uruchom aplikację z maszyny wirtualnej platformy Azure, która znajduje się w tym samym regionie co region zapisu konta usługi Azure Cosmos DB.

  • Zaleca się utworzenie wystąpienia pojedynczego obiektu BulkExecutor dla całej aplikacji w ramach jednej maszyny wirtualnej odpowiadającej określonemu kontenerowi usługi Azure Cosmos DB.

  • Pojedyncze wykonanie interfejsu API operacji zbiorczej zużywa duży fragment procesora CPU i operacji we/wy sieci komputera klienckiego podczas tworzenia wielu zadań wewnętrznie. Unikaj duplikowania wielu współbieżnych zadań w procesie aplikacji, które wykonują wywołania interfejsu API operacji zbiorczej. Jeśli pojedyncze wywołanie interfejsu API operacji zbiorczej uruchomione na jednej maszynie wirtualnej nie może korzystać z przepływności całego kontenera (jeśli przepływność > kontenera wynosi 1 milion RU/s), preferowane jest utworzenie oddzielnych maszyn wirtualnych w celu współbieżnego wykonywania wywołań interfejsu API operacji zbiorczej.

  • Upewnij się, InitializeAsync() że metoda jest wywoływana po utworzeniu wystąpienia obiektu BulkExecutor w celu pobrania docelowej mapy partycji kontenera usługi Azure Cosmos DB.

  • W aplikacji App.Config upewnij się, że serwer gcServer jest włączony w celu uzyskania lepszej wydajności

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Biblioteka emituje ślady, które można zebrać w pliku dziennika lub w konsoli programu . Aby włączyć oba te elementy, dodaj następujący kod do pliku App.Config aplikacji:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Następne kroki