本文章是由機器翻譯。

WCF 架構

AppFabric 服務匯流排探索

Juval Lowy

下載代碼示例

在 2010 年 1 月的文章“通過發現查找新 WCF”(msdn.microsoft.com/magazine/ee335779) 中,我介紹了 Windows Communication Foundation (WCF) 4 很有價值的發現功能。WCF 發現本質上是一種面向 Intranet 的技術,因為位址資訊是無法通過 Internet 進行廣播的。

不過好在,使用動態位址以及分離位址軸上的用戶端和服務同樣適用于依靠服務匯流排接收用戶端調用的服務。

幸運的是,您可以使用事件中繼綁定替換使用者資料包通訊協定 (UDP) 多播請求並為發現和公告做準備。這樣,您可以將可發現服務的易於部署和服務匯流排的無阻礙連接這兩種優勢結合起來。本文介紹我編寫的一個用於支援服務匯流排發現(使其等效于 WCF 中內置的發現支援)的小框架,以及我的一組説明程式類。它還可以用作運行您自己的發現機制的示例。

AppFabric 服務匯流排背景知識

如果您不熟悉 AppFabric 服務匯流排,可以閱讀以下幾篇過去的文章:

解決方案體系結構

對於 WCF 的內置發現功能,存在用於發現交換的標準約定。遺憾的是,這些約定被定義為內部約定。自訂發現機制的第一步就是定義用於發現請求和回檔的約定。我定義的 IServiceBusDiscovery 約定如下所示:

[ServiceContract]
public interface IServiceBusDiscovery
{
  [OperationContract(IsOneWay = true)]
  void OnDiscoveryRequest(string contractName,string contractNamespace,
    Uri[] scopesToMatch,Uri replayAddress);
}

發現端點支援單一操作 IServiceBusDiscovery。 使用 OnDiscoveryRequest,用戶端可以發現支援某個特定約定的服務端點,就像使用常規 WCF 那樣。 另外,用戶端還可以傳入一組要匹配的可選作用域。

服務應通過事件中繼綁定支援發現端點。 用戶端對支援發現端點的服務發出請求,請求這些服務回檔用戶端提供的回復位址。

服務使用如下定義的 IServiceBusDiscoveryCallback 回檔用戶端:

[ServiceContract]
public interface IServiceBusDiscoveryCallback
{
  [OperationContract(IsOneWay = true)]
  void DiscoveryResponse(Uri address,string contractName,
    string contractNamespace, Uri[] scopes);
}

用戶端提供一個支援 IServiceBusDiscoveryCallback 的端點,其位址是 OnDiscoveryRequest 的 replayAddress 參數。所用的綁定應當是盡可能接近單播的單向中繼綁定。图 1 顯示了發現順序。

图 1 服务总线发现

图 1 中的第一步是用戶端在支援 IServiceBusDiscovery 的發現端點引發一個發現請求事件。在事件綁定的作用下,所有可發現服務都會收到此事件。如果某個服務支援請求的約定,它會通過服務匯流排回檔該用戶端(圖 1 中的步驟 2)。用戶端收到該服務端點(或多個端點)的位址後,便會繼續調用該服務,就像使用常規服務匯流排調用那樣(圖 1 中的步驟 3)。

可發現的主機

很顯然,支援這樣一種發現機制(尤其是針對服務的發現)需要進行大量工作。使用如下定義的 DiscoverableServiceHost 類,我得以封裝這些工作:

public class DiscoverableServiceHost : ServiceHost,...
{ 
  public const string DiscoveryPath = "DiscoveryRequests";

  protected string Namespace {get;}

  public Uri DiscoveryAddress {get;set;}

      public NetEventRelayBinding DiscoveryRequestBinding {get;set;}
      
  public NetOnewayRelayBinding DiscoveryResponseBinding {get;set;}

  public DiscoverableServiceHost(object singletonInstance,
    params Uri[] baseAddresses);
  public DiscoverableServiceHost(Type serviceType,
    params Uri[] baseAddresses);
}

除發現外,DiscoverableServiceHost 還始終將服務端點發佈到服務匯流排註冊表。 要實現像使用常規 WCF 發現那樣的發現功能,必須添加一個發現行為和一個 WCF 發現端點。 這是為了避免再添加其他控制開關,並確保在某一位置有一個一致配置,用於打開或關閉所有發現模式。

