Este artigo foi traduzido por máquina.

Fundamentos

Buffers de barramento de serviços

Juval Lowy

Baixe o código de exemplo

Na minha coluna de outubro de 2009, “ roteadores no the Service Bus ” (MSDN.Microsoft.com/magazine/ee335696), apresentei a direção futura provável do barramento de serviços Windows Azure AppFabric — tornando-se o interceptador definitivo. Eu o recurso de roteadores apresentadas e prometidos escrever sobre filas a seguir.

Desde então, roteadores e filas tiverem sido adiadas o segundo lançamento o barramento de serviço e, em vez disso — agora — o barramento de serviço fornecerá buffers. Versões futuras provavelmente irão adicionar registro, diagnóstico e várias opções de instrumentação. Visito desses aspectos em um artigo futuro. Neste artigo, eu irá descrever o aspecto de buffers e também mostrar que alguns avançados técnicas de programação do WCF (Windows Communication Foundation).

Buffers de barramento de serviços

O barramento de serviço, cada URI no espaço de nomes de serviço é realmente uma junção mensagens endereçável. O cliente pode enviar uma mensagem para essa ligação e a junção pode retransmitir para os serviços. No entanto, cada junção também pode funcionar como um buffer (consulte <T> 1).

Figure 1 Buffers in the Service Bus

Figura 1 Buffers de barramento de serviços

As mensagens são armazenadas no buffer por um período configurável de tempo, mesmo quando nenhum serviço está monitorando o buffer. Observe que vários serviços podem monitorar o buffer, mas a menos que explicitamente Inspecionar mensagem e bloquear a mensagem, somente um deles será capaz de recuperar uma mensagem.

O cliente é dissociado dos serviços de buffer e o cliente e o serviço não precisam estar executando ao mesmo tempo. Como o cliente interage com um buffer e não com um ponto de extremidade de serviço real, todas as mensagens são unidirecionais e não é possível (prontos) para obter os resultados da chamada mensagem ou erros.

Os buffers de barramento de serviço não devem ser equated com filas, tais como filas do Microsoft Message Queuing (MSMQ) ou WCF em fila serviços; eles têm algumas diferenças fundamentais:

  • Os buffers de barramento de serviço não são duráveis e as mensagens são armazenadas na memória. Isso implica um risco de perda de mensagens no evento (um pouco improvável) de uma falha catastrófica do barramento de serviço propriamente dito.
  • Os buffers de barramento de serviço não são transacionais; envio nem recuperar mensagens pode ser feito como parte de uma transação.
  • Os buffers não podem tratar mensagens longa. O serviço deve recuperar uma mensagem do buffer em 10 minutos ou a mensagem será descartada. Embora as mensagens de MSMQ do WCF também apresentam um time-to-live, esse período é muito maior, usando como padrão para um dia. Isso permite que uma variedade muito maior de operações verdadeiramente desassociadas e aplicativos desconectados.
  • Os buffers são limitados no tamanho e não podem conter mais de 50 mensagens.
  • As mensagens no buffer são capped em tamanho, em 64 KB. Embora o MSMQ também impõe sua próprias tamanho máximo da mensagem, é substancialmente maior (4 MB por mensagem).

Assim, buffers não fornecem true chamadas em fila em nuvem; em vez disso, eles fornecem para elasticity da conexão com chamadas caindo em algum lugar entre enfileiradas e chamadas assíncrona incêndio e esquecer.

Há dois cenários em que os buffers são úteis. Um é um aplicativo onde o cliente e o serviço estão interagindo através de uma conexão instável e cancelando a conexão e separação-lo novamente é tolerado desde que as mensagens são armazenados em buffer durante o período curto off-line. Um cenário de segundo (e mais comum) é um cliente emitir chamadas assíncronas unidirecionais e da utilização de um buffer de resposta (conforme descrito posteriormente na seção técnica) para manipular os resultados das chamadas. Essa interação é como exibindo a conexão de rede mais como um cabo bungee em vez de um cabo de rede rígidas não possui nenhuma capacidade de armazenamento.

Trabalhando com buffers

O endereço de buffer deve ser exclusivo; você pode ter apenas um único buffer associado a um endereço e o endereço já não pode ser usado por um buffer ou um serviço. No entanto, várias pessoas podem recuperar mensagens do mesmo buffer. Além disso, o endereço de buffer deve usar HTTP ou HTTPS para o esquema. Para enviar e recuperar mensagens do buffer, o barramento de serviço oferece uma API semelhante do System.Messaging; ou seja, ele requer que você interagir com mensagens não processadas. O administrador de barramento de serviço gerencia os buffers independentemente de clientes de serviços. Cada buffer deve ter uma diretiva que controlam sua vida útil and de comportamento. Fora da caixa, o administrador de barramento de serviço deve realizar chamadas através de programação para criar e gerenciar os buffers.

Cada política de buffer é expressa por meio de uma instância da classe MessageBufferPolicy conforme mostrado no Figura 2.

Figura 2 A classe MessageBufferPolicy

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

A propriedade de diretiva detectabilidade é enum do tipo DiscoverabilityPolicy, controlar ou não o buffer está incluído no registro de barramento de serviço (ATOM feed):

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

Detectabilidade padrão DiscoverabilityPolicy.Managers, o que significa que requer uma declaração autorização gerenciado. Defini-la como DiscoverabilityPolicy.Public publica o feed sem nenhuma autorização.

A propriedade ExpiresAfter controla o tempo de vida de mensagens no buffer. O padrão é cinco minutos, o valor mínimo é de um minuto e o valor máximo permitido é de 10 minutos. Qualquer tentativa de definir uma vida útil mais longa será silenciosamente ignorada.

A propriedade MaxMessageCount maiúscula o tamanho do buffer. A diretiva padrão é 10 mensagens e o valor mínimo é, obviamente, definido como um. Como já mencionado, o tamanho do buffer máximo é 50 e tenta configurar um tamanho maior silenciosamente é ignorada.

A propriedade OverflowPolicy é enum com um único valor definido como:

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy controla o que fazer com a mensagem quando o buffer é maximizado para fora, ou seja, quando ele já estará preenchido a capacidade (definida por MaxMessageCount). A opção só é possível é rejeitar a mensagem — enviá-lo novamente com um erro ao remetente.

O único valor enum serve como um espaço reservado para futuras opções, tais como descartar a mensagem sem informar ao remetente ou removendo mensagens do buffer e aceitar a nova mensagem.

As duas últimas propriedades são responsáveis pela configuração de segurança. A propriedade AuthorizationPolicy instrui o barramento de serviço se deseja ou não autorizar o token do cliente:

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

O valor padrão de AuthorizationPolicy.Required requer autorização tanto o envio e recebimento de clientes. 

Finalmente, a propriedade TransportProtection determina o nível mínimo de segurança de transferência para a mensagem ao buffer, usando um enum do tipo TransportProtectionPolicy:

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

Segurança de transporte via TransportProtectionPolicy.AllPaths é o padrão para todas as diretivas de buffer e ele exige o uso de um endereço HTTPS.

Você pode usar a classe MessageBufferClient para administrar o buffer, conforme mostrado no Figura 3.

Figura 3 A classe MessageBufferClient

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

Você usar os métodos estáticos da MessageBufferClient para obter uma instância autenticada MessageBufferClient fornecendo os métodos estáticos com as credenciais de barramento de serviço (do tipo TransportClientEndpointBehavior). Sempre que usar MessageBufferClient, você geralmente precisará verificar se o buffer já existe no barramento de serviço, chamando o método GetMessageBuffer. Se não houver nenhum buffer, GetMessageBuffer lança uma exceção.

Veja como criar programaticamente um buffer é:

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

Neste exemplo, você criar uma instância de um objeto de diretiva buffer e definir a diretiva para alguns valores desejados. Tudo o que você precisa para instalar o buffer é chamar o método CreateMessageBuffer de MessageBufferClient com a diretiva e algumas credenciais válidas.

Como alternativa às chamadas através de programação, você pode usar meu Explorer barramento de serviço (apresentado no meu artigo de roteadores e também está disponível on-line com o código de exemplo deste artigo) tanto exibir e modificar buffers. Figura 4 mostra como criar um novo buffer, especificando seu endereço e várias propriedades de diretiva. Da mesma forma, você também pode excluir todos os buffers no espaço de nomes de serviço.

Figure 4 Creating a Buffer Using the Service Bus Explorer
Figura 4 Criando um buffer usando o Gerenciador de serviço Bus

Você pode também analisar e modificar as políticas de buffers existentes, limpar mensagens do buffer e até mesmo excluir um buffer selecionando o buffer na árvore de espaço para nome de serviço e interagindo com as propriedades de buffer no painel à direita, como mostra a Figura 5.

