Azure Service Fabric 中的 ReliableConcurrentQueue 簡介

可靠的並行佇列是非同步、交易式和複寫的佇列,特徵是加入佇列與清除佇列作業的高並行存取。 它旨在提供高輸送量和低延遲,方法是將可靠的佇列所提供的嚴格 FIFO 順序放寬,並改為提供最佳的順序。

API

並行佇列 可靠的並行佇列
void Enqueue(T item) Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(out T result) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

可靠的佇列進行比較

可靠的並行佇列會以可靠的佇列替代形式提供。 它應用於不需要嚴格 FIFO 順序的情況,因為保證 FIFO 使用並行存取時需要有所取捨。 可靠的佇列會使用鎖定來強制使用 FIFO 順序,並且一次最多允許一個交易加入佇列,以及一次最多允許一個交易清除佇列。 相較之下,可靠的並行佇列會放寬排序條件約束,並允許任何數目的並行交易可交錯其加入佇列及清除佇列作業。 會提供最佳順序,不過,一律無法保證可靠並行佇列中兩個值的相對順序。

每當多個並行交易執行加入佇列或清除佇列時,可靠的並行佇列會提供比可靠的佇列更高的輸送量和更低的延遲。

ReliableConcurrentQueue 的範例使用情況為訊息佇列案例。 在此案例中,一個或多個訊息生產者會建立項目並加以新增至佇列,而一個或多個訊息取用者則會從佇列中提取訊息並加以處理。 多個產生者和取用者可以獨立運作,使用並行交易以處理佇列。

使用方針

  • 佇列會預期佇列中的項目具有低保留期限。 也就是項目在佇列中不會長時間停留。
  • 佇列並不保證嚴格的 FIFO 順序。
  • 佇列不會閱讀它自己的寫入。 如果項目在交易內加入佇列,則同一個交易內的清除佇列者將看不到它。
  • 清除佇列不會彼此互相隔離。 如果已在交易 txnA 中將項目 A 清除佇列,則即使 txnA 不認可,並行交易 txnB 也看不到項目 A。 如果 txnA 中止,txnB 就可立即看到 A
  • 可將 TryPeekAsync 行為加以實作,方法是使用 TryDequeueAsync,然後中止交易。 程式設計模式一節中可以找到此一行為的範例。
  • 計數為非交易式。 它可用來了解佇列中元素的數目,但會以時間點表示且無法依賴。
  • 交易使用中時,不應在清除佇列的項目上執行昂貴的處理,以避免長時間執行交易可能會對系統造成的效能影響。

程式碼片段

讓我們看看幾個程式碼片段,和其預期的輸出。 本節中會忽略例外狀況處理。

具現化

建立可靠並行佇列的執行個體就類似於任何其他可靠的集合。

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

以下是使用 EnqueueAsync 的幾個程式碼片段,隨後是其預期的輸出。

  • 案例 1︰單一加入佇列工作
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

假設已順利完成工作,且沒有修改佇列的並行交易。 使用者可以預期佇列會以下列任何一個順序來包含項目︰

10, 20

20, 10

  • 案例 2︰平行加入佇列工作
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

假設已順利完成工作、工作以平行方式執行,且沒有其他修改佇列的並行交易。 無法推斷佇列中的項目順序。 此程式碼片段中,項目會以下列 4 個之一出現! 可能的排序。 佇列會嘗試以原始順序保留項目 (佇列),但可能會因並行作業或錯誤而強制將它們重新排序。

DequeueAsync

以下是使用 TryDequeueAsync 的幾個程式碼片段,隨後是預期的輸出。 假設已將佇列中的下列項目填入佇列︰

10, 20, 30, 40, 50, 60

  • 案例 1︰單一清除佇列工作
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

假設已順利完成工作,且沒有修改佇列的並行交易。 由於無法推斷佇列中的項目順序,任何三個項目都可能會以任何順序加以清除佇列。 佇列會嘗試以原始順序保留項目 (佇列),但可能會因並行作業或錯誤而強制將它們重新排序。

  • 案例 2︰平行清除佇列工作
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

假設已順利完成工作、工作以平行方式執行,且沒有其他修改佇列的並行交易。 由於無法推斷佇列中的項目順序,dequeue1 和 dequeue2 都會以任何順序包含任何兩個項目。

相同項目不會同時出現在兩份清單中。 因此,如果 dequeue1 包含 1030,dequeue2 就會包含 2040

  • 案例 3︰使用交易中止來清除佇列排序

使用執行中的清除佇列將交易中止,會將項目放回佇列的前端。 無法保證將項目放回佇列前端的順序。 我們來看看下面的程式碼:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

假設已依下列順序將項目清除佇列︰

10, 20

當我們將交易中止時,會依下列任何一個順序將項目新增回佇列的前端︰

10, 20

20, 10

這也適用於交易未成功認可的所有情況。

程式設計模式

在本節中,我們來看看幾個可能有助於使用 ReliableConcurrentQueue 的程式設計模式。

批次清除佇列

建議的程式設計模式是使取用者工作以批次方式清除佇列,而不是一次執行一個清除佇列。 使用者可以選擇節流處理每個批次或批次大小之間的延遲。 下列程式碼片段會示範此程式設計模型。 請注意,在此範例中,交易認可後即完成處理,因此如果在處理時發生錯誤,未處理的項目就會遺失而未經處理。 或者,可以在交易的範圍內完成處理,不過,這可能會對效能造成負面影響,而且需要處理已經處理的項目。

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

以通知作為基礎的最佳處理

另一個有趣的程式設計模式是使用計數 API。 在這裡,我們可以針對佇列實作以通知作為基礎的最佳處理。 佇列計數可以用來將加入佇列或清除佇列工作進行節流處理。 請注意,如同先前的範例,因為處理發生於交易外部,如果在處理期間發生錯誤,未處理的項目可能就會遺失。

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

盡可能清空

由於資料結構的並行本質,無法保證可將佇列清空。 即使在佇列上沒有進行中的使用者作業,特定的 TryDequeueAsync 呼叫可能不會傳回先前已加入佇列並且受到認可的項目。 清除佇列最終保證可看到加入佇列的項目,不過,沒有超出訊號範圍通訊機制的獨立取用者,無法得知佇列已觸達穩定狀態,即使已將所有的產生者停止,且不允許任何新的加入佇列作業亦然。 因此,已盡可能清空作業,實作如下。

使用者應該將所有進一步的生產者和取用者工作停止,並在嘗試清空佇列之前,等待任何進行中的交易加以認可或中止。 如果使用者知道佇列中預期的項目數目,就可以設定通知,發出所有項目皆已清除佇列的訊號。

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

預覽

ReliableConcurrentQueue 不會提供 TryPeekAsync API。 使用者可以預覽語意,方法是使用 TryDequeueAsync 然後將交易中止。 在此範例中,僅在項目大於 10 時,才會處理清除佇列。

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

必須閱讀