DiscoverableServiceHost 的使用方法與所有其他依靠服務匯流排的服務相同:

Uri baseAddress = 
  new Uri("sb://MyServiceNamespace.servicebus.windows.
net/MyService/");

ServiceHost host = new DiscoverableServiceHost(typeof(MyService),baseAddress);
           
// Address is dynamic
host.AddServiceEndpoint(typeof(IMyContract),new NetTcpRelayBinding(),
  Guid.NewGuid().ToString());
 
// A host extension method to pass creds to behavior
host.SetServiceBusCredentials(...);

host.Open();

請注意,使用發現功能時,服務位址可以是完全動態的。

图 2 提供了 DiscoverableServiceHost 相關元素的部分實現。

圖 2 實現 DiscoverableServiceHost(部分)

public class DiscoverableServiceHost : ServiceHost,...
{  
  Uri m_DiscoveryAddress;
  ServiceHost m_DiscoveryHost;
    
  // Extracts the service namespace out of the endpoints or base addresses
  protected string Namespace
  {
    get
    {...}
  }

  bool IsDiscoverable
  {
    get
    {
      if(Description.Behaviors.Find<ServiceDiscoveryBehavior>() != null)
      {
        return Description.Endpoints.Any(endpoint => 
          endpoint is DiscoveryEndpoint);
      }
      return false;
    }
  }
   
  public Uri DiscoveryAddress
  {
    get
    {
      if(m_DiscoveryAddress == null)
      {
        m_DiscoveryAddress =  
          ServiceBusEnvironment.CreateServiceUri("sb",Namespace,DiscoveryPath);
      }
      return m_DiscoveryAddress;
    }
    set
    {
      m_DiscoveryAddress = value;
    }
  }

  public DiscoverableServiceHost(Type serviceType,params Uri[] 
    baseAddresses) : base(serviceType,baseAddresses)
  {}

  void EnableDiscovery()
  {
    // Launch the service to monitor discovery requests
    DiscoveryRequestService discoveryService = 
      new DiscoveryRequestService(Description.Endpoints.ToArray());

    discoveryService.DiscoveryResponseBinding = DiscoveryResponseBinding;

    m_DiscoveryHost = new ServiceHost(discoveryService);

    m_DiscoveryHost.AddServiceEndpoint(typeof(IServiceBusDiscovery),
      DiscoveryRequestBinding, DiscoveryAddress.AbsoluteUri);

     m_DiscoveryHost.Open();
  }

  protected override void OnOpening()
  {
    if(IsDiscoverable)
    {
      EnableDiscovery();
    }
     
    base.OnOpening();
  }
  protected override void OnClosed()
  {
    if(m_DiscoveryHost != null)
    {
      m_DiscoveryHost.Close();
    }

    base.OnClosed();
  }

  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,...]
  class DiscoveryRequestService : IServiceBusDiscovery
  {
    public DiscoveryRequestService(ServiceEndpoint[] endpoints);

    public NetOnewayRelayBinding DiscoveryResponseBinding
    {get;set;}
  }   
}

只有服務有發現行為並且至少有一個發現端點時,DiscoverableServiceHost 的説明程式屬性 IsDiscoverable 才會返回 true。 DiscoverableServiceHost 會覆蓋 ServiceHost 的 OnOpening 方法。 如果服務可發現,OnOpening 會調用 EnableDiscovery 方法。

EnableDiscovery 是 DiscoverableServiceHost 的核心。 它為名為 DiscoveryRequestService 的私有單例類創建內部主機(請參見圖 3)。

圖 3 DiscoveryRequestService 類(部分)

public class DiscoverableServiceHost : ServiceHost
{
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
    UseSynchronizationContext = false)]
  class DiscoveryRequestService : IServiceBusDiscovery
  {
    readonly ServiceEndpoint[] Endpoints;

    public NetOnewayRelayBinding DiscoveryResponseBinding
    {get;set;}

    public DiscoveryRequestService(ServiceEndpoint[] endpoints)
    {
      Endpoints = endpoints;
    }

    void IServiceBusDiscovery.OnDiscoveryRequest(string contractName,
      string contractNamespace, Uri[] scopesToMatch, Uri responseAddress)
    {
      ChannelFactory<IServiceBusDiscoveryCallback> factory = 
        new ChannelFactory<IServiceBusDiscoveryCallback>(
        DiscoveryResponseBinding, new EndpointAddress(responseAddress));

      IServiceBusDiscoveryCallback callback = factory.CreateChannel();

      foreach(ServiceEndpoint endpoint in Endpoints)
      {
        if(endpoint.Contract.Name == contractName && 
          endpoint.Contract.Namespace == contractNamespace)
        {
          Uri[] scopes = DiscoveryHelper.LookupScopes(endpoint);

          if(scopesToMatch != null)
          {
            bool scopesMatched = true;
            foreach(Uri scope in scopesToMatch)
            {
              if(scopes.Any(uri => uri.AbsoluteUri == scope.AbsoluteUri) 
                == false)
              {
                scopesMatched = false;
                break;
              }
            }
            if(scopesMatched == false)
            {
              continue;
            }
          }
          callback.DiscoveryResponse(endpoint.Address.Uri,contractName,
            contractNamespace,scopes);
        }
      }
      (callback as ICommunicationObject).Close();
    }
  }   
}

public static class DiscoveryHelper
{
  static Uri[] LookupScopes(ServiceEndpoint endpoint)
  {
    Uri[] scopes = new Uri[]{};
    EndpointDiscoveryBehavior behavior = 
      endpoint.Behaviors.Find<EndpointDiscoveryBehavior>();
    if(behavior != null)
    {
      if(behavior.Scopes.Count > 0)
      {
        scopes = behavior.Scopes.ToArray();
      }
    }
    return scopes;
  }
  // More members
}

DiscoveryRequestService 的構造函數接受需要監視其發現請求的服務端點(主要是 DiscoverableServiceHost 的端點)。

隨後 EnableDiscovery 將一個實現 IServiceBusDiscovery 的端點添加到主機,因為 DiscoveryRequestService 實際回應的是來自用戶端的發現請求。 該發現端點的位址預設為服務命名空間下的 URI“DiscoveryRequests”。 但在打開 DiscoverableServiceHost 之前,您可以使用 DiscoveryAddress 屬性將此位址更改為任何其他 URI。 關閉 DiscoverableServiceHost 也會關閉發現端點的主機。

图 3 列出了 DiscoveryRequestService 的實現。

OnDiscoveryRequest 首先創建一個代理,用於回檔請求發現的用戶端。 所用的綁定是普通 NetOnewayRelayBinding,但您可以通過設置 DiscoveryResponseBinding 屬性對此進行控制。 請注意,DiscoverableServiceHost 有一個專門針對此用途的屬性。 隨後,OnDiscoveryRequest 對提供給構造函數的端點集合進行反覆運算。 對於每個端點,它會檢查約定是否與發現請求中所請求的約定匹配。 如果約定相匹配,OnDiscoveryRequest 將查找與該端點關聯的作用域,並驗證這些作用域是否與發現請求中的可選作用域匹配。 最後,OnDiscoveryRequest 使用該端點的位址、約定和作用域回檔用戶端。

發現用戶端

我針對用戶端編寫了説明程式類 ServiceBusDiscoveryClient,其定義如下:

public class ServiceBusDiscoveryClient : ClientBase<IServiceBusDiscovery>,...
{         
  protected Uri ResponseAddress
  {get;}

  public ServiceBusDiscoveryClient(string serviceNamespace,string secret);

  public ServiceBusDiscoveryClient(string endpointName);
  public ServiceBusDiscoveryClient(NetOnewayRelayBinding binding,
    EndpointAddress address);

   public FindResponse Find(FindCriteria criteria);
}

ServiceBusDiscoveryClient 是仿照 WCF DiscoveryClient 編寫的,其使用方法也與之相似,如圖 4 所示。

圖 4 使用 ServiceBusDiscoveryClient

string serviceNamespace = "...";
string secret = "...";

ServiceBusDiscoveryClient discoveryClient = 
  new ServiceBusDiscoveryClient(serviceNamespace,secret);
         
