Modelo de solicitação de feed de alterações no Azure Cosmos DB

APLICA-SE A: NoSQL

Pode utilizar o modelo de solicitação do feed de alterações para consumir o feed de alterações do Azure Cosmos DB ao seu próprio ritmo. Semelhante ao processador do feed de alterações, pode utilizar o modelo de solicitação do feed de alterações para paralelizar o processamento de alterações em vários consumidores de feeds de alterações.

Comparar com o processador do feed de alterações

Muitos cenários podem processar o feed de alterações com o processador do feed de alterações ou o modelo de solicitação do feed de alterações. Os tokens de continuação do modelo pull e o contentor de concessão do processador do feed de alterações funcionam como marcadores para o último item processado ou lote de itens no feed de alterações.

No entanto, não pode converter tokens de continuação numa concessão ou vice-versa.

Nota

Na maioria dos casos, quando precisa de ler a partir do feed de alterações, a opção mais simples é utilizar o processador do feed de alterações.

Deve considerar a utilização do modelo pull nestes cenários:

  • Para ler as alterações de uma chave de partição específica.
  • Para controlar o ritmo a que o cliente recebe alterações para processamento.
  • Para efetuar uma leitura única dos dados existentes no feed de alterações (por exemplo, para fazer uma migração de dados).

Seguem-se algumas diferenças fundamentais entre o processador do feed de alterações e o modelo de solicitação do feed de alterações:

Funcionalidade Processador do feed de alterações Modelo Pull do feed de alterações
Manter um registo do ponto atual no processamento do feed de alterações Concessão (armazenada num contentor do Azure Cosmos DB) Token de continuação (armazenado na memória ou manualmente persistente)
Capacidade de reproduzir alterações anteriores Sim, com o modelo push Sim, com o modelo pull
Consultar alterações futuras Verifica automaticamente a existência de alterações com base no valor especificado pelo WithPollInterval utilizador Manual
Comportamento em que não existem novas alterações Aguarde automaticamente o valor e WithPollInterval , em seguida, volte a verificar Tem de verificar o estado e voltar a verificar manualmente
Processar alterações de um contentor inteiro Sim, e automaticamente paralelizado em vários threads e computadores que consomem a partir do mesmo contentor Sim e paralelizado manualmente com FeedRange
Processar alterações apenas a partir de uma única chave de partição Não suportado Yes

Nota

Quando utiliza o modelo pull, ao contrário da leitura com o processador do feed de alterações, tem de processar explicitamente casos em que não existem novas alterações.

Trabalhar com o modelo de solicitação

Para processar o feed de alterações com o modelo pull, crie uma instância de FeedIterator. Quando criar FeedIteratorinicialmente , tem de especificar um valor necessário ChangeFeedStartFrom , que consiste na posição inicial para ler as alterações e o valor que pretende utilizar para FeedRange. O FeedRange é um intervalo de valores de chave de partição e especifica os itens que podem ser lidos a partir do feed de alterações com esse específico FeedIterator. Também tem de especificar um valor necessário ChangeFeedMode para o modo no qual pretende processar alterações: versão mais recente ou todas as versões e eliminações. Utilize ou ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes para indicar o modo que pretende utilizar para ler o feed de alterações. Quando utiliza todas as versões e o modo de eliminação, tem de selecionar um feed de alterações a partir do valor de um ou Now() de um token de continuação específico.

Opcionalmente, pode especificar ChangeFeedRequestOptions para definir um PageSizeHint. Quando definida, esta propriedade define o número máximo de itens recebidos por página. Se as operações na coleção monitorizada forem realizadas através de procedimentos armazenados, o âmbito de transação é preservado ao ler itens do feed de alterações. Como resultado, o número de itens recebidos pode ser superior ao valor especificado para que os itens alterados pela mesma transação sejam devolvidos como parte de um lote atómico.

Eis um exemplo de como obter FeedIterator no modo de versão mais recente que devolve objetos de entidade, neste caso um User objeto:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Dica

Antes da versão 3.34.0, o modo de versão mais recente pode ser utilizado ao definir ChangeFeedMode.Incremental. Tanto o LatestVersion modo de versão mais recente do feed de alterações como as aplicações que utilizam ambos os Incremental modos verão o mesmo comportamento.

