本文章是由機器翻譯。

Windows Azure 內行人

Windows Azure 服務匯流排:使用工作階段的傳訊模式

Bruno Terkaly
Ricardo Villalobos

下載代碼示例

Bruno Terkaly Ricardo Villalobos
在我們以前的文章之一,我們討論了以解耦的解決方案,促進不易積垢軟體體系結構的烏雲中使用消息處理模式的重要性。(見"比較 Windows Azure 的佇列和服務匯流排佇列"在 msdn.microsoft.com/magazine/jj159884.)佇列是其中一種消息傳遞模式,和 Windows Azure 平臺提供了兩個主要選項,要實現這種方法:佇列存儲服務和服務匯流排佇列,這兩種涵蓋多個消費者競相接收和處理佇列中的郵件的每個方案。這是典型的模型在雲中,那裡可以動態添加或移除接收機支援可變的工作負載基於佇列,提供後端負載平衡/容錯移轉機制的大小 (見圖 1)。

Queuing Messaging Pattern: Each Message Is Consumed by a Single Receiver
圖 1 佇列的消息處理模式:每個消息被消耗的單個接收器

即使該佇列的消息傳遞模式是簡單解耦的解決方案,有的情況在每個接收方需要的消息,其自身副本,與丟棄一些基於特定規則的郵件的選項。這種類型的方案中的一個好例子所示圖 2,這說明了一個共同的挑戰零售企業面貌,將資訊發送到多個分支,如最新的產品目錄或更新的價格清單時。

Publisher/Subscriber Messaging Pattern: Each Message Can Be Consumed More Than Once
圖 2 發佈伺服器/訂閱消息傳遞模式:每個消息可以不止一次消耗

對於這些情況下,發佈伺服器/訂閱伺服器模式是更適合,接收器只是對表示一個或多個郵件類別,感興趣的連接到獨立的訂閱包含消息流的副本。Windows Azure 服務匯流排實現的發佈伺服器/訂閱消息傳遞模式通過主題和訂閱,大大增強了控制訊息的分佈方式的能力,基於獨立規則和篩選器。在本文中,我們將解釋如何應用這些 Windows Azure 服務匯流排功能,使用簡單的真實生活場景,假設以下要求:

  1. 產品應收到訂單,基於目錄頁中。
  2. 某些商店不要攜帶特定目錄的類別,並在這些類別中的產品應當為每個存儲篩選出。
  3. 新目錄資訊不應該應用於存儲系統中,直到到達的所有郵件。

這篇文章的所有代碼示例都創建的 Visual Studio 2012,使用 C# 作為程式設計語言。您還需要 Windows Azure SDK 版本 1.8 為.NET 開發人員和訪問 Windows Azure 的訂閱。

為專案設置了消息的藍圖

之前編寫任何代碼,您需要定義將成為郵件工作流的一部分的不同實體 (主題和訂閱)。這可以通過訪問在 Windows Azure 門戶 manage.windowsazure.com。使用您的憑據登錄,請按照下列步驟:

  1. 按一下左下角的監管中心上的創建新的圖示。
  2. 按一下上的 APP 服務圖示,然後在服務匯流排主題上和最後上創建自訂 (見圖 3)。
  3. 在第一次對話方塊的螢幕上,輸入主題名稱,並選擇適當地區和 Windows Azure 訂閱 id。如果這是您第一次選定區域中的命名空間,該嚮導會建議命名空間的佇列:[您的機構名稱]-ns。您可以更改此值。
  4. 按一下下一個標記 (右箭頭) 插入其餘的屬性。您可以保留預設值。按一下核取記號來創建主題。
  5. 按一下左側的巡覽列以獲取命名空間的清單中的服務匯流排圖示。請注意您可能看不到立即列出的命名空間。它需要幾秒鐘才能創建命名空間和更新門戶介面。
  6. 選擇的主題,您剛剛創建的清單,並按一下便捷鍵,可以在螢幕的底部找到。記錄供以後使用的完整的連接字串。
  7. 在 Windows Azure 門戶螢幕頂部,按一下訂閱,然後就創建新訂閱。在彈出對話方塊中,輸入一個名稱 (在我們的示例中,我們使用"Store1Sub"),按一下上的箭頭繼續。
  8. 在下一個螢幕中,保留預設值,但請確保檢查啟用會話選項。按一下核取記號來創建訂閱。會議將由訂閱伺服器用於檢索中按順序的消息。
  9. 三家店的每個重複步驟 7 和 8。

Creating a New Service Bus Topic Using the Windows Azure Portal
圖 3 創建一個新的服務匯流排主題,使用 Windows Azure 門戶

一旦創建了主題和訂閱,也可以在 Visual Studio 中直接存取它們。要這樣做,請打開伺服器資源管理器 (視圖 |伺服器資源管理器中),展開 Windows Azure 服務匯流排節點 (見圖 4)。在 Windows Azure 服務匯流排節點上按右鍵,然後選擇添加新的連接。輸入 Namespace 名稱、 頒發者名稱 (通常"擁有者") 和發行人便捷鍵時,Windows Azure 的命名空間在入口網站中創建記錄。

