2016 年六月

第 31 卷,第 6 期

本文章是由機器翻譯。

Reactive Framework - 使用 Reactive 縮放非同步主從連結

Peter Vogel |2016 年 6 月

由於非同步處理已變得更常見的應用程式開發,Microsoft.NET Framework 已取得各種工具,支援特定的非同步設計模式。通常建立設計完善的非同步應用程式是辨識您的應用程式是實作,然後挑選正確的.NET 元件集的設計模式。

在某些情況下,比對需要整合數個.NET 元件。Stephen Cleary 的文章中,「 非同步 MVVM 應用程式的模式 ︰ 命令 > (bit.ly/233Kocr),示範如何以非同步方式的完整支援 Model View ViewModel (MVVM) 模式。在其他情況下支援需要從.NET Framework 的一個元件。我所討論實作提供者/取用者模式中,使用在 BlockingCollection 我 VisualStudioMagazine.com 實際的.NET 資料行,「 建立簡單、 可靠與 BlockingCollection 非同步應用程式 」 (bit.ly/1TuOpE6),以及 「 建立複雜的非同步應用程式與 BlockingCollection 」 (bit.ly/1SpYyD4)。

另一個範例實作觀察者設計模式,以非同步方式監視長期執行作業。在此案例中,傳回單一的工作物件的非同步方法不會運作,因為用戶端通常會傳回結果資料流。這些情況下,您可以利用兩個以上的工具,從.NET Framework: ObservableCollection 和 Reactive Extensions (Rx)。簡單的解決方案,ObservableCollection (以及 async 和 await 關鍵字) 是一切。不過,如需 「 有趣 」 和,尤其是事件驅動的問題,接收您提供更佳控制程序。

定義的模式

觀察器模式通常用於 UI 設計模式時,包括模型檢視控制器 (MVC)、 Model View Presenter (MVP) 和 MVVM — Ui 應視為一大組案例,觀察者模式適用於從一個案例。觀察者模式 (從 Wikipedia 引用) 的定義是 ︰ [這個物件稱為主體,[,],會維護一份相依項目,稱為 「 觀察者,並通知自動的任何狀態變更時,通常是藉由呼叫其中一個方法。

的確,觀察者模式就取得的結果從長時間執行的程序,用戶端只要有可用的這些結果。某些版本的觀察者模式中,沒有用戶端必須等到使用最後的結果,然後所有結果都傳送至其在單一的金額。在非同步越來越多的世界中,您想處理以平行方式與用戶端的結果,因為結果,就可以使用觀察者。要強調您所談論個以上的 Ui 運用 「 觀察者模式中,我將使用 「 用戶端 」 和 「 伺服器 」,而不是 「 觀察者 」 和 「 主體 」,這篇文章的其餘部分。

問題與機會

有三個以上的問題和觀察者模式使用兩個機會。第一個問題是屆滿接聽程式問題 ︰ 觀察器模式的許多實作中需要保存所有其用戶端至伺服器。如此一來,直到伺服器結束時,就可以用戶端保留在記憶體中伺服器。這顯然不長時間執行的處理序的最佳解決方案,讓用戶端連線,並經常中斷動態系統中。

屆滿接聽程式問題,不過,是問題的只是問題的第二個、 較大的徵兆 ︰ 觀察器模式的許多實作需要伺服器和用戶端緊密結合,要求伺服器與用戶端會出現在所有的時間。最起碼,用戶端應該能夠判斷伺服器是否存在,並選擇不附加。此外,伺服器應該能夠在即使沒有接受結果的用戶端。

第三個問題是與效能相關 ︰ 多久才會收到通知的所有用戶端的伺服器? 在觀察器模式的效能會直接影響告知用戶端的數目。因此,不會有機會在觀察器模式的效能改善,可讓用戶端事先篩選來自伺服器的結果。這也能解決的案例更多結果 (或更廣泛的結果) 伺服器會產生比用戶端有興趣 ︰ 用戶端可以指出,它只是為了在特定情況下會收到通知。第二個效能機會存在周圍辨識伺服器沒有結果或產生的結果完成時。用戶端可以略過處理伺服器事件,直到用戶端保證有一點處理和用戶端可以釋放這些資源,因為他們知道他們所處理的最新結果所需的資源時發生。

從發佈/訂閱的觀察者

這些考量考量 redo 潛在客戶從觀察器模式的簡單實作相關的發佈/訂閱模型。發佈/訂閱會實作觀察者模式鬆散耦合的方式,可讓伺服器和用戶端中執行,即使另一個是目前無法使用。發佈/訂閱通常也會實作用戶端篩選,可讓用戶端訂閱特定主題/通道 (「 通知我有關採購單 」) 或不同類型的內容 (「 通知我有關任何緊急要求 」) 相關聯的屬性。

一個問題仍然發生,不過。觀察器模式的所有實作都傾向於緊密一些用戶端和伺服器特定的訊息格式。變更大多數發行/訂閱案例中,訊息的格式可能很困難,因為所有的用戶端必須更新為使用新的格式。

在許多方面,這是類似於資料庫中的伺服器端資料指標的描述。傳輸成本降到最低,資料庫伺服器不傳回結果,就如同會擷取每個資料列。不過,對於大型的資料列集,資料庫也不會傳回所有資料列結尾的單一批次中。相反地,資料庫伺服器通常傳回子集,從這些子集當做通常保留在伺服器上的資料指標變成可用。使用資料庫時,用戶端和伺服器不一定要同時存在 ︰ 資料庫伺服器可以執行時沒有出現,用戶端用戶端,可以檢查伺服器是否可存取,如果沒有,請在 (如果有的話) 決定什麼其他可能造成。篩選程序 (SQL) 也是非常有彈性。不過,如果資料庫引擎變更它使用傳回的資料列的格式,則所有用戶端必須,最起碼,重新編譯。

處理物件快取

為簡單的觀察器模式實作查詢的我案例研究,我使用伺服器的發票記憶體中快取中搜尋的類別。該伺服器在處理結束時,可以傳回所有發票內容的集合。不過,我想讓用戶端個別和伺服器的搜尋程序的平行處理發票。這表示我偏好的程序會傳回每張發票會找到,而且可讓用戶端處理以平行方式,使用搜尋目標] 下一步的發票每張發票的版本。

