Zarządzanie zasadami rozwiązywania konfliktów w usłudze Azure Cosmos DB

DOTYCZY: NoSQL

W przypadku zapisu w wielu regionach, gdy wielu klientów zapisuje w tym samym elemencie, mogą wystąpić konflikty. W przypadku wystąpienia konfliktu można rozwiązać konflikt przy użyciu różnych zasad rozwiązywania konfliktów. W tym artykule opisano sposób zarządzania zasadami rozwiązywania konfliktów.

Napiwek

Zasady rozwiązywania konfliktów można określić tylko w czasie tworzenia kontenera i nie można ich modyfikować po utworzeniu kontenera.

Tworzenie zasad rozwiązywania konfliktów polegających na traktowaniu ostatniego zapisu jako prawidłowego

Poniższe przykłady pokazują, jak skonfigurować kontener za pomocą zasad rozwiązywania konfliktów polegających na traktowaniu ostatniego zapisu jako prawidłowego. Domyślną ścieżką last-writer-wins jest pole znacznika czasu lub _ts właściwość. W przypadku interfejsu API dla noSQL można również ustawić ścieżkę zdefiniowaną przez użytkownika z typem liczbowym. W konflikcie największa wartość wygrywa. Jeśli ścieżka nie jest ustawiona lub jest nieprawidłowa, wartość domyślna to _ts. Konflikty rozwiązane za pomocą tych zasad nie są wyświetlane w kanale informacyjnym konfliktów. Te zasady mogą być używane przez wszystkie interfejsy API.

Zestaw SDK platformy .NET

DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.lwwCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.LastWriterWins,
          ConflictResolutionPath = "/myCustomId",
      },
  });

Zestaw SDK języka Java w wersji 4

Zestaw JAVA SDK w wersji 4 (Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Zestawy SDK języka Java w wersji 2

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
  {
    id: this.lwwContainerName,
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/myCustomId"
    }
  }
);

Zestaw SDK dla języka Python

database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=lww_conflict_resolution_policy)

Tworzenie niestandardowych zasad rozwiązywania konfliktów przy użyciu procedury składowanej

Poniższe przykłady pokazują, jak skonfigurować kontener za pomocą niestandardowych zasad rozwiązywania konfliktów. Te zasady używają logiki w procedurze składowanej w celu rozwiązania konfliktu. Jeśli procedura składowana jest wyznaczona do rozwiązywania konfliktów, konflikty nie będą wyświetlane w kanale informacyjnym konfliktów, chyba że wystąpi błąd w wyznaczonej procedurze składowanej.

Po utworzeniu zasad za pomocą kontenera należy utworzyć procedurę składowaną. Poniższy przykład zestawu SDK platformy .NET przedstawia przykład tego przepływu pracy. Te zasady są obsługiwane tylko w interfejsie API dla noSQL.

Przykładowa niestandardowa procedura składowana rozwiązywania konfliktów

Niestandardowe procedury składowane rozwiązywania konfliktów muszą być implementowane przy użyciu podpisu funkcji pokazanego poniżej. Nazwa funkcji nie musi być zgodna z nazwą używaną podczas rejestrowania procedury składowanej w kontenerze, ale upraszcza nazewnictwo. Poniżej przedstawiono opis parametrów, które należy zaimplementować dla tej procedury składowanej.

  • incomingItem: element jest wstawiany lub aktualizowany w zatwierdzeniu, które generuje konflikty. Ma wartość null dla operacji usuwania.
  • existingItem: aktualnie zatwierdzony element. Ta wartość jest inna niż null w aktualizacji i ma wartość null dla wstawiania lub usuwania.
  • isTombstone: wartość logiczna wskazująca, czy element przychodzący jest w konflikcie z wcześniej usuniętym elementem. Jeśli wartość true, element existingItem ma również wartość null.
  • conflictingItems: tablica zatwierdzonej wersji wszystkich elementów w kontenerze, które powodują konflikt z elementem przychodzącym w identyfikatorze lub innych unikatowych właściwościach indeksu.

Ważne

Podobnie jak w przypadku każdej procedury składowanej, niestandardowa procedura rozwiązywania konfliktów może uzyskać dostęp do dowolnych danych z tym samym kluczem partycji i może wykonać dowolną operację wstawiania, aktualizowania lub usuwania w celu rozwiązania konfliktów.

