本文章是由機器翻譯。

基礎知識

服務匯流排緩衝區

Juval Lowy

下載範例程式碼

「 路由器在 [服務匯流排 」 ( 我十月 2009年] 欄中msdn.microsoft.com/magazine/ee335696我所呈現的 Windows Azure AppFabric 服務匯流排可能的未來方向 ),— 變得最終的攔截器。 我所呈現 「 路由器 」 功能,並答應我一手交接下來撰寫關於佇列。

此後,路由器和佇列已被延期至服務] 匯流排的第二個發行版本,而是 — 現在 — 服務匯流排會提供緩衝區。 未來版本可能會加入記錄、 診斷和各種不同的檢測選項。 我將會瀏覽未來的文章中的這些方面。 本文章中我描述緩衝區] 外觀,也告訴您一些進階 Windows 通訊基礎 (WCF) 程式設計技巧。

服務匯流排緩衝區

在 [服務] 匯流排服務命名空間中的每一個 URI 是實際的可定址的傳訊連接點。 用戶端可以將一個訊息傳送給該連接點,並連接點可以轉送到服務。 不過,每個連接點可以也作為緩衝區 (請參閱 圖 1).

Figure 1 Buffers in the Service Bus

圖 1服務匯流排的緩衝區

訊息可設定的一段時間,儲存在緩衝區甚至時沒有服務會監視緩衝區。 請注意多個服務可以監視緩衝區,但除非您明確地窺視而且鎖定訊息,只能有一個都能夠擷取訊息。

從緩衝區後面服務分離用戶端和用戶端與服務需要不會執行在同一時間。 因為用戶端互動與一個緩衝區,而不是與實際服務端點,所有郵件都是單向,而且沒有沒有辦法 (超出方塊) 以取得訊息引動過程或任何錯誤的結果。

服務匯流排緩衝區應該不會 equated 與佇列,例如 Microsoft 訊息佇列 (MSMQ) 佇列或 WCF 排入佇列的服務 ; 它們有一些重要的差異:

  • 服務匯流排緩衝區不持久,並且郵件儲存在記憶體中。 這暗示著損失的嚴重失敗的服務匯流排本身 (有點不太可能) 事件中的訊息的風險。
  • 服務匯流排緩衝區不是交易式,傳送或擷取的郵件都不能完成一個交易的一部份。
  • 緩衝區 can’t 處理長時間 lasting 訊息。 服務必須在 10 分鐘內從緩衝區中擷取訊息或訊息丟棄。 雖然 WCF MSMQ 為基礎的訊息也功能一次存活,該期間內會更長的將預設為一天。 這可讓遠較廣泛範圍的真正斷續的作業和中斷連接的應用程式。
  • 緩衝區的大小有限,並 can’t 按住超過 50 個訊息。
  • 緩衝的訊息被 capped 中每個 64 KB 的大小。 雖然 MSMQ 也會自己的最大郵件大小,它 ’s 大幅較大 (4 MB 每封郵件)。

因此緩衝區並不提供,則為 True 的佇列的呼叫,透過定域機組 ; 而是,elasticity 之連接中為他們提供與呼叫之間某個地方落佇列呼叫和火焰及忘記非同步呼叫。

有兩種案例緩衝區是很有用。 其中一個是在用戶端與服務互動透過 shaky 連線,和卸除連線和拿起一次容忍,只要訊息緩衝短離線期間的應用程式。 第二個和更常見的案例是用戶端發出非同步單向呼叫,並利用回應緩衝區 (如回應服務] 區段中稍後所述),來處理呼叫的結果。 這類互動,就像檢視為 bungee 電源線,而不是有沒有儲存容量的嚴格網路電線更多的網路連線。

使用緩衝區

緩衝區位址必須是唯一的 ; 您可以有只有單一緩衝區的地址與相關,而且地址 can’t 已經由使用緩衝區或服務。 不過,多個合作對象可以從相同緩衝區擷取訊息。 在另外的緩衝區位址必須配置使用 HTTP 或 HTTPS。 傳送及接收的郵件從緩衝區,服務匯流排提供類似 System.Messaging API ; 也就是它會要求您未經處理的訊息進行互動。 服務匯流排系統管理員管理獨立於服務或用戶端的緩衝區。 每個緩衝區都必須有原則,來管理其行為 and 存留期。 超出方塊服務匯流排系統管理員必須執行以程式設計方式呼叫建立和管理緩衝區。

所示,表示緩衝區中的每個原則透過 MessageBufferPolicy 類別的執行個體圖 2.

圖 2MessageBufferPolicy 類別

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

可測知性原則屬性是控制緩衝區包含在服務匯流排登錄 (ATOM 餵送) 的 DiscoverabilityPolicy 型別的列舉:

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

可測知性預設 DiscoverabilityPolicy.Managers 這表示它需要受管理的授權宣告。 設定為 DiscoverabilityPolicy.Public 發佈任何授權的情況下餵送。

ExpiresAfter 屬性控制緩衝區中的訊息的存留期 (Lifetime)。 預設值為五分鐘、 最小的值為一分鐘,最大允許值為 10 分鐘。 以無訊息模式忽略任何嘗試設定較長的生命週期。

MaxMessageCount 屬性大寫緩衝區大小。 原則預設為 10 的訊息,而最小值的當然,將設為 1。 已經有提到,最大緩衝區大小為 50,且嘗試設定較大的大小會以無訊息模式忽略。

OverflowPolicy 屬性是以單一值定義為列舉:

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy 控制該怎麼處理訊息,當緩衝區 maxed 縮小,也就是當 (由 MaxMessageCount 定義) 的容量已經填滿。 唯一可能的選項是拒絕訊息 — 與錯誤傳送給寄件者。

單值列舉當做如沒有通知寄件者或從緩衝區移除郵件及接受新的郵件捨棄訊息的未來選項的預留位置。

最後兩個屬性負責安全性組態。 AuthorizationPolicy 屬性會指示授權用戶端 ’s 語彙基元服務匯流排:

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

預設值為 AuthorizationPolicy.Required 需要授權同時傳送和接收用戶端。 

最後,TransportProtection 屬性 stipulates 最小的郵件使用的型別 TransportProtectionPolicy 列舉緩衝區的傳輸安全性層級:

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

TransportProtectionPolicy.AllPaths 透過傳輸安全性是所有的緩衝區原則為預設值,而且要求使用 HTTPS 地址。

中所示,您可以管理您的緩衝區使用 MessageBufferClient 類別圖 3.

圖 3MessageBufferClient 類別

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

您可以使用的 MessageBufferClient 靜態的方法來取得 MessageBufferClient 的已驗證執行個體提供服務匯流排認證類型 TransportClientEndpointBehavior) 的靜態方法。 每當使用 MessageBufferClient,您通常需要檢查是否緩衝區已存在於服務匯流排藉由呼叫 GetMessageBuffer 方法。 如果有 ’s 沒有緩衝區,GetMessageBuffer 會擲回例外狀況。

這裡 ’s 如何以程式設計方式建立一個緩衝區:

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

本範例在您執行個體化緩衝區原則物件,並將原則設為 [一些想要的值。 所有安裝緩衝區所需花費呼叫 MessageBufferClient CreateMessageBuffer 的方法與原則和一些有效的認證。

除了以程式設計方式呼叫外,您使用 (出現在我的路由器文件,也可以使用線上範例程式碼,在本文中) 我服務匯流排總管檢視和修改緩衝區。 圖 4 示範如何藉由指定它的位址和各種原則內容中建立新的緩衝區。 很多以相同方式,您也可以刪除服務命名空間中的所有緩衝區。

Figure 4 Creating a Buffer Using the Service Bus Explorer
圖 4建立緩衝區,使用服務匯流排總管

您可以也檢閱及修改現有的緩衝區的原則、 從緩衝區清除訊息並甚至刪除一個緩衝區,方法是選取服務名稱區樹狀目錄中的緩衝區,互動緩衝區內容在右邊窗格的 所示圖 5.

Figure 5 A Buffer in the Service Bus Explorer
圖 5服務匯流排總管中的緩衝區

簡化系統管理

在建立緩衝區時它 ’s 緩衝區大小和其壽命最大化,讓用戶端的最佳方法,而且服務更多的時間來進行互動。 此外,它 ’s 將緩衝區設定為可探索的供服務匯流排登錄上檢視是個不錯的作法。 說到使用緩衝區,用戶端和服務應該會驗證該緩衝區已經建立不然前進到建立它。

自動執行這些步驟,我建立 ServiceBusHelper 類別:

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

CreateBuffer 方法建立新的設定為可探索緩衝區與最大容量的 50 個訊息和工期為 10 分鐘。 如果緩衝區已存在,CreateBuffer 刪除舊的緩衝區。 VerifyBuffer 方法驗證緩衝區存在,是否它 doesn’t 會建立新的緩衝區。 PurgeBuffer 可以用於診斷或偵錯期間清除緩衝的所有郵件。 DeleteBuffer 只是刪除緩衝區。 圖 6 示範這些方法實作的部分清單。

圖 6緩衝區協助程式方法的部分清單

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.Fromminutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(bufferAddress,credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

BufferExists 方法會使用 MessageBufferClient GetPolicy 方法,以查看緩衝區存在,且它會錯誤解譯為表示緩衝區不存在。 正在清除緩衝區複製它的原則、 刪除緩衝區,並建立新的緩衝區 (具有相同的地址) 即可完成 舊的原則。

傳送及擷取訊息

已經有提到,服務匯流排緩衝區會需要原始的 WCF 訊息之間的互動。 這是與 [傳送及擷取方法的 MessageBufferClient (取得建立或取得緩衝區時):

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

這兩種方法會受到預設為無參數版本的一分鐘的逾時。 寄件者在等候逾時表示多久等候萬一緩衝區已滿。 對於該擷取器在逾時表示多久等候萬一緩衝區是空的。

這裡 ’s 傳送未經處理的郵件訊息給緩衝區寄件者端的程式碼:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

寄件者先建立一個認證物件,並使用來取得 MessageBufferClient 的執行個體。 寄件者再建立一個 WCF 訊息,並將其傳送至緩衝區。 以下是從緩衝區中擷取未經處理的訊息擷取端的程式碼:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

緩衝處理的服務

使用在前面的程式碼片段中的原始 WCF 訊息是服務匯流排所提供。 這種程式設計模型還,留下大部分必要。 它 ’s 很麻煩,冗長乏味,非化,不是物件導向,並且不型別安全。 它 ’s 到之前使用 System.Messaging API 的 MSMQ 針對明確程式設計本身,WCF 天 throwback。 您需要剖析訊息內容,並在其項目上切換。

幸運的是,您可以增進基本服務上。 而非互動未經處理的郵件,您應該用戶端與服務之間互動提升至結構化的呼叫。 雖然這需要進階的工作相當程度的低階,我便能將它封裝的 Helper 類別的小型集合。

若要提供結構化的緩衝呼叫在服務端,我撰寫 BufferedServiceBusHost <T>定義為:

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

我建立 BufferedServiceBusHost <T>模型與 MSMQ 繫結使用 WCF 之後。 您需要提供它的建構函式位址或位址的緩衝區,以擷取郵件的來源。 其餘部分是就像對一般的 WCF 服務主機一樣:

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

請注意您可以提供在建構函式具有多個緩衝區位址來監視,就像 WCF 服務主機可以開啟多個端點具有不同的佇列。 有 ’s 提供的任何組態檔中的 [服務] 端點節這些緩衝區位址 (雖然緩衝區位址可能來自應用程式設定區段,如果因此設計) 不需要 (或方式)。

雖然服務匯流排緩衝區與實際的通訊以原始的 WCF 訊息完成,該工作會進行壓縮。 BufferedServiceBusHost <T>會確認所提供的緩衝區實際存在,將會建立它們如果他們 don’t 使用緩衝區原則 ServiceBusHelper.VerifyBuffer 所示的圖 6. BufferedServiceBusHost <T>將使用預設傳輸的安全性保護所有路徑。 將也確認所提供的服務泛型型別參數 T 合約是所有單向 ; 也就是它們都有只能單向作業 (僅為單向轉送繫結會)。 一個的最後一個功能:關閉主應用程式在偵錯僅建置時, BufferedServiceBusHost <T>會清除所有其緩衝區以確保 [平滑開始下一個偵錯工作階段。

BufferedServiceBusHost <T>操作由裝載在本機上指定的服務。 針對型別參數 T 上的每個服務] 合約 BufferedServiceBusHost <T>增加 IPC (具名管道) 上的端點。 IPC 繫結至這些端點設定為 [永遠不會逾時時間。

雖然 IPC 永遠會有一個傳輸工作階段的來模仿 MSMQ 行為視為偶數的每個階段服務為每位呼叫服務。 如同 MSMQ 繫結至該服務可能同時具有先前的訊息的新執行個體播放每個 dequeued 的 WCF 訊息。 如果所提供的服務型別是一個單一物件,BufferedServiceBusHost <T>,會檢查,並將所有郵件都傳送跨所有緩衝區和結束點至同一個服務執行個體就像對 MSMQ 繫結一樣。

BufferedServiceBusHost <T>會監視每個個別背景的背景工作執行緒上指定的緩衝區。 當郵件存放在緩衝區中時,BufferedServiceBusHost <T>擷取它,並透過 IPC 會將未經處理的 WCF 訊息轉換成適當的端點的呼叫。

圖 7 大部分的錯誤處理和移除的安全性提供 BufferedServiceBusHost <T>的部分清單。

圖 7BufferedServiceBusHost <T>的部分清單

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {         ?      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient = ?      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);      
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  protected override void OnClosing()
  {
    CloseListeners();
    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }
    m_Factory.Close();

    PurgeBuffers();
    base.OnClosing();
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =                 ?        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost <T>儲存至本機裝載 IPC 端點的 Proxy 稱為 m_Proxies 字典中:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

到字典索引鍵是端點 ’ 合約型別名稱。

建構函式儲存所提供的緩衝區地址,,然後使用反映取得的服務型別上的所有介面集合。 為每個介面 BufferedServiceBusHost <T>會驗證它有只能單向作業,然後呼叫基底 AddServiceEndpoint 新增該合約類型的端點。 位址不使用 GUID 的管道 ’s 名稱是 IPC 位址。 建構函式會使用 IPC 繫結來建置通道處理站類型 IChannelFactory <iduplexsessionchannel>。 IChannelFactory <T>用來建立非-強型別的通道透過繫結:

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

開啟內部主機與它所有的 IPC 端點之後, OnOpened 方法會建立這些端點,並已緩衝的接聽程式內部的 Proxy。 這兩個步驟是 BufferedServiceBusHost <T>的核心。 若要建立的 Proxy,它逐一端點的集合。 它會取得每個端點 ’s 位址,並使用 IChannelFactory <iduplexsessionchannel>來建立對該位址的通道。 然後,該通道 (或 Proxy) 儲存在字典中。 指定的緩衝區位址逐一 CreateListeners 方法。 對每一個地址它會驗證緩衝區,並建立背景工作執行緒,來取消佇列及其訊息。

佇列方法會使用一個 MessageBufferClient 擷取無限迴圈中的郵件,並分派它們使用分派方法。 分派從郵件擷取目標合約名稱,並使用它來查閱 [IDuplexChannel 從的 Proxy 字典,然後透過 IPC 傳送訊息。 IDuplexChannel 基礎 IPC 通道所支援,而且它會提供方法來傳送未經處理的訊息:

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

如果 IPC 呼叫期間發生錯誤,BufferedServiceBusHost <T>會重新建立它管理對該端點 (未顯示在通道圖 7). 當關閉 [主應用程式需要關閉的 Proxy。 這將依正常程序等待完成進行中呼叫。 問題在於如何慢慢地關閉所有擷取的執行緒,因為 MessageBufferClient.Retrieve 已封鎖的作業,而且沒有中止它沒有內建的方法。 解決方案是一個特殊的私用訊息,其動作會發出信號擷取執行緒結束張貼到每個受監視的緩衝區。 這是 SendCloseMessages 方法的作用。 CloseListeners 方法將張貼到緩衝區該私用訊息,並再等待終止聯結到他們的所有接聽執行緒。 關閉接聽執行緒停止餵食訊息內部的 Proxy 並一旦有傳回所有目前的呼叫進行中) 時,都已關閉的 Proxy,主應用程式已準備好要關機。 BufferedServiceBusHost <T>也支援只會中止所有執行緒 (未顯示在一個正常 Abort 方法圖 7).

最後,注意 BufferedServiceBusHost <T>支援介面 IServiceBusProperties 我定義為:

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

我需要這類介面在建置我架構特別是在簡化緩衝處理的幾個地方。 為用戶端我撰寫類別 BufferedServiceBusClient <T>定義為:

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient <T>衍生自我 HeaderClientBase <T,H> (Helper Proxy 用來將資訊傳遞訊息標頭中的 ; 請參閱我的 2007 年十一月文章,「 同步處理內容中 WCF,」 可在msdn.microsoft.com/magazine/cc163321): 

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

該基底類別的目的是要支援回應服務下, 一節所述。 緩衝處理服務的一般用戶端,對於該衍生是物質。

您可以使用 BufferedServiceBusClient <T>具有或沒有用戶端組態檔。 建構函式接受緩衝區的位址並不需要在組態檔。 無參數建構函式或建構函式接受端點名稱預期在組態檔,以包含相符的合約類型與單向轉送,結合的 (雖然 BufferedServiceBusClient <T>完全忽略該繫結) 的端點。

當從 BufferedServiceBusClient <T>衍生您的 Proxy,您需要使用受保護的佇列方法,而不是直接使用通道屬性:

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

佇列接受委派 (或 Lambda 運算式),自動換行通道屬性的使用。 結果仍是型別安全。 圖 8 顯示 BufferedServiceBusClient <T>的部分清單類別。

圖 8 BufferedServiceBusClient <T>的部分清單

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =  ?      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

BufferedServiceBusClient <T>的建構函式提供的緩衝區位址,以及繫結一定是單向的轉送繫結,以強制單向作業驗證其基底建構函式。 CreateChannel 方法會檢查目標緩衝區存在,並取得 MessageBufferClient,代表它。 BufferedServiceBusClient <T>的核心是 PreInvoke 方法。 preInvoke 是 InterceptorClientBase <T>,HeaderClientBase <T,H> 基底類別所提供的虛擬方法:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

preInvoke 可讓您輕鬆地 WCF 訊息之前,先處理它們由用戶端發送。 BufferedServiceBusClient <T>PreInvoke 就會覆寫,並使用緩衝區用戶端傳送訊息至緩衝區。 這樣一來在用戶端會維護結構化程式設計模型,並 BufferedServiceBusClient <T>封裝與 WCF 訊息互動。 缺點是 ClientBase 的訊息可以只傳送一次,並傳送它嘗試根類別時, 就會擲回一個 InvalidOperationException。 這是其中佇列進來方便由 snuffing 出該例外狀況。

回應服務

在 「 建置了佇列 WCF 回應服務 」 ( 我 2007 年二月] 欄msdn.microsoft.com/magazine/cc163482),我說明的接收佇列呼叫結果 (或錯誤) 唯一的方法,就是使用佇列的回應服務。 我會示範如何傳遞訊息標頭中回應內容物件,其中包含邏輯方法 ID 及回覆地址:

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

處理緩衝區時,相同的設計模式保留,則為 True。用戶端必須提供服務,以緩衝區來回應一個專用的回應緩衝區。用戶端也需要將回覆地址和方法識別碼傳入訊息] 標頭,就像對 MSMQ 為基礎的呼叫一樣。MSMQ 回應服務和服務匯流排主要差異] 是回應緩衝區必須也位於服務] 匯流排所顯示的 圖 9.

Figure 9 Service Bus Buffered Response Service
圖 9服務匯流排緩衝回應服務

若要提高用戶端的效率我撰寫類別 ClientBufferResponseBase <T>定義為:

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase <T>是 BufferedServiceBusClient <T>的特定子類別,它會將回應內容加入至郵件標頭。 這是為什麼我做 BufferedServiceBusClient <T>衍生從 HeaderClientBase <T,H> 及不只是 InterceptorClientBase <T>。 中所示,您可以如同 BufferedServiceBusClient,使用 ClientBufferResponseBase <T>圖 10.

圖 10 簡化用戶端

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

使用子類別化 <T>是 ClientBufferResponseBase 的直接的方法:

Uri resposeAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

管理用戶端能夠取得識別碼用來發送呼叫方法的叫用用戶端上回應時,它 ’s 方便好用。 這是容易做到透過 [頁首] 屬性:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

圖 11 列出實作覆的 ClientBufferResponseBase <T>.ClientBufferResponseBase <T>寫 PreInvoke 方法的 HeaderClientBase <T,H>,以便可以產生新方法編號,每個呼叫並將它設定成標頭。

圖 11 實作 ClientBufferResponseBase <T>

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

若要提高緩衝處理服務呼叫的回應服務所需的工作量的效率,我撰寫類別 ServiceBufferResponseBase <T>示圖 12.

圖 12 ServiceBufferResponseBase <T>類別

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
   base(new Uri(ResponseContext.Current.ResponseAddress))
 {
   Header = ResponseContext.Current;
               
   // Grab the credentials the host was using 

   IServiceBusProperties properties = 
     OperationContext.Current.Host as IServiceBusProperties;
   Credential = properties.Credential;
  }
}