伺服器的簡單實作可能看起來像這樣 ︰

private List<Invoice> foundInvoices = new List<Invoice>();
public List<Invoice> FindInvoices(decimal Amount)
{
  foundInvoices.Clear();
  Invoice inv;
    // ...search logic to add invoices to the collection
     foundInvoices.Add(inv);
    // ...repeat until all invoices found
    return foundInvoices;
}

較複雜的解決方案可能使用 yield return 傳回找到的每張發票,而非組合清單。無論如何,呼叫 FindInvoices 方法的用戶端會想要處理的前後執行一些重要的活動。例如,一旦找到第一個項目,用戶端可以啟用 MatchingInvoices 清單保存在用戶端在發票或取得/初始化處理發票時所需的任何資源。加入額外的發票時,用戶端必須來處理每張發票,並在伺服器發出訊號,會擷取最後的發票時, 釋放處理 「 沒有 」 發票因為已不再需要的任何資源。

在資料庫擷取,比方說,讀取會阻擋,直到第一個資料列會傳回。一旦傳回第一個資料列,用戶端初始化處理的資料列所需的任何一種資源。擷取最後一個資料列時,可讓用戶端釋放這些資源,因為沒有要處理多個資料列,讀取也會傳回 false。

建立簡單的解決方案與 ObservableCollection

在.NET Framework 中實作觀察器模式的最明顯的選項是 ObservableCollection。ObservableCollection 會通知用戶端 (透過事件) 變更時。

重寫我使用 ObservableCollection 類別的範例伺服器需要只有兩個變更。首先,保存結果集合必須定義為 ObservableCollection 與公開。第二,它已不再需要方法傳回的結果 ︰ 伺服器只需要加入至集合的發票。

伺服器的新實作可能看起來像這樣 ︰

