Azure Cosmos DB의 충돌 해결 정책 관리

적용 대상: NoSQL

다중 영역 쓰기를 사용하면 여러 클라이언트가 동일한 항목에 쓸 때 충돌이 발생할 수 있습니다. 충돌이 발생할 때 여러 충돌 해결 정책을 사용하여 충돌을 해결할 수 있습니다. 이 문서에서는 충돌 해결 정책을 관리하는 방법을 설명합니다.

충돌 해결 정책은 컨테이너를 만들 때에만 지정할 수 있으며 컨테이너를 만든 후에는 수정할 수 없습니다.

최종 작성자 인정(last-writer-wins) 충돌 해결 정책 만들기

다음 샘플에서는 최종 작성자 인정 충돌 해결 정책을 사용하여 컨테이너를 설정하는 방법을 보여 줍니다. 최종 작성자 인정(last-writer-wins)의 기본 경로는 타임스탬프 필드 또는 _ts 속성입니다. API for NoSQL의 경우 숫자 형식의 사용자 정의 경로로 설정할 수도 있습니다. 충돌이 발생할 경우 가장 높은 값이 적용됩니다. 경로가 설정되지 않았거나 잘못된 경우 기본값인 _ts로 설정됩니다. 이 정책을 통해 해결된 충돌은 충돌 피드에 표시되지 않습니다. 이 정책은 모든 API에서 사용할 수 있습니다.

.NET SDK

DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.lwwCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.LastWriterWins,
          ConflictResolutionPath = "/myCustomId",
      },
  });

Java V4 SDK

Java SDK V4(Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK(Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
  {
    id: this.lwwContainerName,
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/myCustomId"
    }
  }
);

Python SDK

database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=lww_conflict_resolution_policy)

저장 프로시저를 사용하여 사용자 지정 충돌 해결 정책 만들기

다음 샘플은 사용자 지정 충돌 해결 정책을 사용하여 컨테이너를 설정하는 방법을 보여줍니다. 이 정책은 저장 프로시저의 논리를 사용하여 충돌을 해결합니다. 저장 프로시저가 충돌을 해결하도록 지정된 경우 지정된 저장 프로시저에 오류가 없으면 충돌 피드에 충돌이 표시되지 않습니다.

컨테이너를 사용하여 정책을 만든 후에는 저장 프로시저를 만들어야 합니다. 아래의 .NET SDK 샘플은 이 워크플로의 예를 보여 줍니다. 이 정책은 API for NoSQL에서만 지원됩니다.

사용자 지정 충돌 해결 저장 프로시저 샘플

사용자 지정 충돌 해결 저장 프로시저는 아래에 표시된 함수 시그니처를 사용하여 구현해야 합니다. 함수 이름은 저장 프로시저를 컨테이너에 등록할 때 사용한 이름과 일치하지 않아도 되지만, 이 방법이 간단합니다. 다음은 이 저장 프로시저에 대해 구현해야 하는 매개 변수에 대한 설명입니다.

  • incomingItem: 커밋에서 삽입 또는 업데이트되는 항목이며, 이 항목 때문에 충돌이 발생합니다. 삭제 작업에 대해 Null입니다.
  • existingItem: 현재 커밋된 항목입니다. 이 값은 업데이트에서는 null이 아니고, 삽입 또는 삭제에서는 null입니다.
  • isTombstone: incomingItem이 이전에 삭제된 항목과 충돌하는지 여부를 나타내는 부울입니다. true이면 existingItem도 Null입니다.
  • conflictingItems: AID 또는 기타 고유 인덱스 속성에서 incomingItem과 충돌하는 컨테이너 내 모든 항목의 커밋된 버전의 배열입니다.

Important

저장 프로시저와 마찬가지로, 사용자 지정 충돌 해결 프로시저는 파티션 키가 동일한 모든 데이터에 액세스하여 충돌 해결을 위한 삽입, 업데이트 또는 삭제 작업을 수행할 수 있습니다.