FindCriteria criteria = new FindCriteria(typeof(IMyContract));
FindResponse discovered = discoveryClient.Find(criteria);
discoveryClient.Close();
        
EndpointAddress address = discovered.Endpoints[0].Address;
Binding binding = new NetTcpRelayBinding();
ChannelFactory<IMyContract> factory = 
  new ChannelFactory<IMyContract> (binding,address);
// A channel factory extension method to pass creds to behavior
factory.SetServiceBusCredentials(secret);

IMyContract proxy = factory.CreateChannel();
proxy.MyMethod();
(proxy as ICommunicationObject).Close();

ServiceBusDiscoveryClient 是 IServiceBusDiscovery 發現事件端點的一個代理。 用戶端使用它發出對可發現服務的發現請求。 發現端點的位址預設為“DiscoveryRequests”,但您可以使用任何接受端點名稱或端點位址的構造函數指定其他位址。 它會對發現端點使用 NetOnewayRelayBinding 的普通實例,但您可以使用任何接受端點名稱或綁定實例的構造函數指定其他綁定。 ServiceBusDiscoveryClient 支援基數和發現超時,就像 DiscoveryClient 那樣。

图 5 顯示了 ServiceBusDiscoveryClient 的部分實現。

圖 5 實現 ServiceBusDiscoveryClient(部分)

public class ServiceBusDiscoveryClient : ClientBase<IServiceBusDiscovery> 
{         
   protected Uri ResponseAddress
   {get;private set;}

   public ServiceBusDiscoveryClient(string endpointName) : base(endpointName)
   {
      string serviceNamespace =  
        ServiceBusHelper.ExtractNamespace(Endpoint.Address.Uri);
      ResponseAddress = ServiceBusEnvironment.CreateServiceUri(
        "sb",serviceNamespace,"DiscoveryResponses/"+Guid.NewGuid());
    }

   public FindResponse Find(FindCriteria criteria)
   {
      string contractName = criteria.ContractTypeNames[0].Name;
      string contractNamespace = criteria.ContractTypeNames[0].Namespace;

      FindResponse response = DiscoveryHelper.CreateFindResponse();

      ManualResetEvent handle = new ManualResetEvent(false);

     Action<Uri,Uri[]> addEndpoint = (address,scopes)=>
     {
       EndpointDiscoveryMetadata metadata = new EndpointDiscoveryMetadata();
       metadata.Address = new EndpointAddress(address);
         if(scopes != null)
         {
           foreach(Uri scope in scopes)
             {
               metadata.Scopes.Add(scope);
             }
         }
         response.Endpoints.Add(metadata);
                                         
         if(response.Endpoints.Count >= criteria.MaxResults)
         {
           handle.Set();
         }
     };

     DiscoveryResponseCallback callback = 
       new DiscoveryResponseCallback(addEndpoint);

     ServiceHost host = new ServiceHost(callback);
        

     host.AddServiceEndpoint(typeof(IServiceBusDiscoveryCallback),
       Endpoint.Binding,ResponseAddress.AbsoluteUri);

     host.Open();

     try
     {         
       DiscoveryRequest(criteria.ContractTypeNames[0].Name,
         criteria.ContractTypeNames[0].Namespace,
         criteria.Scopes.ToArray(),ResponseAddress);

       handle.WaitOne(criteria.Duration);
     }
     catch
     {}
     finally
     {
       host.Abort();
     }
     return response;
   }
   void DiscoveryRequest(string contractName,string contractNamespace,
     Uri[] scopesToMatch,Uri replayAddress)
   {
     Channel.OnDiscoveryRequest(contractName,contractNamespace,
       scopesToMatch, replayAddress);
   }

   [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
     UseSynchronizationContext = false)]
   class DiscoveryResponseCallback : IServiceBusDiscoveryCallback
   {
     readonly Action<Uri,Uri[]> Action;

     public DiscoveryResponseCallback(Action<Uri,Uri[]> action)
     {
       Action = action;
     }
     public void DiscoveryResponse(Uri address,string contractName,
       string contractNamespace,Uri[] scopes)
     {
       Action(address,scopes);
     }
   }
}

public static class DiscoveryHelper
{
  internal static FindResponse CreateFindResponse()
  {
    Type type = typeof(FindResponse);

    ConstructorInfo constructor =  

      type.GetConstructors(BindingFlags.Instance|BindingFlags.NonPublic)[0];

     return constructor.Invoke(null) as FindResponse;
  }  
  // More members
}

Find 方法需要能夠接收來自已發現服務的回檔。 為此,每次調用時,Find 都會為名為 DiscoveryResponseCallback 的內部同步單例類打開並關閉一個主機。 Find 為該主機添加一個支援 IServiceBusDiscoveryCallback 的端點。 DiscoveryResponseCallback 的構造函數接受 Action<Uri,Uri[]>類型的一個委託。 每次有服務回應時,DiscoveryResponse 的實現都會調用該委託,為其提供發現的位址和作用域。 Find 方法使用 lambda 運算式將回應聚集在 FindResponse 的一個實例中。 遺憾的是 FindResponse 沒有公共構造函數,因此 Find 使用 DiscoveryHelper 的 CreateFindResponse 方法,後者又使用反射對其進行產生實體。 此外,Find 還會創建一個可等待的事件控制碼。 達到基數時,lambda 運算式會向該控制碼發出信號。 在調用 DiscoveryRequest 之後,Find 會等待該控制碼獲得信號或等到發現持續時間結束,然後中止主機以停止處理任何進行中的發現回應。

更多用戶端説明程式類

儘管我編寫的 ServiceBusDiscoveryClient 具有與 DiscoveryClient 相同功能,但它會在我的 ServiceBusDiscoveryHelper 所提供的改進發現體驗中獲得益處:

public static class ServiceBusDiscoveryHelper
{
  public static EndpointAddress DiscoverAddress<T>(
    string serviceNamespace,string secret,Uri scope = null);

  public static EndpointAddress[] DiscoverAddresses<T>(
    string serviceNamespace,string secret,Uri scope = null);
   
  public static Binding DiscoverBinding<T>(
    string serviceNamespace,string secret,Uri scope = null);
}

DiscoverAddress<T>可發現一個服務(基數為 1),DiscoverAddresses<T>可發現所有可用的服務端點(基數為全部),DiscoverBinding<T>使用服務中繼資料端點發現端點綁定。 我以大致相同的方式定義了 ServiceBusDiscoveryFactory 類:

public static class ServiceBusDiscoveryFactory
{
  public static T CreateChannel<T>(string serviceNamespace,string secret,
    Uri scope = null) where T : class;
   
  public static T[] CreateChannels<T>(string serviceNamespace,string secret,
    Uri scope = null) where T : class;
}

CreateChannel<T>採用基數 1,並使用中繼資料端點獲取用於創建代理的服務的位址和綁定。 CreateChannels<T>使用所有已發現的中繼資料端點為所有已發現服務創建代理。

通知

為實現公告支援,可以再次使用事件中繼綁定替換 UDP 多播。 首先,我定義了 IServiceBusAnnouncements 公告約定:

[ServiceContract]
public interface IServiceBusAnnouncements
{
  [OperationContract(IsOneWay = true)]
  void OnHello(Uri address, string contractName,
    string contractNamespace, Uri[] scopes);

  [OperationContract(IsOneWay = true)]
  void OnBye(Uri address, string contractName, 
    string contractNamespace, Uri[] scopes);
}

圖 6 所示,這次由用戶端公開事件綁定端點並監視公告。

圖 6 服務匯流排的可用性公告

服務將通過單向中繼綁定公佈其可用性並提供其位址(圖 6 中的步驟 1),用戶端將繼續調用這些服務(圖 6 中的步驟 2)。

服務端公告

我的 DiscoveryRequestService 類可支援公告:

public class DiscoverableServiceHost : ServiceHost,...
{
   public const string AnnouncementsPath = "AvailabilityAnnouncements";   

   public Uri AnnouncementsAddress
   {get;set;}
      
   public NetOnewayRelayBinding AnnouncementsBinding
   {get;set;}

  // More members 
}

但與內置 WCF 公告相同的是,它在預設情況下不會公佈其可用性。 要實現公告功能,您需要配置一個具有發現行為的公告端點。 在大多數情況下,您只需完成這一項操作即可。 DiscoveryRequestService 將在服務命名空間下的“AvailabilityAnnouncements”URI 引發其可用性事件。 您可以在打開主機之前通過設置 AnnouncementsAddress 屬性更改該預設值。 預設情況下將使用普通單向中繼綁定引發這些事件,但您可以在打開主機之前使用 AnnouncementsBinding 屬性指定替代設置。 DiscoveryRequestService 將以非同步方式引發其可用性事件,以免在打開和關閉主機的過程中阻止操作。 图 7 顯示了 DiscoveryRequestService 的公告支援元素。

圖 7 通過 DiscoveryRequestService 支援公告

public class DiscoverableServiceHost : ServiceHost,...
{
   Uri m_AnnouncementsAddress;

   bool IsAnnouncing
   {
      get
      {
         ServiceDiscoveryBehavior behavior = 
           Description.Behaviors.Find<ServiceDiscoveryBehavior>();
         if(behavior != null)
         {
           return behavior.AnnouncementEndpoints.Any();
         }
         return false;
      }
   }

   public Uri AnnouncementsAddress
   {
      get
      {
        if(m_AnnouncementsAddress == null)
        {
          m_AnnouncementsAddress = ServiceBusEnvironment.
CreateServiceUri("sb",Namespace,AnnouncementsPath);
        }
        return m_AnnouncementsAddress;
     }
     set
     {
       m_AnnouncementsAddress = value;
     }
  }

  IServiceBusAnnouncements CreateAvailabilityAnnouncementsClient()
  {    
    ChannelFactory<IServiceBusAnnouncements> factory = 
      new ChannelFactory<IServiceBusAnnouncements>(
      AnnouncementsBinding,new EndpointAddress(AnnouncementsAddress));

    return factory.CreateChannel();
  }

   protected override void OnOpened()
   {
      base.OnOpened();

      if(IsAnnouncing)
      {
        IServiceBusAnnouncements proxy =  
          CreateAvailabilityAnnouncementsClient();
        PublishAvailabilityEvent(proxy.OnHello);
      }
   }

   protected override void OnClosed()
   {
     if(IsAnnouncing)
     {
       IServiceBusAnnouncements proxy = 
         CreateAvailabilityAnnouncementsClient();
       PublishAvailabilityEvent(proxy.OnBye);
     }
     ...
}

  void PublishAvailabilityEvent(Action<Uri,string,string,Uri[]> notification)
  {
    foreach(ServiceEndpoint endpoint in Description.Endpoints)
    {
      if(endpoint is DiscoveryEndpoint || endpoint is 
        ServiceMetadataEndpoint)
      {
        continue;
      }
      Uri[] scopes = LookupScopes(endpoint);

      WaitCallback fire = delegate
      {
        try
        {
          notification(endpoint.Address.Uri, endpoint.Contract.Name,
            endpoint.Contract.Namespace, scopes);
          (notification.Target as ICommunicationObject).Close();

        }
        catch
        {}
      };
      ThreadPool.QueueUserWorkItem(fire); 
    }
  }
}

CreateAvailabilityAnnouncementsClient 説明程式方法使用通道工廠為 IServiceBusAnnouncements 公告事件端點創建代理。 在 DiscoveryRequestService 打開之後、關閉之前,它會發出通知。 DiscoveryRequestService 會覆蓋 ServiceHost 的 OnOpened 和 OnClosed 方法。 如果主機配置為支援公告,OnOpened 和 OnClosed 將調用 CreateAvailabilityAnnouncementsClient 以創建代理,並將其傳遞給 PublishAvailabilityEvent 方法以便以非同步方式引發事件。 由於“您好”公告和“再見”公告的事件引發操作相同(唯一的區別在於 IServiceBusAnnouncements 調用哪個方法),因此 PublishAvailabilityEvent 接受對目標方法的委託。 對於 DiscoveryRequestService 的每個端點,PublishAvailabilityEvent 將查找與該端點關聯的作用域,並使用 WaitCallback 匿名方法對向 Microsoft .NET Framework 執行緒池發出的公告進行排隊。 該匿名方法調用提供的委託,然後關閉基本目標代理。

接收公告

我本可以按我在 1 月份的文章中所述的那樣模仿 WCF 提供的 AnnouncementService,但我已經通過 AnnouncementSink<T>實現了許多改進,而且並未發現任何需要使用 AnnouncementService 代替 AnnouncementSink<T>的情況。 此外,我還希望利用並重複使用 AnnouncementSink<T>的行為及其基類。

因此,我針對用戶端編寫了 ServiceBusAnnouncementSink<T>,其定義如下:

[ServiceBehavior(UseSynchronizationContext = false,
  InstanceContextMode = InstanceContextMode.Single)]
public class ServiceBusAnnouncementSink<T> : AnnouncementSink<T>,
   IServiceBusAnnouncements, where T : class
{
  public ServiceBusAnnouncementSink(string serviceNamespace,string secret);

  public ServiceBusAnnouncementSink(string serviceNamespace,string owner,
    string secret);
  public Uri AnnouncementsAddress get;set;}

  public NetEventRelayBinding AnnouncementsBinding {get;set;}
}

ServiceBusAnnouncementSink<T>的構造函數需要服務命名空間。

ServiceBusAnnouncementSink<T>支援將 IServiceBusAnnouncements 作為自託管單例。 ServiceBusAnnouncementSink<T>還會將其自身發佈到服務匯流排註冊表。 ServiceBusAnnouncementSink<T>預設訂閱在服務命名空間下的“AvailabilityAnnouncements”URI 發佈的公告。 您可以在打開它之前通過設置 AnnouncementsAddress 屬性更改該預設值。 ServiceBusAnnouncementSink<T>預設使用普通 NetEventRelayBinding 接收通知,但您可以在打開 ServiceBusAnnouncementSink<T>之前通過設置 AnnouncementsBinding 更改該預設設置。 ServiceBusAnnouncementSink<T>的用戶端可以訂閱 AnnouncementSink<T>的委託以接收公告,也可以僅訪問基址容器中的位址。 有关示例,请参见图 8

图 8 接收公告

class MyClient 
{
   AddressesContainer<IMyContract> m_Addresses;

   public MyClient()
   {
      string serviceNamespace = "...";
      string secret = "...";

      m_Addresses = new ServiceBusAnnouncementSink<IMyContract>(
        serviceNamespace,secret);

      m_Addresses.Open();

      ...
}
   public void OnCallService()
   {  
      EndpointAddress address = m_Addresses[0];

      IMyContract proxy = ChannelFactory<IMyContract>.CreateChannel(
         new NetTcpRelayBinding(),address);
      proxy.MyMethod();
      (proxy as ICommunicationObject).Close();
   }
   ...
}

图 9 顯示了不包含某些錯誤處理的 ServiceBusAnnouncementSink<T>部分實現。

圖 9 實現 ServiceBusAnnouncementSink<T>(部分)

[ServiceBehavior(UseSynchronizationContext = false,
   InstanceContextMode = InstanceContextMode.Single)]
public class ServiceBusAnnouncementSink<T> : AnnouncementSink<T>,
  IServiceBusAnnouncements
{
   Uri m_AnnouncementsAddress;

   readonly ServiceHost Host;
   readonly string ServiceNamespace;
   readonly string Owner;
   readonly string Secret;

   public ServiceBusAnnouncementSink(string serviceNamespace,
     string owner,string secret)
   {
      Host = new ServiceHost(this);
      ServiceNamespace = serviceNamespace;
      Owner = owner;
      Secret = secret;
   }

   public Uri AnnouncementsAddress
   {
      get
      {
         if(m_AnnouncementsAddress == null)
         {
           m_AnnouncementsAddress = 
             ServiceBusEnvironment.CreateServiceUri(
             "sb",ServiceNamespace,
             DiscoverableServiceHost.AnnouncementsPath);
         }
         return m_AnnouncementsAddress;
      }
      set
      {
        m_AnnouncementsAddress = value;
      }
   }
   public override void Open()
   {
      base.Open();

      Host.AddServiceEndpoint(typeof(IServiceBusAnnouncements),
        AnnouncementsBinding, AnnouncementsAddress.AbsoluteUri);
      Host.SetServiceBusCredentials(Owner,Secret);
      Host.Open();
   }
   public override void Close()
   {
      Host.Close();

      base.Close();
   }

   void IServiceBusAnnouncements.OnHello(Uri address,string contractName,
     string contractNamespace,Uri[] scopes)
   {
      AnnouncementEventArgs args = 
        DiscoveryHelper.CreateAnnouncementArgs(
        address,contractName,contractNamespace,scopes);
      OnHello(this,args); //In the base class AnnouncementSink<T>
   }

   void IServiceBusAnnouncements.OnBye(Uri address,string contractName,
     string contractNamespace,Uri[] scopes)
   {
     AnnouncementEventArgs args = DiscoveryHelper.CreateAnnouncementArgs(
       address,contractName,contractNamespace,scopes);        
     OnBye(this,args); //In the base class AnnouncementSink<T>
   }
}