public List<Invoice> FindInvoices(decimal Amount)
{
  public ObservableCollection<Invoice> foundInvoices =
    new ObservableCollection<Invoice>();
  public void FindInvoices(decimal Amount)
  {
    foundInvoices.Clear();
    Invoice inv;
    // ...search logic to set inv
     foundInvoices.Add(inv);
    // ...repeat until all invoices are added to the collection   
  }

使用此版本的伺服器的用戶端只需要接到 InvoiceManagement foundInvoices 集合的 CollectionChanged 事件的事件處理常式。下列程式碼中,我曾經實作 IDisposable 介面,以支援中斷事件的類別 ︰

public class SearchInvoices: IDisposable
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public void SearchInvoices()
  {
    invMgmt.foundInvoices.CollectionChanged += InvoicesFound;
  }
  public void Dispose()
  {
    invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged;
  }

在用戶端,CollectionChanged 事件傳遞的 NotifyCollectionChangedEventArgs 物件做為其第二個參數。物件的 [動作] 屬性指定同時變更在集合上執行 (動作是 ︰ 集合已清除,新的項目加入至集合,因此現有項目移動/取代/移除) 和變更的項目 (任何加入項目的集合,存在於之前要加入已移動/移除/取代項目的位置的新項目集合的項目集合) 的相關資訊。

在用戶端,以加入集合中的伺服器以非同步方式處理每張發票的簡單程式碼會看起來像中的程式碼 [圖 1

[圖 1 以非同步方式處理發票使用 ObservableCollection

private async void InvoicesFound(object sender,
  NotifyCollectionChangedEventArgs e)
{
  switch (e.Action)
  {
    case NotifyCollectionChangedAction.Reset:
      {
        // ...initial item processing
        return;
      }
    case NotifyCollectionChangedAction.Add:
      {
        foreach (Invoice inv in e.NewItems)
        {
          await HandleInvoiceAsync(inv);
        }
        return;
      }
  }
}

While 簡單,這段程式碼可能不足以您的需求,特別是如果您正在處理長時間執行的程序或動態的環境中運作。例如,從非同步設計觀點來看,程式碼無法擷取傳回 HandleInvoiceAsync,讓用戶端無法管理非同步工作的工作物件。您也會想要確定即使 FindInvoices 是背景執行緒上執行,會在 UI 執行緒引發 CollectionChanged 事件。

因為其中清除方法被呼叫的伺服器類別 (之前搜尋的第一個發票) [動作] 屬性重設值可做為第一個項目即將擷取的訊號。不過,當然沒有發票中可找到搜尋,因此使用重設的動作可能會導致實際上永遠不會使用用戶端配置資源。若要實際處理 「 第一個項目 」 處理,您必須執行只找到第一個項目已加入動作處理加上旗標。

此外,伺服器具有有限的選項,指出發現最後一筆發票,讓用戶端可能會停止等候 「 下一步一 」。 伺服器無法,大半清除後尋找最後一個項目中,集合,但只,可能會強制重設動作處理處理更複雜 (有我開始處理發票嗎? 如果是,則我已經處理最後一筆發票中。如果不存在,然後我的相關程序的第一個發票)。

雖然對於簡單的問題,就會恢復正常 ObservableCollection,任何相當複雜的實作,根據 ObservableCollection (和任何值效率的應用程式) 會需要複雜的程式碼,尤其是在用戶端。

Rx 解決方案

如果您想要非同步處理則 Rx (透過 NuGet 提供) 可以提供更好的解決方案,以實作觀察器模式 client 發佈/訂閱模型。此解決方案也提供 LINQ 為基礎篩選模型,第一個/最後一個項目條件和更佳的錯誤處理的進一步通知。

Rx 也可以處理比使用 ObservableCollection 可能會有更有趣的觀察者實作。在我的案例研究之後傳回初始清單的發票,, 我的伺服器可能會繼續檢查有新的發票原始搜尋完成之後 (以及符合搜尋準則,當然) 新增至快取。發票會議時不會顯示準則,用戶端會想要將新的發票可以新增至清單,會通知事件。Rx 支援這些類型的事件架構的擴充功能優於 ObservableCollection 「 觀察者模式。

兩個索引鍵中有介面 Rx 支援觀察器模式。第一個是 IObservable < T >,由伺服器實作,並指定一個方法 ︰ 訂閱。伺服器實作 Subscribe 方法會傳遞給用戶端物件的參考。若要處理失效的接聽程式問題,Subscribe 方法會傳回給用戶端實作 IDisposable 介面的物件的參考。若要從伺服器中斷連線,用戶端可以使用該物件。當用戶端未中斷連線時,伺服器會預期從其內部清單中移除用戶端。

第二個是 < T > IObserver 介面必須由用戶端實作。該介面會要求用戶端實作和公開到伺服器的三種方法 ︰ OnNext、 OnCompleted 和 OnError。重要的方法是 OnNext 伺服器用來將訊息傳遞至用戶端 (在我的案例研究中該訊息可能會不會傳回每個出現的新發票物件)。伺服器可以使用用戶端的 OnCompleted 方法,以表示沒有詳細資料。第三個方法,OnError,可讓伺服器發出信號給用戶端發生例外狀況。

也很歡迎 IObserver 介面自行實作,當然 (的.NET Framework 的一部分)。ObservableCollection,以及可能是您只需要如果您要建立同步方案 (我所發表的資料行,也 」 撰寫簡潔的程式碼與 Reactive Extensions 」 [bit.ly/10nfQtm])。

不過,Rx 包含數個封裝,提供非同步處理實作這些介面,包括 JavaScript 和 RESTful 服務的實作。Rx 主體類別提供簡化實作觀察器模式的非同步發佈/訂閱版本的 IObservable 的實作。

建立非同步方案

建立要使用的主體物件的伺服器需要很少變更原始的同步伺服器端程式碼。我會取代舊 ObservableCollection 的主體物件,將會傳遞每張發票出現任何接聽的用戶端。我宣告為 public 的主體物件,讓用戶端可以存取它 ︰

public class InvoiceManagement
{
  public IObservable<Invoice> foundInvoice =
    new Subject<Invoice>();

本文的方法,而不是將發票加入至集合中,我會使用主體的 OnNext 方法来傳遞給用戶端的每張發票,找到 ︰

public void FindInvoices(decimal Amount)
{
  inv = GetInvoicesForAmount(Amount) // Poll for invoices
  foundInvoice.OnNext(inv);
  // ...repeat...
}

在我的用戶端,我先宣告伺服器類別的執行個體。然後,在標示為非同步方法中,我呼叫來表示我想要開始擷取訊息主體的 Subscribe 方法 ︰

public class InvoiceManagementTests
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public async void ProcessInvoices()
  {
    invMgmt.foundInvoice.Subscribe<Invoice>();

若要篩選結果,以便只是我想要的發票,我可以將 LINQ 陳述式套用到主體物件。這個範例會篩選會延遲交貨的發票 (使用 Rx LINQ 延伸模組,您必須加入 using 陳述式 System.Reactive.Linq 命名空間):

invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();

一旦我已經開始接聽主體時,我可以指定什麼我想要收到發票的處理。我比方說,可以使用 FirstAsync 來處理只服務所傳回的第一個發票。在此範例中,我使用 await 陳述式,呼叫 FirstAsync,讓我可以將控制權交還給我的應用程式在處理發票時的主要本文。要擷取的第一個 「 發票 」,則任何程式碼,我用來初始化處理程序的發票,最後,處理發票上移,等待這段程式碼 ︰

Invoice inv;
inv = await invMgmt.foundInvoice.FirstAsync();
// ...setup code invoices...
HandleInvoiceAsync(inv);

這裡要提出一個警告: 如果伺服器還沒有產生任何結果,會封鎖 FirstAsync。如果您想要避免封鎖,您可以使用 FirstOrDefaultAsync,如果伺服器未產生任何結果會傳回 null。如果不有任何結果,用戶端可以決定,如果任何項目,來執行。

更常見的案例是用戶端想處理 (之後篩選),傳回所有發票,並以非同步的方式。在此情況下,而不使用訂閱和 OnNext 的組合,您可以只使用 ForEachAsync 方法。您可以傳遞方法或 lambda 運算式以處理傳入的結果。如果您傳遞的方法 (這不是非同步的),做為我在這裡,觸發 ForEachAsync 的發票將傳遞方法 ︰

invMgmt.foundInvoice.ForEachAsync(HandleInvoice);

ForEachAsync 方法也可以傳遞,讓它中斷連線的用戶端訊號的取消語彙基元。好的作法是將權杖傳遞時呼叫任何 Rx * 非同步方法,以支援可讓用戶端終止處理,而不必等待處理的所有物件。

ForEachAsync 不會處理所有結果,所以您可以使用 FirstOrDefaultAsync ForEachAsync 檢查伺服器是否處理後續的物件之前要處理的任何項目已經處理第一個 (或 FirstOrDefaultAsync) 的方式。不過,主體的 IsEmpty 方法將會更輕鬆地執行相同的檢查。如果用戶端必須處理結果所需的任何資源配置,IsEmpty 可讓用戶端檢查來查看是否有任何項目之前配置的資源 (另外也可以將這些處理迴圈中的第一個項目上的資源配置)。檢查以查看是否有任何結果,然後再配置資源 (並開始處理) 同時也支援取消會提供類似的程式碼的用戶端使用 IsEmpty [圖 2

[圖 2 程式碼,以支援取消和延遲處理直到結果已備妥

CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken cancel;
cancel = cancelSource.Token;
if (!await invMgmt.foundInvoice.IsEmpty())
{
  // ...setup code for processing invoices...
  try
  {
    invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel);
  }
  catch (Exception ex)
  {
    if (ex.GetType() != typeof(CancellationToken))
    {
      // ...report message
    }
   }
   // ...clean up code when all invoices are processed or client disconnects
}

總結

如果您只需要是觀察器模式的簡單實作,則 ObservableCollection 可能會執行您所要的結果資料流處理。讓您進一步控制和以事件為基礎的應用程式中,主體類別以及隨附 Rx 的擴充功能可讓您藉由支援發佈/訂閱模型的強大實作非同步模式中運作的應用程式 (和我還沒有看過運算子 Rx 隨附的豐富的程式庫)。如果您正在使用 Rx,值得下載 Rx 程式設計指南 》 (bit.ly/1VOPxGS),其中討論取用,並產生可觀察資料流中的最佳做法。

Rx 也提供一些支援轉換使用 ISubject < TSource,Tresult> > 介面在用戶端與伺服器之間傳遞的訊息類型。ISubject < TSource,Tresult> > 介面指定兩個資料類型: 「 中 」 資料類型和"out"資料類型。實作此介面的主體類別中,您可以執行任何作業需要轉換成用戶端 (「 出 」 資料類型) 所需的結果傳回從伺服器 (「 中 」 的資料型別) 的結果。此外,in 參數是 covariant (它會接受指定的資料類型或任何項目資料型別繼承自) 和輸出參數是的 contravariant (將會接受指定的資料類型,或從它衍生的任何內容),讓您更大的彈性。

我們住在非同步越來越多的世界中與外界的觀察器模式會變得更加重要,它是有用的工具,其中伺服器處理序會傳回多個單一結果的處理序之間的任何介面。幸運的是,您有幾個選項在.NET Framework,包括 ObservableCollection 和 Rx 實作觀察器模式。


Peter Vogel是系統架構設計人員和 PH & V 資訊服務中的主體。PH (& V) 提供諮詢從透過物件模型和資料庫設計的 UX 設計的完整堆疊。

感謝以下的微軟技術專家對本文的審閱: Stephen Cleary、 James McCaffrey 和 Dave Sexton
Stephen Cleary 使用過多執行緒和非同步 16 年的程式設計和已使用 Microsoft.NET Framework 中的非同步支援自第一次的社群技術預覽。他是 「 並行存取在 C# 操作手冊 」 (O'Reilly Media,2014年) 的作者。他的首頁,包括他的部落格位於 stephencleary.com