Ta przykładowa procedura składowana rozwiązuje konflikty, wybierając najniższą wartość ze ścieżki /myCustomId .

function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
  var collection = getContext().getCollection();

  if (!incomingItem) {
      if (existingItem) {

          collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
              if (err) throw err;
          });
      }
  } else if (isTombstone) {
      // delete always wins.
  } else {
      if (existingItem) {
          if (incomingItem.myCustomId > existingItem.myCustomId) {
              return; // existing item wins
          }
      }

      var i;
      for (i = 0; i < conflictingItems.length; i++) {
          if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
              return; // existing conflict item wins
          }
      }

      // incoming item wins - clear conflicts and replace existing with incoming.
      tryDelete(conflictingItems, incomingItem, existingItem);
  }

  function tryDelete(documents, incoming, existing) {
      if (documents.length > 0) {
          collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
              if (err) throw err;

              documents.shift();
              tryDelete(documents, incoming, existing);
          });
      } else if (existing) {
          collection.replaceDocument(existing._self, incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      } else {
          collection.createDocument(collection.getSelfLink(), incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      }
  }
}

Zestaw SDK platformy .NET

DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.udpCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
          ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
      },
  });

//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
    Id = "resolver",
    Body = File.ReadAllText(@"resolver.js")
});

Zestaw SDK języka Java w wersji 4

Zestaw JAVA SDK w wersji 4 (Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Zestawy SDK języka Java w wersji 2

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Po utworzeniu kontenera należy utworzyć procedurę składowaną resolver.

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
  {
    id: this.udpContainerName,
    conflictResolutionPolicy: {
      mode: "Custom",
      conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
        this.udpContainerName
      }/sprocs/resolver`
    }
  }
);

Po utworzeniu kontenera należy utworzyć procedurę składowaną resolver.

Zestaw SDK dla języka Python

database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
    conflict_resolution_policy=udp_custom_resolution_policy)

Po utworzeniu kontenera należy utworzyć procedurę składowaną resolver.

Tworzenie niestandardowych zasad rozwiązywania konfliktów

Poniższe przykłady pokazują, jak skonfigurować kontener za pomocą niestandardowych zasad rozwiązywania konfliktów. W przypadku tej implementacji każdy konflikt będzie wyświetlany w kanale informacyjnym konfliktów. Niezależnie od tego, czy konflikty są obsługiwane indywidualnie z kanału informacyjnego konfliktów.

Zestaw SDK platformy .NET

DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.manualCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
      },
  });

Zestaw SDK języka Java w wersji 4

Zestaw JAVA SDK w wersji 4 (Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Zestawy SDK języka Java w wersji 2

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const {
  container: manualContainer
} = await database.containers.createIfNotExists({
  id: this.manualContainerName,
  conflictResolutionPolicy: {
    mode: "Custom"
  }
});

Zestaw SDK dla języka Python

database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=manual_resolution_policy)

Odczytywanie z kanału informacyjnego konfliktów

Te przykłady pokazują, jak odczytywać z kanału informacyjnego konfliktów kontenera. Konflikty mogą pojawiać się w kanale informacyjnym konfliktów tylko z kilku powodów:

  • Konflikt nie został rozwiązany automatycznie
  • Konflikt spowodował błąd z wyznaczoną procedurą składowaną
  • Zasady rozwiązywania konfliktów są ustawione na niestandardowe i nie wyznaczają procedury składowanej do obsługi konfliktów

Zestaw SDK platformy .NET

FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);

Zestawy SDK języka Java

Zestaw SDK języka Java w wersji 4 (Maven com.azure::azure-cosmos)

int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();

CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);

conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {

    int expectedNumberOfConflicts = 0;
    int numberOfResults = 0;
    Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();

    while (pageIt.hasNext()) {
        CosmosConflictProperties conflictProperties = pageIt.next();

        // Read the conflict and committed item
        CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
        CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();

        // response.
    }
});

Node.js/JavaScript/TypeScript SDK

const container = client
  .database(this.databaseName)
  .container(this.lwwContainerName);

const { result: conflicts } = await container.conflicts.readAll().toArray();

Python

conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
    # Do something with conflict
    conflict = next(conflicts_iterator, None)

Następne kroki

Poznaj następujące pojęcia dotyczące usługi Azure Cosmos DB: