Introduzione a ReliableConcurrentQueue in Azure Service FabricIntroduction to ReliableConcurrentQueue in Azure Service Fabric

La coda simultanea affidabile è una coda replicata, transazionale e asincrona che assicura concorrenza elevata per le operazioni di accodamento e rimozione dalla coda.Reliable Concurrent Queue is an asynchronous, transactional, and replicated queue which features high concurrency for enqueue and dequeue operations. È progettata per offrire velocità effettiva elevata e bassa latenza allentando il vincolo di ordinamento FIFO fornito dalla coda affidabile e fornisce invece un ordinamento in base al migliore sforzo.It is designed to deliver high throughput and low latency by relaxing the strict FIFO ordering provided by Reliable Queue and instead provides a best-effort ordering.

APIAPIs

Coda simultaneaConcurrent Queue Coda simultanea affidabileReliable Concurrent Queue
void Enqueue(T item)void Enqueue(T item) Task EnqueueAsync(ITransaction tx, T item)Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(out T result)bool TryDequeue(out T result) Task< ConditionalValue < T > > TryDequeueAsync(ITransaction tx)Task< ConditionalValue < T > > TryDequeueAsync(ITransaction tx)
int Count()int Count() long Count()long Count()

Confronto con la coda affidabileComparison with Reliable Queue

La coda simultanea affidabile è disponibile come alternativa alla coda affidabile.Reliable Concurrent Queue is offered as an alternative to Reliable Queue. Deve essere usata nei casi in cui non è necessario l'ordine FIFO in modo vincolante poiché l'ordine FIFO richiede un compromesso in termini di concorrenza.It should be used in cases where strict FIFO ordering is not required, as guaranteeing FIFO requires a tradeoff with concurrency. La coda affidabile usa i blocchi per applicare l'ordinamento FIFO, consentendo al massimo una transazione per l'accodamento e al massimo una transazione per la rimozione dalla coda.Reliable Queue uses locks to enforce FIFO ordering, with at most one transaction allowed to enqueue and at most one transaction allowed to dequeue at a time. La coda simultanea affidabile allenta invece il vincolo di ordinamento e consente a un qualsiasi numero di transazioni simultanee di interfoliare le operazioni di accodamento e rimozione dalla coda.In comparison, Reliable Concurrent Queue relaxes the ordering constraint and allows any number concurrent transactions to interleave their enqueue and dequeue operations. Viene fornito l'ordinamento in base al migliore sforzo; tuttavia non è mai possibile garantire l'ordinamento dei due valori in una coda simultanea affidabile.Best-effort ordering is provided, however the relative ordering of two values in a Reliable Concurrent Queue can never be guaranteed.

La coda simultanea affidabile garantisce maggiore velocità effettiva e latenza ridotta rispetto alla coda affidabile ogni volta che sono presenti più transazioni simultanee che eseguono operazioni di accodamento e/o rimozione dalla coda.Reliable Concurrent Queue provides higher throughput and lower latency than Reliable Queue whenever there are multiple concurrent transactions performing enqueues and/or dequeues.

Un esempio di caso d'uso per ReliableConcurrentQueue è lo scenario della coda di messaggi.A sample use case for the ReliableConcurrentQueue is the Message Queue scenario. In questo scenario uno o più producer di messaggi creano e aggiungono elementi nella coda e uno o più consumer di messaggi prelevano i messaggi dalla coda e li elaborano.In this scenario, one or more message producers create and add items to the queue, and one or more message consumers pull messages from the queue and process them. Più producer e consumer possono operare in modo indipendente usando le transazioni simultanee per elaborare la coda.Multiple producers and consumers can work independently, using concurrent transactions in order to process the queue.