Figure 5 A Buffer in the Service Bus Explorer
Figura 5 Um buffer no Gerenciador de barramento de serviço

Simplificando a administração

Ao criar os buffers, é melhor maximizar ao tamanho do buffer e o seu ciclo de vida, para que os clientes e serviços mais tempo para interagir. Além disso, é uma boa idéia para tornar o buffer localizável para que poder visualizá-lo sobre o registro de barramento de serviço. Quando se trata de usando o buffer, o cliente e o serviço devem verificar que o buffer já tiver sido criada ou que vá para criá-lo.

Para automatizar essas etapas, criei a classe ServiceBusHelper:

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

O método CreateBuffer cria um novo buffer detectável com uma capacidade máxima de 50 mensagens e uma duração de 10 minutos. Se já existir um buffer, CreateBuffer exclui o buffer antigo. O método VerifyBuffer verifica se um buffer existe e, em caso negativo, cria um novo buffer. PurgeBuffer é útil para limpar todas as mensagens no buffer durante o diagnóstico ou depuração. DeleteBuffer simplesmente exclui o buffer. Figura 6 mostra uma lista parcial da implementação desses métodos.

Figura 6 Listagem parcial dos métodos auxiliares de buffer

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.Fromminutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(bufferAddress,credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

O método BufferExists usa o método GetPolicy de MessageBufferClient para ver se existe um buffer e ele interpreta um erro como uma indicação de que o buffer não existe. Limpar um buffer é feito copiando sua política, excluindo o buffer e criando um novo de buffer (com o mesmo endereço) com diretiva antiga.

Para enviar e recuperar mensagens

Como já mencionado, os buffers de barramento de serviço exigem que as interações com mensagens do WCF brutas. Isso é feito com os métodos Send e recuperar da MessageBufferClient (obtida ao criar ou obter um buffer):

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

Os dois métodos estão sujeitos a um tempo de limite padrão é um minuto para que as versões sem parâmetros. Do remetente, o tempo limite representa quanto tempo esperar caso o buffer esteja completo. Recuperador, o tempo limite de quanto tempo significa esperar caso o buffer esteja vazio.

Aqui está o código do lado do remetente para enviar mensagens brutas para o buffer:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

O remetente primeiro cria um objeto de credenciais e o utiliza para obter uma instância do MessageBufferClient. O remetente, em seguida, cria uma mensagem do WCF e o envia para o buffer. Aqui está o código do lado Recuperando para recuperação de mensagens brutas do buffer:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

Serviços em buffer

O uso de mensagens do WCF brutas nos trechos de código anterior é que o barramento de serviço tem a oferecer. E ainda, como um modelo de programação deixa muito a desejar. Ele é complicado, entediante, não-estruturado, não orientado a objeto e tipo seguro. É um throwback dias antes do WCF, com programação explícita em MSMQ usando a API System.Messaging. Você precisa analisar o conteúdo da mensagem e alternar de seus elementos.

Felizmente, você pode melhorar a oferta básica. Em vez de interagir com mensagens brutas, você deve elevar a interação estruturados chamadas entre clientes e serviços. Embora isso exija um considerável grau de Low nível avançado de trabalho, fui capaz de encapsulá-lo com um pequeno conjunto de classes auxiliares.

Para fornecer chamadas em buffer estruturadas no lado do serviço, escrevi BufferedServiceBusHost <T>definido como:

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

Eu modelado BufferedServiceBusHost <T>usando WCF com a associação MSMQ. Você precisa fornecer seu construtor com o endereço ou endereços dos buffers para recuperar mensagens. O restante é como com um host de serviço do WCF normal:

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

Observe que fornecem os construtores com vários endereços de buffer para monitorar, assim como um host de serviço WCF pode abrir vários pontos de extremidade com filas diferentes. Não há nenhuma necessidade (ou forma) fornece nenhuma esses endereços de buffer na seção serviço ponto de extremidade no arquivo de configuração (embora os endereços de buffer podem vir da seção de configurações de aplicativo se então criar).

Enquanto a comunicação com o buffer do barramento de serviço real é feita com mensagens do WCF brutas, que o trabalho é encapsulado. BufferedServiceBusHost <T>verificará se os buffers fornecidos realmente existem e irão criá-los se Don tiverem, usando a política de buffer de ServiceBusHelper.VerifyBuffer mostrado naFigura 6. BufferedServiceBusHost <T>usará a segurança de transferência padrão de segurança de todos os caminhos. Também será Verifique se os contratos do parâmetro de tipo genérico T do serviço fornecido estão todos unidirecionais; ou seja, todos eles ser operações unidirecionais somente (como a retransmissão unidirecional ligação faz). Um último recurso: Quando o host, depuração de fechamento cria somente, BufferedServiceBusHost <T>irá limpar todos os seus buffers para garantir um início suave para a próxima sessão de depuração.

BufferedServiceBusHost <T>opera hospedando localmente o serviço especificado. Para cada contrato de serviço no parâmetro de tipo T, BufferedServiceBusHost <T>proporciona um ponto de extremidade IPC (pipes nomeado). A ligação IPC para esses pontos de extremidade está configurada para nunca expirar.

Embora o IPC sempre tem uma sessão de transporte para imitar o comportamento MSMQ até mesmo por sessão serviços são tratados como por chamada serviços. Cada mensagem WCF desenfileiramento é reproduzida para uma nova instância do serviço, potencialmente simultaneamente com as mensagens anteriores, assim como com a associação MSMQ. Se o tipo de serviço fornecido é um singleton, BufferedServiceBusHost <T>que respeita e enviará todas as mensagens em todos os buffers e pontos de extremidade para a mesma instância de serviço, assim como com a associação MSMQ.

BufferedServiceBusHost <T>monitora cada buffer especificado no thread de trabalho separado do plano de fundo. Quando uma mensagem é depositada no buffer, BufferedServiceBusHost <T>recupera e converte a mensagem do WCF bruta em uma chamada para o ponto de extremidade apropriado sobre IPC.

Figura 7 fornece uma lista parcial de BufferedServiceBusHost <T>, com a maioria do tratamento de erros e segurança removido.

Figura 7 Listagem parcial dos BufferedServiceBusHost <T>

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {         ?      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient = ?      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);      
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  protected override void OnClosing()
  {
    CloseListeners();
    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }
    m_Factory.Close();

    PurgeBuffers();
    base.OnClosing();
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =                 ?        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost <T>armazena os proxies para os pontos de extremidade IPC hospedados localmente em um dicionário chamado m_Proxies:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

A chave no dicionário de é nome de tipo de contrato ’ os pontos de extremidade.

Os construtores armazenam endereços buffer fornecido e, em seguida, usam a reflexão para obter uma coleção de todas as interfaces no tipo de serviço. Para cada interface BufferedServiceBusHost <T>verifica ele tem somente operações unidirecionais e, em seguida, chama o AddServiceEndpoint base para adicionar um ponto de extremidade para esse tipo de contrato. O endereço é um endereço IPC usando um GUID para nome do pipe. Os construtores de usam a ligação IPC para criar uma fábrica de canais do tipo IChannelFactory <iduplexsessionchannel>. IChannelFactory <T>é usado para criar um canal sem rigidez de tipos sobre a vinculação:

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

Depois de abrir o host interno com todos os seus pontos de extremidade do IPC, o método OnOpened cria os proxies internos para esses pontos de extremidade e os ouvintes no buffer. Essas duas etapas são o coração do BufferedServiceBusHost <T>. Para criar os proxies, ele itera na coleção de pontos de extremidade. Ele obtém o endereço de cada ponto de extremidade e usa o IChannelFactory <iduplexsessionchannel>para criar um canal em relação a esse endereço. Esse canal (ou proxy) é então armazenado no dicionário. O método CreateListeners itera sobre os endereços de buffer especificado. Para cada endereço, ele verifica o buffer e cria um thread de trabalho dequeue suas mensagens.

O método dequeue usa um MessageBufferClient para recuperar as mensagens em um loop infinito e despachá-los usando o método de distribuição. Despacho extrai da mensagem o nome do contrato de destino e o usa para pesquisar o IDuplexChannel do dicionário proxies e enviar a mensagem sobre IPC. IDuplexChannel é compatível com o canal IPC subjacente e fornece uma maneira de enviar mensagens brutas:

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

Se tiver ocorrido um erro durante a chamada IPC, BufferedServiceBusHost <T>recriará o canal gerencia contra o ponto de extremidade (não mostrado na Figura 7). Quando você fechar o host, você precisa fechar os proxies. Isso normalmente aguardará as chamadas em andamento concluir. O problema é como normalmente fechar todos os threads recuperar, pois MessageBufferClient.Retrieve é uma operação de bloqueio e não existe uma maneira interna para anular a ele. A solução é lançar em cada buffer monitorado uma mensagem particular especial cuja ação sinaliza o segmento de recuperação para sair. Este é o que faz o método SendCloseMessages. O método CloseListeners envia essa mensagem privada para os buffers e aguarda até que todos os threads de escutando encerrar associando-os. Fechar os threads de ouvintes pára de alimentação de mensagens para os proxies internos e depois os proxies são fechados (quando todas as chamadas atuais em andamento tem retornado), o host está pronto para desligar. BufferedServiceBusHost <T>também oferece suporte a um método de abortar amigável que apenas anula todas as threads (não mostradas na Figura 7).

Finalmente, observe que BufferedServiceBusHost <T>oferece suporte à interface IServiceBusProperties defini como:

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

Precisava de uma interface em alguns lugares na criação de minha estrutura, especialmente na simplificação de buffer. Para o cliente, escrevi a classe BufferedServiceBusClient <T>definido como:

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient <T>deriva da minha HeaderClientBase <T,H> (um proxy auxiliar usada para passar informações nos cabeçalhos da mensagem; consulte meu artigo de novembro de 2007, “ sincronização contextos no WCF, ” disponível emMSDN.Microsoft.com/magazine/cc163321): 

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

O objetivo dessa classe base é oferecer suporte a um serviço de resposta, conforme discutido na seção a seguir. Para um cliente simples de um serviço em buffer, que derivação é irrelevante.

Você pode usar BufferedServiceBusClient <T>com ou sem um arquivo de configuração do cliente. Os construtores aceitam o endereço de buffer não requerem um arquivo de configuração. O construtor sem parâmetros ou os construtores que aceite o nome do ponto de extremidade que o arquivo de configuração para conter um ponto de extremidade correspondendo o tipo de contrato com a ligação de retransmissão unidirecional (Embora essa ligação completamente é ignorada pelo BufferedServiceBusClient <T>).

Ao derivar seu proxy de BufferedServiceBusClient <T>, será necessário usar o método Enqueue protegido em vez de diretamente usando a propriedade Channel:

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

Enqueue aceita um delegado (ou uma expressão lambda) que envolve o uso da propriedade Channel. O resultado ainda é tipo seguro. Figura 8 mostra uma lista parcial do BufferedServiceBusClient <T>classe.

Figura 8 Listagem parcial dos BufferedServiceBusClient <T>

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =  ?      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

Os construtores de BufferedServiceBusClient <T>fornecem seu construtor base com o endereço de buffer e a vinculação, que é sempre uma ligação de retransmissão unidirecional para impor a validação de operações unidirecionais. O método CreateChannel verifica se o buffer de destino existe e obtém um MessageBufferClient que a representa. O coração do BufferedServiceBusClient <T>é o método PreInvoke. PreInvoke é um método virtual fornecido pela InterceptorClientBase <T>, a classe base de HeaderClientBase <T,H>:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

PreInvoke permite processar com facilidade as mensagens do WCF antes que eles estiver enviados pelo cliente. BufferedServiceBusClient <T>substitui PreInvoke e usa o buffer de cliente para enviar a mensagem ao buffer. Dessa forma, o cliente mantém um modelo de programação estruturada e BufferedServiceBusClient <T>encapsula a interação com a mensagem do WCF. A desvantagem é que a mensagem só pode ser enviada uma vez e quando a classe raiz de ClientBase tenta enviá-lo, ele lança um InvalidOperationException. Isso é onde Enqueue se torna útil por snuffing out essa exceção.

Serviço de resposta

Na minha coluna de fevereiro de 2007, “ construir um enfileirados resposta serviço WCF ” (MSDN.Microsoft.com/magazine/cc163482), expliquei que a única maneira de receber o resultado (ou erros) de uma chamada na fila é usar um serviço de resposta na fila. Mostrei como passar os cabeçalhos das mensagens de um objeto de contexto de resposta que contém a identificação do método lógico e o endereço de resposta:

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

O mesmo padrão de design continua verdadeiro ao lidar com buffers. O cliente precisa fornecer um buffer de resposta dedicado para o serviço do buffer de resposta a. O cliente também precisa passar o endereço de resposta e a identificação do método os cabeçalhos de mensagens, assim como acontece com as chamadas do MSMQ. A principal diferença entre o serviço de MSMQ com resposta e o barramento de serviço é que o buffer de resposta também deve residir no barramento de serviço, conforme mostrado no Figura 9.

Figure 5 A Buffer in the Service Bus Explorer
Figura 9 Barramento de serviços com buffer completo serviço de resposta

Para otimizar o lado do cliente, escrevi a classe ClientBufferResponseBase <T>definido como:

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase <T>é uma subclasse especializada de BufferedServiceBusClient <T>e adiciona o contexto de resposta para os cabeçalhos das mensagens. Isso é por isso que eu fiz BufferedServiceBusClient <T>derivar de HeaderClientBase <T,H> e não meramente de InterceptorClientBase <T>. Você pode usar ClientBufferResponseBase <T>como BufferedServiceBusClient, como mostrado na Figura 10.

Figura 10 Simplificação do lado do cliente

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

Usando a subclasse da ClientBufferResponseBase <T>é simples:

Uri resposeAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

É útil ao gerenciar as respostas no lado do cliente que o cliente chamar o método de identificação usada para expedir a chamada de obter. Isso é feito com facilidade por meio da propriedade do cabeçalho:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

Figura 11 relaciona a implementação de ClientBufferResponseBase <T>.ClientBufferResponseBase <T>substitui o método PreInvoke de HeaderClientBase <T,H> para que ele poderia gerar uma nova identificação do método para cada chamada e defini-la para os cabeçalhos.

Figura 11 Implementando ClientBufferResponseBase <T>

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

Para simplificar o trabalho necessário pelo serviço de buffer para chamar o serviço de resposta, escrevi a classe ServiceBufferResponseBase <T>mostrado na Figura 12.

Figura 12 O ServiceBufferResponseBase <T>classe

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
   base(new Uri(ResponseContext.Current.ResponseAddress))
 {
   Header = ResponseContext.Current;
               
   // Grab the credentials the host was using 

   IServiceBusProperties properties = 
     OperationContext.Current.Host as IServiceBusProperties;
   Credential = properties.Credential;
  }
}