Creating a Service Bus Topic and Subscriptions Using the Visual Studio Tools
圖 4 創建一個服務巴士主題和使用 Visual Studio 工具訂閱

請記住,它是可能要以程式設計方式創建和管理這些實體使用 Microsoft.ServiceBus.Messaging 命名空間,包括 TopicClient 和 SubscriptionClient,用於在本文稍後部分中的類。

一旦創建了該消息的工作流的基本結構,我們將類比使用 Visual Studio 中創建的兩個主控台應用程式,如中所示的交通圖 5。第一個主控台應用程式,MSDNSender,將發送產品目錄。第二,MSDN­接收器,將收到的資訊,在每個商店。我們要好好分析一下以下各節中的代碼。在 Pub/Sub 模式中,MSDNSender 是發佈伺服器和 MSDNReceiver 是訂閱伺服器。

Visual Studio Solution to Simulate the Products Catalog Scenario
圖 5 Visual Studio 解決方案來類比產品目錄方案

從總部發送產品目錄

正如您看到的圖 2,總部 (發佈伺服器) 將消息發送到某個主題。這種邏輯是由在主檔案中,program.cs,然後從,MSDNSender 專案的一部分的代碼表示的。Program.cs,然後從封裝的邏輯和代碼將主題作為單個郵件發送的產品清單。讓我們看看不同的部分,從開始的 Main 方法中。請注意,我們首先創建為本專題的用戶端,如下所示:

// Create a topicClient using the
// Service Bus credentials
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName);

一旦創建了 topicClient,發佈伺服器可以發送郵件使用它。 產品要發送清單存儲在 XML 檔中調用 ProductsCatalog.xml,其中包含將轉化為一個物件陣列的 10 產品實體的清單。 然後將獲取產品映射到存儲在 Product.cs 檔中的目錄和產品類:

// Deserialize XML file with Products, and store them in an object array
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();

目錄陣列中的每個產品顯示中顯示的結構圖 6

圖 6 類表示形式在目錄中的產品

public class Product
  {
    [System.Xml.Serialization.XmlElement("ProductId")]
    public string ProductId { get; set; }
    [System.Xml.Serialization.XmlElement("ProductName")]
    public string ProductName { get; set; }
    [System.Xml.Serialization.XmlElement("Category")]
    public string Category { get; set; }
    [System.Xml.Serialization.XmlElement("CatalogPage")]
    public int CatalogPage { get; set; }
    [System.Xml.Serialization.XmlElement("MSRP")]
    public double MSRP { get; set; }
    [System.Xml.Serialization.XmlElement("Store")]
    public string Store { get; set; }
  }

在陣列迴圈中,調用 CreateMessage 方法提取物的產品物件的不同屬性,並將它們分配給要發送的消息。 兩個屬性需要額外注意:

if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;

會話是極其重要的因為它們允許接收器來確定是否已到達的所有郵件,屬於一個特定的邏輯組。 在這種情況下,通過設置會話 Id 的消息屬性,我們指定 catalogName 值相同的所有消息都到達後接收器不應該使用直到目錄資訊。 另外,陣列中的最後一個產品,我們增加一個新的屬性:IsLastMessageInSession,這將使接收機來確定是否已到達會話中的最後一條消息和目錄可以進行完全處理。 圖 7 顯示運行的 MSDNSender。

Execution of the MSDNSender Project
圖 7 執行 MSDNSender 專案

接收在商店使用訂閱產品目錄

現在,目錄和產品已寄出的主題,並複製到不同的訂閱,讓我們把注意力放到 MSDNReceiver 專案中,在接收和處理郵件。 請注意在 program.cs,然後從 Main 方法中,代碼創建一個用戶端訂閱基於資訊提供由使用者通過 Console.Read­線命令。 預期使用者輸入其存儲編號,這反映了他們希望接收的消息。 簡而言之,每個分店只關注到該存儲區適用的消息:

Console.WriteLine("Enter Store Number");
  string storeNumber = Console.ReadLine();
  Console.WriteLine("Selecting Subscription for Store...");
  // Create a Subscription Client to the Topic
  SubscriptionClient subscriptionClient =
    SubscriptionClient.CreateFromConnectionString(
    serviceBusConnectionString, topicName,
    "Store" + storeNumber.Trim() + "Sub",
    ReceiveMode.PeekLock);

因為我們從基於會話 (如前一節中解釋) 的訂閱接收消息,我們需要請求下一次使用下面的程式碼:

MessageSession sessionReceiver =
  subscriptionClient.AcceptMessageSession(TimeSpan.FromSeconds(5));