Linee guida per l'usoUsage Guidelines

  • Il periodo di conservazione previsto per gli elementi nella coda è minimo,The queue expects that the items in the queue have a low retention period. ovvero gli elementi non rimangono nella coda per molto tempo.That is, the items would not stay in the queue for a long time.
  • La coda non garantisce l'ordine FIFO in modo vincolante.The queue does not guarantee strict FIFO ordering.
  • La coda non legge le proprie scritture.The queue does not read its own writes. Se un elemento viene accodato all'interno di una transazione, non sarà visibile a un dequeuer all'interno della stessa transazione.If an item is enqueued within a transaction, it will not be visible to a dequeuer within the same transaction.
  • Le rimozioni dalla coda non sono isolate tra loro.Dequeues are not isolated from each other. Se l'elemento A viene rimosso dalla coda nella transazione txnA, anche se non viene eseguito il commit di txnA, l'elemento A non sarà visibile a una transazione simultanea txnB.If item A is dequeued in transaction txnA, even though txnA is not committed, item A would not be visible to a concurrent transaction txnB. Se la transazione txnA viene interrotta, A diventa immediatamente visibile alla transazione txnB.If txnA aborts, A will become visible to txnB immediately.
  • Il comportamento di TryPeekAsync può essere implementato usando un TryDequeueAsync e quindi interrompendo la transazione.TryPeekAsync behavior can be implemented by using a TryDequeueAsync and then aborting the transaction. Un esempio in proposito è reperibile nella sezione relativa ai modelli di programmazione.An example of this can be found in the Programming Patterns section.
  • Il conteggio non è transazionale.Count is non-transactional. Può dare un'idea del numero di elementi in una coda, ma rappresenta un valore temporizzato e non può essere ritenuto affidabile.It can be used to get an idea of the number of elements in the queue, but represents a point-in-time and cannot be relied upon.
  • Non è opportuno eseguire un'elaborazione dispendiosa sugli elementi rimossi dalla coda mentre la transazione è attiva per evitare transazioni ad esecuzione prolungata che possono impattare sulle prestazioni del sistema.Expensive processing on the dequeued items should not be performed while the transaction is active, to avoid long-running transactions which may have a performance impact on the system.

Frammenti di codiceCode Snippets

Ecco alcuni frammenti di codice e i relativi output previsti.Let us look at a few code snippets and their expected outputs. La gestione delle eccezioni viene ignorata in questa sezione.Exception handling is ignored in this section.

EnqueueAsyncEnqueueAsync

Ecco alcuni frammenti di codice per l'uso di EnqueueAsync con i relativi output previsti.Here are a few code snippets for using EnqueueAsync followed by their expected outputs.

  • Caso 1: Singola attività di accodamentoCase 1: Single Enqueue Task
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Si supponga che l'attività sia stata completata e che non siano presenti transazioni simultanee che modificano la coda.Assume that the task completed successfully, and that there were no concurrent transactions modifying the queue. L'utente può presupporre che la coda contenga gli elementi in uno dei seguenti ordini:The user can expect the queue to contain the items in any of the following orders:

10, 2010, 20

20, 1020, 10

  • Caso 2: Attività di accodamento in paralleloCase 2: Parallel Enqueue Task
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Si supponga che le attività siano state completate, che siano state eseguite in parallelo e che non sono presenti altre transazioni concorrenti che modificano la coda.Assume that the tasks completed successfully, that the tasks ran in parallel, and that there were no other concurrent transactions modifying the queue. Non è possibile eseguire alcuna inferenza sull'ordine degli elementi nella coda.No inference can be made about the order of items in the queue. Per questo frammento di codice, gli elementi possono comparire in uno qualsiasi dei 4For this code snippet, the items may appear in any of the 4! ordinamenti possibili.possible orderings. La coda tenterà di mantenere gli elementi nell'ordine (in coda) originale, ma potrebbe essere necessario eseguire un riordino a causa di operazioni simultanee o errori.The queue will attempt to keep the items in the original (enqueued) order, but may be forced to reorder them due to concurrent operations or faults.

DequeueAsyncDequeueAsync