Embora o serviço possa usar um simples BufferedServiceBusClient <T>para enfileirar a resposta, você precisará extrair o endereço de buffer de resposta de cabeçalhos e, de alguma forma, obter as credenciais para entrar no buffer de barramento de serviço. Você também precisará fornecer os cabeçalhos de chamada de saída com o contexto de resposta. Todas essas etapas podem ser simplificadas com ServiceBufferResponseBase <T>. ServiceBufferResponseBase <T>fornece seu construtor base com o endereço fora do contexto de resposta, e também define que o contexto em cabeçalhos de saída.

Outro simplificando suposição ServiceBufferResponseBase <T>faz é que o serviço de resposta pode usar as mesmas credenciais de seu host usado (para recuperar mensagens de seu próprio buffer) para enviar mensagens para o buffer de resposta. Para esse fim, ServiceBufferResponseBase <T>obtém uma referência a seu próprio host a partir do contexto de operação e lê as credenciais usando a implementação IServiceBusProperties do host. ServiceBufferResponseBase<T> copia essas credenciais para seu próprio uso (feito dentro BufferedServiceBusClient <T>). Isso, claro, exige o uso de BufferedServiceBusHost <T>para hospedar o serviço em primeiro lugar. O serviço deve derivar uma classe de proxy de ServiceBufferResponseBase<T> e usá-lo para responder. Por exemplo, dada a este contrato de resposta:

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :    
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

Figura 13 mostra um serviço em buffer simples respondendo ao seu cliente.

Figura 13 Usando ServiceBufferResponseBase<T>

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
     // Don’t rethrow 
    catch(Exception exception)
    {
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

Todas as necessidades de serviço de resposta é acessar a identificação do método dos cabeçalhos das mensagens, como mostrado aqui:

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}

Fique atento a explorar ainda mais o barramento de serviço.  

Juval Lowy é arquiteto de software da IDesign e presta serviços de treinamento no WCF e consultoria em arquitetura. Este artigo contém excertos de seu recente livro, “ Programming WCF Services, terceira edição ” (O’Reilly 2010). Ele também é o diretor regional Microsoft para o vale do Silício. Entre em contato com o Lowy em idesign.NET.

Graças ao especialista técnico seguir para revisar este artigo: Jeanne Baker