雖然服務可以使用一般 BufferedServiceBusClient <T>至回應佇列,您必須從標頭擷取回應緩衝區位址並某種方式取得認證,才能登入服務匯流排緩衝區。 您也必須提供的傳出呼叫標頭與回應內容。 所有這些步驟可簡化與 ServiceBufferResponseBase <T>。 ServiceBufferResponseBase <T>提供其基底建構函式以回應的內容超出位址,並也設定該內容到外寄的標頭。

另一個簡化的假設 ServiceBufferResponseBase <T>讓是回應的服務可以使用它的主用 (擷取郵件從其本身緩衝區) 相同的認證來傳送訊息給回應緩衝區。 至該結尾 ServiceBufferResponseBase <T>取得參考它自己主機從作業內容,並讀取使用 IServiceBusProperties 實作主機的認證。 ServiceBufferResponseBase <T>複製 (完成 BufferedServiceBusClient <T>內) 其本身使用這些認證。 這的當然要求 BufferedServiceBusHost <T>首先就裝載服務使用。 您的服務需要從 ServiceBufferResponseBase <T>衍生的 Proxy 類別,用來回應。 比方說,給予此回應合約:

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :    
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

圖 13 顯示簡單的緩衝的服務回應其用戶端。

圖 13 使用 ServiceBufferResponseBase <T>

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
     // Don’t rethrow 
    catch(Exception exception)
    {
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

所有的回應服務需求是從訊息的標頭存取方法識別碼,如下所示:

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}

保持調整進一步探索服務匯流排。  

Juval Lowy* 是 IDesign 的軟體架構設計師,提供 WCF 訓練及架構諮詢。這份文件包含摘錄從他最近的活頁簿,「 程式設計 WCF 服務、 第三版 」 (O’Reilly 2010)。他也 ’s Microsoft 地區導演的矽山谷。請連絡 Lowy 在 idesign.net.*

多虧給來檢閱這份文件的技術專家下列:Jeanne Baker