Ecco alcuni frammenti di codice per l'uso di TryDequeueAsync con i relativi output previsti.Here are a few code snippets for using TryDequeueAsync followed by the expected outputs. Si supponga che la coda sia già stata popolata con i seguenti elementi nella coda:Assume that the queue is already populated with the following items in the queue:

10, 20, 30, 40, 50, 6010, 20, 30, 40, 50, 60

  • Caso 1: Singola attività di rimozione dalla codaCase 1: Single Dequeue Task
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Si supponga che l'attività sia stata completata e che non siano presenti transazioni simultanee che modificano la coda.Assume that the task completed successfully, and that there were no concurrent transactions modifying the queue. Poiché non è possibile eseguire alcuna inferenza sull'ordine degli elementi nella coda, tutti e tre gli elementi possono essere rimossi dalla coda in qualsiasi ordine.Since no inference can be made about the order of the items in the queue, any three of the items may be dequeued, in any order. La coda tenterà di mantenere gli elementi nell'ordine (in coda) originale, ma potrebbe essere necessario eseguire un riordino a causa di operazioni simultanee o errori.The queue will attempt to keep the items in the original (enqueued) order, but may be forced to reorder them due to concurrent operations or faults.

  • Caso 2: Attività di rimozione dalla coda in paralleloCase 2: Parallel Dequeue Task
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Si supponga che le attività siano state completate, che siano state eseguite in parallelo e che non sono presenti altre transazioni concorrenti che modificano la coda.Assume that the tasks completed successfully, that the tasks ran in parallel, and that there were no other concurrent transactions modifying the queue. Poiché non può essere eseguita alcuna inferenza sull'ordine degli elementi nella coda, gli elenchi dequeue1 e dequeue2 conterranno i due elementi in qualsiasi ordine.Since no inference can be made about the order of the items in the queue, the lists dequeue1 and dequeue2 will each contain any two items, in any order.

Lo stesso elemento non sarà presente in entrambi gli elenchi.The same item will not appear in both lists. Di conseguenza, se dequeue1 contiene i valori 10, 30, dequeue2 conterrà i valori 20 e 40.Hence, if dequeue1 has 10, 30, then dequeue2 would have 20, 40.

  • Caso 3: ordine di rimozione dalla coda con interruzione della transazioneCase 3: Dequeue Ordering With Transaction Abort

L'interruzione di una transazione con rimozioni delle coda in transito riporta gli elementi all'inizio della coda.Aborting a transaction with in-flight dequeues puts the items back on the head of the queue. L'ordine in cui gli elementi vengono reinseriti all'inizio della coda non è garantito.The order in which the items are put back on the head of the queue is not guaranteed. Esaminare il codice seguente:Let us look at the following code:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Si supponga che gli elementi siano stati rimossi dalla coda nell'ordine seguente:Assume that the items were dequeued in the following order:

10, 2010, 20

Quando si interrompe la transazione, gli elementi vengono inseriti di nuovo all'inizio della coda in uno dei seguenti ordini:When we abort the transaction, the items would be added back to the head of the queue in any of the following orders:

10, 2010, 20

20, 1020, 10

Lo stesso vale per tutti i casi in cui la transazione non è stata eseguita correttamente.The same is true for all cases where the transaction was not successfully Committed.

Modelli di programmazioneProgramming Patterns

In questa sezione verranno esaminati alcuni modelli di programmazione che possono risultare utili usando ReliableConcurrentQueue.In this section, let us look at a few programming patterns that might be helpful in using ReliableConcurrentQueue.

Rimozioni dalla coda in batchBatch Dequeues