Todas as versões e o modo de eliminação estão em pré-visualização e podem ser utilizados com versões >do SDK .NET de pré-visualização = 3.32.0-preview. Eis um exemplo para obter FeedIterator em todas as versões e modo de eliminação que devolve objetos dinâmicos:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Nota

No modo de versão mais recente, recebe objetos que representam o item que foi alterado, com alguns metadados adicionais. Todas as versões e o modo de eliminação devolvem um modelo de dados diferente. Para obter mais informações, veja Analisar o objeto de resposta.

Consumir o feed de alterações através de fluxos

FeedIterator para ambos os modos de feed de alterações tem duas opções. Além dos exemplos que devolvem objetos de entidade, também pode obter a resposta com Stream suporte. Os fluxos permitem-lhe ler dados sem terem sido serializados pela primeira vez, pelo que poupa nos recursos do cliente.

Eis um exemplo de como obter FeedIterator no modo de versão mais recente que devolve Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consumir as alterações de um contentor inteiro

Se não fornecer um FeedRange parâmetro para FeedIterator, pode processar todo o feed de alterações de um contentor ao seu próprio ritmo. Eis um exemplo, que começa a ler todas as alterações, a partir da hora atual através do modo de versão mais recente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Uma vez que o feed de alterações é efetivamente uma lista infinita de itens que abrangem todas as escritas e atualizações futuras, o valor de HasMoreResults é sempre true. Quando tenta ler o feed de alterações e não existem novas alterações disponíveis, recebe uma resposta com NotModified o estado. No exemplo anterior, é processada ao aguardar cinco segundos antes de voltar a verificar se existem alterações.

Consumir as alterações de uma chave de partição

Em alguns casos, poderá querer processar apenas as alterações de uma chave de partição específica. Pode obter FeedIterator para uma chave de partição específica e processar as alterações da mesma forma que pode para um contentor inteiro.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Utilizar FeedRange para paralelização

No processador do feed de alterações, o trabalho é distribuído automaticamente por vários consumidores. No modelo de solicitação do feed de alterações, pode utilizar o FeedRange paralelizar o processamento do feed de alterações. A FeedRange representa um intervalo de valores de chave de partição.

Eis um exemplo que mostra como obter uma lista de intervalos para o contentor:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Quando obtém uma lista de FeedRange valores para o contentor, obtém um FeedRange por partição física.

Ao utilizar um FeedRange, pode criar um FeedIterator paralelizar o processamento do feed de alterações em vários computadores ou threads. Ao contrário do exemplo anterior que mostrou como obter um FeedIterator para todo o contentor ou uma única chave de partição, pode utilizar FeedRanges para obter vários FeedIterators, que podem processar o feed de alterações em paralelo.

No caso de querer utilizar FeedRanges, tem de ter um processo de orquestrador que obtenha FeedRanges e os distribua por essas máquinas. Esta distribuição pode ser:

  • Utilizar FeedRange.ToJsonString e distribuir este valor de cadeia. Os consumidores podem utilizar este valor com FeedRange.FromJsonString.
  • Se a distribuição estiver em processo, passe a referência do FeedRange objeto.

Eis um exemplo que mostra como ler a partir do início do feed de alterações do contentor ao utilizar duas máquinas hipotéticas separadas que leem em paralelo:

Computador 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computador 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Guardar tokens de continuação

Pode guardar a posição do seu FeedIterator ao obter o token de continuação. Um token de continuação é um valor de cadeia que controla as últimas alterações processadas do FeedIterator e permite que o FeedIterator seja retomado mais tarde. O token de continuação, se especificado, tem precedência sobre a hora de início e começa a partir dos valores iniciais. O código seguinte lê o feed de alterações desde a criação do contentor. Depois de não existirem mais alterações disponíveis, irá manter um token de continuação para que o consumo do feed de alterações possa ser retomado mais tarde.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Quando estiver a utilizar o modo de versão mais recente, o FeedIterator token de continuação nunca expira, desde que o contentor do Azure Cosmos DB ainda exista. Quando estiver a utilizar todas as versões e o modo de eliminação, o FeedIterator token de continuação é válido, desde que as alterações ocorreram na janela de retenção para cópias de segurança contínuas.

Passos seguintes