基本上,這意味著用戶端將檢查是否有任何要處理的消息在訂閱中 — — 其會話 Id 屬性不為空的那些 — — 如果在五秒的時間內不遇到任何這類消息,則該請求將時間和­出了,終止接收方應用程式。 另一方面,如果找到了一個會話,則將調用 ReceivingSessionMessages 方法。 我們跳進這段代碼之前,讓我們討論一下會話狀態,允許開發人員將收到屬於同一事務的消息時可以使用的資訊存儲的概念。 在這種情況下,我們使用會話狀態,"記住"收到了,最後一個目錄頁,以及消息 — — 產品 — — 的順序到達。

基於此,這裡是在代碼中的工作流:

  1. ReceiveSession 在接收當前消息­郵件方法 (見圖 8),依賴的 ProcessMessage 方法 (圖 9) 來處理它。
  2. 裡面的 ProcessMessage 方法,如果消息是無序的它會自動延遲,並且其 ID 存儲在會話狀態中。 否則為它已標記為"完成",並從訂閱中移除。 此外下, 一個預期序列 — — 目錄頁 — — 存儲在會話。
  3. 目前收到的消息已被處理後,在 ReceiveSessionMessages 後面的代碼在會話,延遲的郵件 Id 檢查,並嘗試處理他們再次基於最新的目錄頁。
  4. 一旦會話已經收到的所有郵件,被關閉接收器。

圖 8 代碼中的 ReceivedSessionMessages 方法

static void ReceiveSessionMessages(MessageSession receiver)
  {
    // Read messages from subscription until subscription is empty
    Console.WriteLine("Reading messages from subscription {0}", 
      receiver.Path);
    Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
    Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
    SequenceState sessionState = GetState(receiver);
    BrokeredMessage receivedMessage;
    while ((receivedMessage = receiver.Receive()) != null)
    {
      string sessionId = receiver.SessionId;
      ProcessMessage(receivedMessage, ref sessionState, receiver);
      while (sessionState.GetNextOutOfSequenceMessage() != -1)
      {
        // Call back deferred messages
        Console.WriteLine("Calling back for deferred message: Category {0},
          Message sequence {1}", receiver.SessionId,
            sessionState.GetNextSequenceId());
        receivedMessage = receiver.Receive(
          sessionState.GetNextOutOfSequenceMessage());
        ProcessMessage(receivedMessage, ref sessionState, receiver);
      }
      if (receivedMessage.Properties.ContainsKey(
        "IsLastMessageInSession"))
        break;
    }
    SetState(receiver, null);
    receiver.Close();
  }

圖 9 中的代碼的 ProcessMessage 方法

static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
  {
    if (session != null)
    {
      int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
      if (sessionState.GetNextSequenceId() == messageId)
      {
        OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
        sessionState.SetNextSequenceId(messageId + 1);
        message.Complete();
        SetState(session, sessionState);
      }
      else
      {
        Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
          session.SessionId, messageId);
        sessionState.AddOutOfSequenceMessage(messageId, 
          message.SequenceNumber);
        message.Defer();
        SetState(session, sessionState);
      }
    }
    Thread.Sleep(receiverDelay);
  }

請記住對於本專案,延遲的郵件 Id 存儲在會話狀態中,和可能會丟失。 在生產環境中,我們建議為此目的使用某種類型的持久性存儲區 (Windows Azure 表是一個選項)。 請注意,如果郵件中包含的屬性 IsLastMessage­SessionInSession (在發送過程中設置),終止會話迴圈。 在主控台輸出中可以看到的 MSDNReceiver 專案圖 10

Execution of the MSDNReceiver project
圖 10 執行 MSDNReceiver 專案

Windows Azure 服務匯流排訂閱給你創建篩選出郵件之前他們正在消耗的具體規則的能力。 在這種情況下,它會相對容易地創建一個規則,隔離產品按類別或商店編號 (其中我們忽略此專案中)。 直接在 Windows Azure 門戶或通過 Visual Studio 工具,可以以程式設計方式創建規則。

總結

Windows Azure 服務匯流排提供發佈/訂閱模式驚人地強大而靈活的實現。 可以通過使用主題和訂閱處理許多不同的方案。 支援多個寄件者向多個接收者,加上有能力邏輯分組和排序的消息,廣播訊息的能力為現代開發人員開闢了一個世界的可能性。 此外,可以利用一個持久性會話跟蹤狀態使得簡單直觀,邏輯分組的消息並控制它們的順序。 在分散式的環境在哪裡規範的世界,瞭解如何使用消息處理模式和他們周圍的工具是今天的軟體架構師在雲計算工作的關鍵。

Bruno Terkaly 是微軟的開發人員宣傳員。他淵博的知識是來自在該領域使用眾多平台、語言、架構、SDK、程式庫和 API 撰寫程式碼的多年經驗。他把時間投入在撰寫程式碼、部落格以及發表現場簡報,特別是使用 Windows Azure 平台。

Ricardo Villalobos 是經驗豐富的軟體架構設計師,在為供應鏈管理產業的公司設計和建立應用程式擁有 15 以上的經驗。他從達拉斯大學工商管理持有不同的技術認證,以及碩士學位,為微軟工作作為 Windows Azure CSV 孵化組中雲建築師。

由於以下的技術專家對本文的審閱:阿彼錫 · 拉爾