Un modello di programmazione consigliato per l'attività di tipo consumer è inserire in batch le operazioni di rimozione dalla coda, anziché eseguire una rimozione alla volta.A recommended programming pattern is for the consumer task to batch its dequeues instead of performing one dequeue at a time. L'utente può scegliere di limitare i ritardi tra ogni batch o la dimensione del batch.The user can choose to throttle delays between every batch or the batch size. Il frammento di codice seguente illustra questo modello di programmazione.The following code snippet shows this programming model. Si noti che in questo esempio l'elaborazione viene eseguita dopo il commit della transazione e pertanto se si verifica un errore durante l'elaborazione, gli elementi non elaborati andranno persi senza essere stati elaborati.Note that in this example, the processing is done after the transaction is committed, so if a fault were to occur while processing, the unprocessed items will be lost without having been processed. In alternativa, l'elaborazione può essere eseguita nell'ambito della transazione, ma ciò potrebbe avere un impatto negativo sulle prestazioni e richiede la gestione degli elementi già elaborati.Alternatively, the processing can be done within the transaction's scope, however this may have a negative impact on performance and requires handling of the items already processed.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

Elaborazione basata su notifica con il migliore sforzoBest-Effort Notification-Based Processing

Un altro modello di programmazione interessante usa l'API di conteggio.Another interesting programming pattern uses the Count API. In questo caso è possibile implementare l'elaborazione basata su notifica con il migliore sforzo per la coda.Here, we can implement best-effort notification-based processing for the queue. Il conteggio della coda consente di limitare un'attività di accodamento o di rimozione dalla coda.The queue Count can be used to throttle an enqueue or a dequeue task. Si noti che, come nell'esempio precedente, poiché l'elaborazione si verifica all'esterno della transazione, gli elementi non elaborati potrebbero essere persi in caso di errore durante l'elaborazione.Note that as in the previous example, since the processing occurs outside the transaction, unprocessed items may be lost if a fault occurs during processing.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Svuotamento in base al migliore sforzoBest-Effort Drain

Lo svuotamento della coda non può essere garantito a causa della natura simultanea della struttura di dati.A drain of the queue cannot be guaranteed due to the concurrent nature of the data structure. È possibile che, anche se non è in corso alcuna operazione dell'utente nella coda, una chiamata specifica a TryDequeueAsync possa non restituire un elemento che in precedenza era stato accodato e di cui era stato eseguito il commit.It is possible that, even if no user operations on the queue are in-flight, a particular call to TryDequeueAsync may not return an item which was previously enqueued and committed. L'elemento accodato alla fine diventerà visibile per la rimozione dalla coda; tuttavia senza un meccanismo di comunicazione fuori banda, un consumer indipendente non può sapere se la coda ha raggiunto uno stato stabile, anche se sono stati arrestati tutti i producer e non sono consentite nuove operazione di accodamento.The enqueued item is guaranteed to eventually become visible to dequeue, however without an out-of-band communication mechanism, an independent consumer cannot know that the queue has reached a steady-state even if all producers have been stopped and no new enqueue operations are allowed. Di conseguenza l'operazione di svuotamento avviene in base al migliore sforzo, come implementato di seguito.Thus, the drain operation is best-effort as implemented below.

L'utente deve arrestare tutte le successive attività di producer e consumer e attendere il commit o l'interruzione delle transazioni in transito prima di tentare di svuotare la coda.The user should stop all further producer and consumer tasks, and wait for any in-flight transactions to commit or abort, before attempting to drain the queue. Se conosce il numero previsto di elementi nella coda, l'utente può impostare una notifica che segnala che tutti gli elementi sono stati rimossi dalla coda.If the user knows the expected number of items in the queue, they can set up a notification which signals that all items have been dequeued.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

VisualizzazionePeek

ReliableConcurrentQueue non fornisce l'API TryPeekAsync.ReliableConcurrentQueue does not provide the TryPeekAsync api. Gli utenti possono ottenere la visualizzazione semantica usando TryDequeueAsync e quindi interrompendo la transazione.Users can get the peek semantic by using a TryDequeueAsync and then aborting the transaction. In questo esempio le rimozioni dalla coda vengono elaborate solo se il valore dell'elemento è maggiore di 10.In this example, dequeues are processed only if the item's value is greater than 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Da leggereMust Read