本文章是由機器翻譯。

非同步代理程式

使用非同步代理程式庫進行以動作項目為基礎的程式設計

Michael Chu

隨著多核處理器在市場上的日益普及,它已廣泛用於伺服器、桌上型電腦以及可擕式電腦,代碼並行化的重要性也前所未有地凸顯出來。為了滿足這一關鍵需求,Visual Studio 2010 引入了若干新的方法,説明 C++ 開發人員利用新的並行運行時和新的並行程式設計模型帶來的這些功能。然而,開發人員面臨的一個主要障礙是確定哪種程式設計模型適合於他們的應用程式。正確的模型可以充分利用底層並行性,不過也需要重新考慮程式結構和實際的執行方式。

目前,最常見的並行程式設計模型涉及到通用的併發感知容器以及並行迴圈反覆運算等演算法。雖然這些傳統的技術可以是功能強大的方法縮放的應用程式可以利用多核心機器,它們 don’t 解決其中一個其他會影響平行效能的主要因素:延遲日益增加的影響。由於並行技術加快計算速度並將計算分佈在多個內核之間,因此,Amdahl 定律 (wikipedia.org/wiki/Amdahl's_law) 告訴我們性能改進受到執行速度最慢的那一部分制約。在許多情況下,等待來自 I/O(例如磁片或網路)的資料所花的時間比例越來越大。

基於角色的程式設計模型能夠很好地處理延遲等問題,這些模型最初是在二十世紀七十年代初引入的,目的是利用具有成百上千個獨立處理器的高度平行電腦資源。角色模型背後的基本概念是將應用程式的各個元件視為單獨的角色,這些角色可以通過發送、接收和處理消息與外界交互。

最近,隨著大量多核處理器的運用,角色模型已作為一種減少延遲、實現高效並存執行的有效方法重新露面。Visual Studio 2010 引入了非同步代理庫 (AAL),這是一個令人激動的基於角色的新模型,它具有消息傳遞介面,在該模型中代理就是角色。AAL 使開發人員可以通過更加以資料流程為中心的方式設計自己的應用程式。這樣的設計通常有利於在等待資料時有效使用延遲。

在本文中,我們將概述 AAL 並介紹如何在應用程式中使用它。

併發運行時

Visual Studio 2010 和 AAL 中併發支援的基礎是新的併發運行時,該運行時作為 Visual Studio 2010 中 C 運行時 (CRT) 的一部分提供。併發運行時提供協調任務計畫程式和資源管理器,後者對電腦的底層資源有深入瞭解。這就允許運行時以負載平衡的方式在整個多核計算機中執行任務。

图 1 簡要地展示了 Visual Studio 2010 中對本機代碼併發的支援。計畫程式是確定何時何地執行任務的主要元件。它借助資源管理器收集的資訊來充分地利用執行資源。儘管應用程式和庫也可以直接與運行時交互,但它們本身主要還是通過兩個位于計畫程式之上的程式設計模型(即 AAL 和並行模式庫 (PPL))與併發運行時交互。

图 1 并发运行时

PPL 提供更為傳統的並行技術(例如 parallel_for 和 parallel_for_each constructs)、可識別運行時的鎖和併發資料結構(例如佇列和向量)。雖然 PPL 不是本文介紹的重點,但它也是一種功能強大的工具,開發人員可以將其與 AAL 中引入的所有新方法配合使用。有關 PPL 的詳細資訊,請參閱 2009 年 2 月刊載的《使用 C++ 的 Windows》專欄 (msdn.microsoft.com/magazine/dd434652)。

相比之下,AAL 能夠在更高級別以不同于傳統技術的角度來並行化應用程式。開發人員需要從待處理資料的角度思考應用程式,並思考如何將資料處理分隔到可並存執行的元件或階段中。

AAL 提供兩個主要元件:訊息傳遞架構和非同步的代理程式。

消息傳遞框架包括一組消息塊,用於接收、處理和傳播消息。通過將消息塊串連起來,可創建能夠同時執行的工作管道。

非同步代理是通過接收消息、在自己維護的狀態下執行本地工作和發送消息,以此與外界交互的角色。

這兩個元件結合在一起,使開發人員能夠在資料流程而不是控制流方面利用並行性,並通過更高效地使用並行資源來改善對延遲的容忍度。

消息傳遞框架

AAL 的第一個重要元件是消息傳遞框架,該框架是協助開發資料流程網路以便將工作管道化的一組構造。將工作管道化是資料流程模型的基本部分,因為它允許將工作分解為多個獨立的階段,只要資料就緒便可對流資料進行並行處理。當一個階段的資料處理結束時,該階段可將資料傳遞到下一階段,同時第一個階段尋找要處理的新資料。

我們以設置傳出消息格式並審查消息中是否存在不當內容的電子郵件應用程式為例。這種類型操作的代碼顯示如下:

std::foreach(reader.begin(); reader.end(); 
  [](const string& word) { 
    auto w1 = censor(word); 
    auto w2 = format(w1); 
    writer.write_word(w2);
  });

對於電子郵件中的每個詞,該應用程式都需要檢查它是否存在於審查詞的字典中,如果存在則予以替換。然後,代碼根據一組指導原則設置每個詞的格式。

這種方案中存在大量固有的並行性。但是,傳統並行技術還不能滿足要求。例如,一種簡單的方法是對文本中的字串使用 parallel_for_each 演算法,審查這些字串並設置格式。

這種解決方案的第一個主要阻礙是必須讀取整個檔,以便反覆運算器能夠正確地劃分工作。
強制讀取整個檔會導致進程受到 I/O 的限制,並且會降低並行效率。當然,您可以使用智慧反覆運算器將詞的處理與讀取輸入的操作重疊進行。

傳統並行方法的第二個主要問題是排序。顯然,對於電子郵件來說,對文本的並行處理必須保持文本順序,否則會完全無法理解郵件的含義。為了保持文本順序,parallel_for_each 技術會產生同步和緩衝方面的大量開銷,而這一過程可由 AAL 自動處理。

通過採用管道技術處理郵件,您可以避免上述兩個問題,同時還能利用並行能力。請看一下圖 2,其中創建了一個簡單管道。在此示例中,應用程式的主要任務(審查和設置格式)被分為兩個階段。第一個階段接收字串並在審查詞的字典中查找該字串。如果找到匹配項,審查塊會使用字典中的另一個詞替換該字串。否則,它會輸出已輸入的同一封郵件。同樣,在第二個階段中,格式設置塊接收每個詞並將其恰當地設置為特定樣式。

圖 2 電子郵件處理管道

此示例可在以下幾個方面從資料流程方法獲益。首先,由於它不需要在處理前讀取整封郵件,郵件中的字串可以通過審查和設置格式階段立即開始流處理。其次,管道處理允許一個字串由設置格式塊進行處理,同時下一個字串由審查塊進行處理。最後,由於字串的處理順序是它們在原文中出現的順序,因此不需要執行額外的同步。

消息塊

消息塊接收、處理、存儲和傳播消息。訊息區塊有三個格式之一:來源、 目標和 propagators。源只能傳播消息,而目標能夠接收、存儲和處理消息。大多數塊都是傳播器,既是源又是目標。換句話說,它們能夠接收、存儲和處理消息,也可以轉而將這些消息發送出去。

AAL 包含一組消息塊基元,能夠滿足開發人員的大部分使用需求。图 3 簡要概述了 AAL 中包括的所有消息塊。不過該模型仍然是開放式的,因此,如果您的應用程式需要具有特定行為的消息塊,可以自己編寫可與所有預定義塊交互的自訂塊。每個塊都有各自處理、存儲和傳播消息的獨有特徵。

图 3 AAL 消息块

消息塊 用途
unbounded_buffer<Type> 存儲不限數量的消息並將其傳播到目標。
overwrite_buffer<Type> 存儲一條消息,每次有新消息傳播進來時都會覆蓋該消息,然後將其廣播到目標。
single_assignment<Type> 存儲一條一次寫入的消息,然後將其廣播到目標。
transformer<Input,Output> 接收一條類型為 Input 的消息,然後運行使用者提供的函數將其轉換為類型為 Output 的消息。將這條轉換後的消息傳播到目標。
call<Type> 接收一條消息,然後使用該消息的負載作為參數來運行使用者提供的函數。這種塊是純粹的消息目標。
timer<Type> 在使用者定義的時間量之後將消息傳播到目標。可以是重複或非重複的塊。這種塊是純粹的訊息源。
choice<Type1,Type2,...> 接收來自多種類型的多個源的消息,但只接受來自傳播到所選類型的第一個塊的消息。
join<Type> 接收來自多個源的消息,將它們組合起來輸出單條消息。非同步等待從各個源輸入的消息準備就緒。
multitype_join<Type1,Type2,...> 接收來自多種類型的多個源的消息,將它們組合起來。非同步等待從各個源輸入的消息準備就緒。

AAL 提供的消息塊基元的一個主要優勢是它們的可組合性。因此,您可以根據所需行為進行組合。例如,您可以輕鬆創建將多個輸入添加到一起的塊,方法是將轉換器塊附加到聯接塊的末尾。當聯接塊成功檢索到來自它的各個源的消息時,可將消息傳遞給轉換器,而轉換器將匯總消息負載。

您也可以將重複的計時器塊連接為聯接塊的源。這會形成一個限制消息的塊,只在計時器塊觸發其消息時允許消息通過。图 4 中說明了這兩種可組合塊。

圖 4 組合來自基元的加法器塊與消息限制塊

創建消息傳遞管道

現在,我們來看看創建上文所示的消息塊管道的代碼。我們可以用兩個轉換器消息塊替換此管道,如圖 5 所示。轉換器塊的用途是接收特定類型的消息並對消息執行使用者定義的函數,這一操作可修改消息負載甚至徹底更改消息類型。例如,審查塊將包含字串的消息作為輸入接收,然後需要對其進行處理。

图 5 消息块管道

图 6 中顯示了創建和連接消息塊的代碼。此代碼從產生實體兩個轉換器消息塊開始。審查塊構造函數中的 C++0x lambda 參數定義轉換函數,該轉換函數在字典內查找消息的存儲輸入字串,看看是否應更改為其他字串。系統返回結果字串,然後在審查塊內將其封裝成單條消息並從該塊傳播出去。除非轉換器塊的輸出是格式設置函數更改過的字串,否則對於格式設置轉換器塊會採用類似途徑。

图 6 简单消息管道

dictionary dict;

transformer<string, string> 
  censor([&dict](const string& s) -> string {

  string result = s;
  auto iter = dict.find(s);

  if (iter != dict.end()) {
    result =  iter->second;
  }

  return result;
});

transformer<string, string> 
  format([](const string& s) -> string {

  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }

  return result;
});

censor.link_target(&format);

asend(&censor, "foo");
string newStr = receive(format);
printf("%s\n", newStr);

兩個塊產生實體以後,下一行代碼通過對審查塊調用 link_target 方法,將兩個塊連結到一起。每個源塊和傳播器塊都有 link_target 方法,用於確定源應該將它的消息傳播到哪些消息塊。

審查塊和格式設置塊連結到一起後,轉換函數會處理傳播到審查塊的任何消息,生成的消息將隱式傳遞到格式設置塊進行處理。如果消息塊是沒有連接目標的源或傳播器,它可以按特定于塊的方式存儲消息,直到連結了目標或消息被檢索。

示例代碼的最後三行顯示將消息初始化到塊中以及從塊中檢索消息的過程。有兩個訊息初始化 [AAL 的 API:傳送和 asend。它們分別將消息同步或非同步輸入塊中。

主要區別是,當 send 調用返回時,保證已將消息推送到塊,並且已通過塊將消息發送到所需目標。asend 調用可以立即返回,並且允許併發運行時計畫傳播。同樣地,有兩個訊息擷取 [AAL 的 API:接收和 try_receive。receive 方法在消息到達前始終處於阻止狀態,而 try_receive 則會在無法檢索消息時立即返回。

圖 6 所示,字串“foo”會非同步發送到審查塊。審查塊將接收該消息,檢查其字串是否存在於審查詞的字典中,然後將結果字串傳播到消息中。接著,結果字串被傳遞到格式設置塊,後者接收該字串,將每個字母變成大寫,然後由於沒有目標而保留到消息中。當調用 receive 時,將從格式設置塊中獲取該消息。因此,假設 「 foo 」 已不在字典中,此範例的輸出會是 「 FOO 」。雖然此範例只將推入單一字串,透過網路,您可以看到的輸入字串的資料流形成執行的管線的方式。

請看一下此消息示例,注意消息本身明顯缺少引用。消息只是一個信封,其中封裝要在資料流程網路中傳遞的資料。消息傳遞本身是通過提供和接受過程來處理的。當消息塊收到消息時,能夠以任何想要的方式存儲該消息。如果稍後要將消息發送出去,它會將該消息提供給每個連接的目標。若要真正將消息送出,接收方必須接受提供的消息,以完成該事務。消息在塊間傳遞的整個過程是由併發運行時計畫和執行的任務來計畫和處理的。

消息塊傳播

現在,您已瞭解消息塊是如何創建和關聯在一起的,以及如何將消息初始化到每個塊中並從中檢索消息。接下來讓我們簡單瞭解一下消息如何在塊間傳遞,以及併發運行時如何成為 AAL 的核心。

使用消息塊或 AAL 不一定需要瞭解此資訊,但它有助於加深對消息傳遞協定工作方式及其使用方式的理解。在本節的其餘部分,我將介紹傳播器塊,因為它們既是源又是目標。顯然,純粹的源塊或純粹的目標塊只是傳播器塊實現的子集。

在內部,每個傳播器塊都有一個消息輸入佇列和另一個特定于塊的消息存儲容器。連結到此傳播器塊的其他塊會發送存儲在輸入佇列中的消息。

例如,在圖 7 中,審查轉換器塊有一個輸入佇列,該佇列當前存儲包含字串 str6 的消息。實際的轉換程式包含兩個訊息:str4 和 str5。因為這是轉換器,所以它的特定于塊的存儲是另一佇列。不同的塊類型可以有不同的存儲容器。例如,overwrite_buffer 塊只存儲始終會被覆蓋的單條消息。

图 7 消息传递协议

從某個連結的源(或 send/asend API)向塊提供消息時,此塊首先會檢查篩選器函數,以決定是否接受消息。如果決定接受消息,則將消息放入輸入佇列。篩選器是一個可選函數,可傳遞到返回布林值的每個目標的構造函數或傳播器塊中,該布林值決定是否應接受某個源提供的消息。如果消息被拒絕,該源會繼續向下一個目標提供消息。

一旦消息放入輸入佇列,它的源塊就不再保留此消息。不過,接受塊尚未準備好傳播消息。因此在等待處理時,消息可以緩衝到輸入佇列中。

當消息到達某個消息塊的輸入佇列時,併發運行時計畫程式會計畫一個輕型任務 (LWT)。此 LWT 有雙重目的。首先,它必須將消息從輸入佇列移到塊的內部存儲中(我們稱之為消息處理)。其次,它還必須嘗試將消息傳播到任意目標(我們稱之為消息傳播)。

例如,在圖 7 中,輸入佇列中存在提示系統計畫 LWT 的消息。接下來 LWT 會處理消息,方法是先對消息執行使用者提供的轉換器函數,在審查字串字典中檢查該消息,然後將它移到塊的存儲緩衝區。

將消息轉移到存儲緩衝區之後,LWT 開始執行傳播步驟,將消息發送到目標設置格式塊。在這種情況下,由於消息 str4 位於轉換器的前端,它會先傳播到格式設置塊,然後再傳播下一條消息 str5。同樣的整個過程會在格式設置塊中發生。

根據消息塊的類型,消息處理方式會有所不同。例如,unbounded_buffer 只有將消息移到存儲緩衝區的簡單處理步驟。轉換器處理消息的方式是先對消息調用使用者定義的函數,然後再將其移到存儲緩衝區。其他塊的處理方式甚至更複雜,例如聯接,它必須組合來自不同源的多條消息,然後將它們存儲到緩衝區中以備傳播。

就性能效率而言,AAL 在創建 LWT 方面是智慧化的,因此每次只會為每個消息塊計畫一個 LWT。如果處理 LWT 處於活動狀態時有更多消息到達輸入佇列,LWT 會繼續選取並處理這些消息。因此,如圖 7 所示,如果消息 str7 進入輸入佇列時轉換器的 LWT 仍在處理,它將選取並處理此消息,而不是啟動新的處理和傳播任務。

每個消息塊都有各自用於控制處理和傳播的 LWT,這是此設計的核心,它允許消息傳遞框架按資料流程的方式將工作管道化。因為每個消息塊在自己的 LWT 中處理和傳播消息,所以 AAL 可以將塊彼此分離,並允許跨多個塊執行並行工作。每個 LWT 必須只將自己的消息傳播到目標塊的輸入佇列,而每個目標僅計畫一個 LWT 來處理自己的輸入。使用單個 LWT 處理和傳播消息可確保為消息塊保持消息次序。

非同步代理

AAL 的第二個主要元件是非同步代理。非同步代理是粗細微性應用程式元件,專門用於非同步處理較大型的計算任務和 I/O。代理應該可以與其他代理通信,並啟動較低級別的並行處理。這些代理是隔離的,因為它們對於外界的理解完全包含在類中,它們可以通過消息傳遞與其他應用程式元件通信。代理本身被計畫為併發運行時內部的任務。這允許它們配合同時執行的其他工作來阻止和運行。

非同步代理有固定生命週期,如圖 8 所示。可以監視和等待此生命週期。綠色狀態表示運行狀態,而紅色狀態表示終止狀態。開發人員可通過從代理基類派生的方式創建自己的代理。

圖 8 非同步代理生命週期

三種基類函數(start、cancel 和 done)可轉換代理的不同狀態。一旦完成構造,代理即處於已創建狀態。啟動代理和啟動執行緒類似。除非對代理調用 start 方法,否則代理不會執行任何操作。此時,代理將根據計畫執行,並進入可運行狀態。

當併發運行時選取此代理時,它會進入已啟動狀態並繼續運行,直到使用者調用 done 方法,該方法指示它的工作已經完成。已計畫但尚未啟動代理時,調用 cancel 會將代理轉換成已取消狀態,代理將不再執行。

讓我們回顧一下電子郵件篩選示例。在此示例中,管道式消息塊將資料流程引入應用程式,並提高自己並行處理詞語的能力。但是,此示例沒有顯示如何控制處理電子郵件本身的 I/O,以及如何將它們分解成字串流,以便管道進行處理。此外,一旦字串通過管道,必須進行收集,以便以新的已審查和已設置格式的狀態重新編寫文本。這就是代理可以發揮作用的地方,目的是説明容忍 I/O 延遲差異。

例如,請看一下電子郵件管道的末尾。此時,字串正由格式設置塊輸出,並需要寫入郵箱的檔中。图 9 顯示輸出代理如何捕獲字串和創建輸出電子郵件。WriterAgent 的 run 函數接收來自迴圈中的格式設置塊的消息。

圖 9 代理捕獲格式設置塊的輸出

此應用程式中的大部分處理工作是使用資料流程完成的,而 WriterAgent 則顯示了如何在程式中引入某些控制流。例如,當檔結尾消息到達時,根據接收的輸入字串,WriterAgent 必須有不同的行為;它必須知道停止操作。圖 10 中顯示了 WriterAgent 的代碼。

圖 10 WriterAgent

class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }

  ~WriterAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ...
);

    string s;
    string eof("EOF");

    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }

    fclose(stream);
    done();
  }

private:

  ISource<string> * m_source;
};

此代碼有幾個值得關注的地方。首先是析構函數中對靜態函數 agent::wait 的調用。這個函式可以呼叫任何代理程式的指標,並會封鎖,直到代理程式進入終端機狀態之一:完成或取消。雖然並不是所有代理都需要在析構函數中調用 wait,但多數情況下應讓它完成,這樣可確保析構時代理不再執行任何代碼。

其次,此代碼的有趣部分是 run 方法本身。此方法定義代理的主執行過程。在此代碼中,代理正在處理從源(在本例中是格式設置塊)讀取的字串的寫出操作。

最後,請注意 run 方法的最後一行,此行是對代理函數 done 的調用。對 done 方法的調用可將代理從運行狀態轉變成完成狀態。在大多數情況下,需在 run 方法末尾調用此方法。不過,在某些情況下,應用程式可能希望使用代理來設置狀態。例如在資料流程網路中,該網路在 run 方法生存期後仍應保持活動狀態。

將所有內容整合在一起

現在,我們已經創建了消息傳遞管道對字串進行篩選和設置格式,創建了輸出代理對字串進行處理,我們可以將具有相似行為的輸入代理附加到輸出代理。圖 11 舉例說明了此應用程式如何組合到一起。

圖 11 用於處理電子郵件的代理

代理處理的一個優勢是能夠在應用程式中使用非同步角色。這樣,當資料到達並等待處理時,輸入代理將非同步開始通過管道發送字串,輸出代理同樣可以讀取和輸出檔。這些角色可以完全獨立地開始和停止處理,並且完全由資料驅動。此類行為在許多情況下非常有用,特別是在延遲驅動和非同步 I/O 情況下,例如電子郵件處理示例。

在此示例中,我添加了另一個代理 ReaderAgent,它與 WriterAgent 工作方式類似,不同的是,它處理 I/O 以讀取電子郵件並向網路發送字串。圖 12 中顯示了 ReaderAgent 的代碼。

圖 12 ReaderAgent

class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }

  ~ReaderAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;       
    fopen_s( &stream, ...);

    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }

    fclose( stream );

    asend(m_target, string("eof"));
    done();
  }

private:

  ITarget<string> * m_target;
};

現在,我們已經有 ReaderAgent 和 WriterAgent 對程式 I/O 進行非同步處理,只需將它們連結至網路中的轉換器塊,便可開始處理。將兩個塊連結在一起之後就可輕鬆完成此任務:

censor.link_target(&format);

ReaderAgent r(&censor);
r.start();

WriterAgent w(&format);
w.start();

ReaderAgent 是通過對審查的引用創建的,因此可以正確地將消息發送到該代理,而 WriterAgent 是通過對格式設置的引用創建的,因此可以檢索消息。每個代理都使用啟動 API 進行啟動,該 API 安排代理在併發運行時中執行。每個代理都在自己的析構函數中調用 agent::wait(this),因此要等到兩個代理都到達完成狀態才會開始執行。

同步

本文旨在讓讀者初步瞭解內置於 Visual Studio 2010 中的基於角色的程式設計和資料流程管道的一些新功能。我們鼓勵您試用一下。

如果您想要以挖掘更深一層有許多其他 weren’t 能夠涵蓋在本文章中,我們的功能:自訂訊息區塊建立,篩選訊息,以及其他等等。MSDN 上的平行計算開發中心 (msdn.microsoft.com/concurrency) 包含更多有關這一令人興奮的新程式設計模型如何説明您以全新方式並行化程式的詳細資訊和使用步驟。

Michael Chu 是 Microsoft 平行計算平臺部門的軟體發展工程師。.他在併發運行時團隊工作。

Krishnan Varadarajan 是 Microsoft 平行計算平臺部門的軟體發展工程師。 Created on: 10/7/2010, 12:13 Created by: MT.他在併發運行時團隊工作。

*衷心感謝以下技術專家對本文的審閱:*併發運行時團隊