이 샘플 저장 프로시저는 /myCustomId 경로에서 가장 낮은 값을 선택하여 충돌을 해결합니다.

function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
  var collection = getContext().getCollection();

  if (!incomingItem) {
      if (existingItem) {

          collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
              if (err) throw err;
          });
      }
  } else if (isTombstone) {
      // delete always wins.
  } else {
      if (existingItem) {
          if (incomingItem.myCustomId > existingItem.myCustomId) {
              return; // existing item wins
          }
      }

      var i;
      for (i = 0; i < conflictingItems.length; i++) {
          if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
              return; // existing conflict item wins
          }
      }

      // incoming item wins - clear conflicts and replace existing with incoming.
      tryDelete(conflictingItems, incomingItem, existingItem);
  }

  function tryDelete(documents, incoming, existing) {
      if (documents.length > 0) {
          collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
              if (err) throw err;

              documents.shift();
              tryDelete(documents, incoming, existing);
          });
      } else if (existing) {
          collection.replaceDocument(existing._self, incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      } else {
          collection.createDocument(collection.getSelfLink(), incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      }
  }
}

.NET SDK

DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.udpCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
          ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
      },
  });

//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
    Id = "resolver",
    Body = File.ReadAllText(@"resolver.js")
});

Java V4 SDK

Java SDK V4(Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK(Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

컨테이너가 만들어지면 resolver 저장 프로시저를 만들어야 합니다.

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
  {
    id: this.udpContainerName,
    conflictResolutionPolicy: {
      mode: "Custom",
      conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
        this.udpContainerName
      }/sprocs/resolver`
    }
  }
);

컨테이너가 만들어지면 resolver 저장 프로시저를 만들어야 합니다.

Python SDK

database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
    conflict_resolution_policy=udp_custom_resolution_policy)

컨테이너가 만들어지면 resolver 저장 프로시저를 만들어야 합니다.

사용자 지정 충돌 해결 정책 만들기

다음 샘플은 사용자 지정 충돌 해결 정책을 사용하여 컨테이너를 설정하는 방법을 보여줍니다. 이 구현으로 각 충돌이 충돌 피드에 표시됩니다. 충돌 피드에서 충돌을 개별적으로 처리하는 것은 사용자에게 달려 있습니다.

.NET SDK

DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.manualCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
      },
  });

Java V4 SDK

Java SDK V4(Maven com.azure::azure-cosmos) Async API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK(Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const {
  container: manualContainer
} = await database.containers.createIfNotExists({
  id: this.manualContainerName,
  conflictResolutionPolicy: {
    mode: "Custom"
  }
});

Python SDK

database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=manual_resolution_policy)

충돌 피드에서 읽기

다음 샘플은 컨테이너의 충돌 피드에서 읽는 방법을 보여줍니다. 충돌은 몇 가지 이유로만 충돌 피드에 표시될 수 있습니다.

  • 충돌이 자동으로 해결되지 않았습니다.
  • 충돌로 인해 지정된 저장 프로시저에 오류가 발생했습니다.
  • 충돌 해결 정책이 사용자 지정으로 설정되어 있으며 충돌을 처리할 저장 프로시저를 지정하지 않습니다.

.NET SDK

FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);

Java SDK

Java V4 SDK(Maven com.azure::azure-cosmos)

int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();

CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);

conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {

    int expectedNumberOfConflicts = 0;
    int numberOfResults = 0;
    Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();

    while (pageIt.hasNext()) {
        CosmosConflictProperties conflictProperties = pageIt.next();

        // Read the conflict and committed item
        CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
        CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();

        // response.
    }
});

Node.js/JavaScript/TypeScript SDK

const container = client
  .database(this.databaseName)
  .container(this.lwwContainerName);

const { result: conflicts } = await container.conflicts.readAll().toArray();

Python

conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
    # Do something with conflict
    conflict = next(conflicts_iterator, None)

다음 단계

다음 Azure Cosmos DB 개념에 대해 자세히 알아봅니다.