public static class DiscoveryHelper
{
  static AnnouncementEventArgs CreateAnnouncementArgs(Uri address,
    string contractName, string contractNamespace, Uri[] scopes)
  {
    Type type = typeof(AnnouncementEventArgs);
    ConstructorInfo constructor = 
      type.GetConstructors(BindingFlags.Instance|BindingFlags.NonPublic)[0];

    ContractDescription contract = 
      new ContractDescription(contractName,contractNamespace);

    ServiceEndpoint endpoint = 
      new ServiceEndpoint(contract,null,new EndpointAddress(address));
    EndpointDiscoveryMetadata metadata = 
      EndpointDiscoveryMetadata.FromServiceEndpoint(endpoint);

    return constructor.Invoke(
      new object[]{null,metadata}) as AnnouncementEventArgs;
  }
  // More members
}

ServiceBusAnnouncementSink<T>的構造函數將自身作為單例託管,並保存服務命名空間。打開 ServiceBusAnnouncementSink<T>時,它會將一個支援 IServiceBusAnnouncements 的端點添加到自己的主機。IServiceBusAnnouncements 事件處理方法的實現會創建一個 AnnouncementEventArgs 實例,並用公佈的服務位址、約定和作用域對其進行填充,然後調用各個公告方法的基類實現,就像使用常規 WCF 發現功能進行調用那樣。此操作將填充 AddressesContainer<T>的基類,並引發 AnnouncementSink<T>的相應事件。請注意,由於缺少公共構造函數,因此要創建 AnnouncementEventArgs 的實例,必須使用反射。

中繼資料資源管理器

利用我提供的對服務匯流排發現的支援,我將中繼資料資源管理器工具(先前的文章中曾有介紹)的發現功能擴展為支援服務匯流排。如果按一下“發現”按鈕(參見圖 10),則對於您已為其提供憑據的每個服務命名空間,中繼資料資源管理器將嘗試發現可發現服務的中繼資料交換端點,並以樹的形式顯示發現的端點。

圖 10 配置服務匯流排發現

中繼資料資源管理器將預設使用服務命名空間下的 URI“DiscoveryRequests”。您可以更改該路徑,方法是從功能表中選擇“服務匯流排”,然後選擇“發現”,打開“配置 AppFabric 服務匯流排發現”對話方塊(參見圖 10)。

針對每個所需的服務命名空間,您可以在此對話方塊的“發現路徑”文字方塊中為發現事件端點配置所需的相對路徑。

中繼資料資源管理器還支援服務匯流排中繼資料交換端點的公告。要實現接收可用性通知,請打開發現配置對話方塊,並選中“可用性公告”下的“啟用”核取方塊。中繼資料資源管理器將預設使用指定服務命名空間下的“AvailabilityAnnouncements”URI,但您可以為每個服務命名空間配置所需的任何其他公告端點路徑。

中繼資料資源管理器對公告的支援使其成為簡單實用的服務匯流排監視工具。

Juval Lowy  是 IDesign 的一位軟體架構師,負責提供有關 .NET 和體系結構的培訓和諮詢服務。本文中的部分內容摘自他最近編寫的《Programming WCF Services 3rd Edition》一書(O’Reilly,2010 年)。另外,他還是 Microsoft 矽谷地區的區域總監。您可以通過 idesign.net 與 Lowy 聯繫。

衷心感謝以下技術專家對本文的審閱: Wade Wegner