Использование очередей недоставленных сообщений для обработки сбоев при передаче сообщений

Сообщения в очереди могут вызвать сбой доставки. Сообщения, во время доставки которых произошел сбой, записываются в очередь недоставленных сообщений. Сбой доставки может быть вызван такими причинами, как сбои сети, удаленная очередь, заполнение очереди, сбой проверки подлинности или сбой доставки по времени.

Сообщения могут длительное время оставаться в очереди, если принимающее приложение не считывает их из очереди за отведенное время. Такое поведение может не быть свойственно чувствительным к времени сообщениям. Критичные к времени сообщения имеют свойство срок жизни (TTL), заданное в привязке с очередью, которое указывает, насколько долго сообщения могут находиться в очереди до истечения срока действия. Просроченные сообщения отправляются в специальную очередь, называемую очередью недоставленных сообщений. Сообщения могут быть помещены в очередь недоставленных сообщений и по другим причинам, таким как превышение квоты очереди или сбой проверки подлинности.

Обычно для прочтения сообщений из очереди недоставленных сообщений и причин сбоя приложения записывают компенсирующую логику. Компенсирующая логика зависит от причины сбоя. Например, в случае сбоя проверки подлинности можно исправить сертификат, прикрепленный к сообщению, и отправить сообщение повторно. Если сбой доставки произошел по причине достижения квоты целевой очереди, можно повторить попытку отправки в надежде, что проблема с квотой была устранена.

Большинство систем организации очереди имеют общесистемную очередь недоставленных сообщений, в которой хранятся все сообщения данной системы, во время доставки которых произошел сбой. Очередь сообщений (MSMQ) имеет две общесистемные очереди недоставленных сообщений: общесистемная очередь недоставленных транзакционных сообщений, в которой хранятся сообщения, по причине сбоя не доставленные в транзакционную очередь, и общесистемная очередь недоставленных нетранзакционных сообщений, в которой хранятся сообщения, по причине сбоя не доставленные в нетранзакционную очередь. Если два клиента отправляют сообщения в две разные службы, и поэтому разные очереди в WCF совместно используют одну и ту же службу MSMQ для отправки, то можно использовать сочетание сообщений в системной очереди недоставленных писем. Это не всегда оптимально. В некоторых случаях (по соображениям безопасности, например), может быть нежелательно, чтобы один клиент прочел сообщения другого клиента из очереди недоставленных сообщений. При использовании общей очереди недоставленных сообщений клиенты также вынуждены просматривать всю очередь, чтобы найти отправленное ими сообщение, что, в зависимости от количества сообщений в очереди недоставленных сообщений, может оказаться чрезмерно дорого. Поэтому в WCFNetMsmqBindingMsmqIntegrationBinding, и MSMQ в Windows Vista предоставляют настраиваемую очередь недоставленных писем (иногда называемую очередью недоставленных сообщений приложения).

Пользовательская очередь недоставленных сообщений обеспечивает изоляцию между клиентами, использующими для отправки сообщений одну службу MSMQ.

В Windows Server 2003 и Windows XP Windows Communication Foundation (WCF) предоставляет общесистемную очередь недоставленных писем для всех клиентских приложений в очереди. В Windows Vista WCF предоставляет очередь недоставленных писем для каждого клиентского приложения в очереди.

Указание использования очереди недоставленных сообщений

Очередь недоставленных сообщений находится в диспетчере очереди отправляющего приложения. В диспетчере очереди хранятся сообщения с истекшим сроком или сообщения, во время передачи или доставки которых произошел сбой.

Привязка имеет следующие свойства очереди недоставленных писем.

Чтение сообщений из очереди недоставленных сообщений

Приложение, которое считывает сообщения из очереди недоставленных писем, похоже на службу WCF, которая считывает из очереди приложений, за исключением следующих незначительных различий:

  • Для чтения сообщений из системной очереди недоставленных транзакционных сообщений универсальный код ресурса (URI) должен иметь вид net.msmq://localhost/system$;DeadXact.

  • Для чтения сообщений из системной очереди недоставленных нетранзакционных сообщений код URI должен иметь вид net.msmq://localhost/system$;DeadLetter.

  • Чтобы считывать сообщения из настраиваемой очереди недоставленных сообщений, URI должен иметь имя формы:net.msmq://localhost/private/<custom-dlq-name> , где custom-dlq-name — это имя настраиваемой очереди недоставленных букв.

Дополнительные сведения об адресации очередей см. в разделе "Конечные точки службы" и "Адресация очередей".

Стек WCF на приемнике соответствует адресам, которые служба прослушивает с адресом сообщения. Если адреса совпадают, сообщение отправляется, если нет - не отправляется. Это может создать проблемы при чтении сообщений из очереди недоставленных сообщений, поскольку сообщения в очереди недоставленных сообщений обычно адресованы не службе и не службе очереди отправленных сообщений. Поэтому для чтения службой сообщений из очереди недоставленных сообщений необходима установка фильтра по адресу ServiceBehavior, которая дает команду стеку считать совпадающими все сообщения в очереди независимо от адресата. В частности, нужно добавить ServiceBehavior при помощи параметра Anyв службу, считывающую сообщения из очереди недоставленных сообщений.

Обработка подозрительных сообщений из очереди недоставленных сообщений

При некоторых условиях в очередях недоставленных сообщений возможна обработка подозрительных сообщений. Поскольку создать вложенные очереди из системных очередей нельзя, ReceiveErrorHandling при прочтении из системной очереди недоставленных сообщений не может быть установлен в значение Move. Обратите внимание, что при чтении сообщений из пользовательской очереди недоставленных сообщений могут использоваться вложенные очереди, и, таким образом, Move является подходящим средством удаления подозрительного сообщения.

Если ReceiveErrorHandling установлен в значение Reject, подозрительные сообщения при чтении из пользовательской очереди недоставленных сообщений помещаются в системную очередь недоставленных сообщений. При чтении из системной очереди недоставленных сообщений сообщение отбрасывается (удаляется). При возврате из системной очереди недоставленных сообщений в MSMQ сообщение отбрасывается (удаляется).

Пример

В приведенном ниже примере показано, как создавать очередь недоставленных сообщений и как использовать ее для обработки просроченных сообщений. Этот пример основан на примере в разделе "Практическое руководство. Обмен очередями сообщений с конечными точками WCF". В приведенном ниже примере показано, как писать клиентский код для службы обработки заказов, в которой используется очередь недоставленных сообщений для каждого приложения. В примере также показано, как обрабатывать сообщения из очереди недоставленных сообщений.

В следующем примере показан код для клиента, задающего очередь недоставленных сообщений для каждого приложения.

using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            // Get MSMQ queue name from appsettings in configuration.
            string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];

            // Create the transacted MSMQ queue for storing dead message if necessary.
            if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
                System.Messaging.MessageQueue.Create(deadLetterQueueName, true);

            OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
        try
            {	

                // Create the purchase order.
                PurchaseOrder po = new PurchaseOrder();
                po.CustomerId = "somecustomer.com";
                po.PONumber = Guid.NewGuid().ToString();

                PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
                lineItem1.ProductId = "Blue Widget";
                lineItem1.Quantity = 54;
                lineItem1.UnitCost = 29.99F;

                PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
                lineItem2.ProductId = "Red Widget";
                lineItem2.Quantity = 890;
                lineItem2.UnitCost = 45.89F;

                po.orderLineItems = new PurchaseOrderLineItem[2];
                po.orderLineItems[0] = lineItem1;
                po.orderLineItems[1] = lineItem2;

                //Create a transaction scope.
                using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
                {
                    // Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po);
                    // Complete the transaction.
                    scope.Complete();
                }

                client.Close();
            }
            catch(TimeoutException timeout)
            {
        Console.WriteLine(timeout.Message);
                client.Abort();
        }
            catch(CommunicationException conexcp)
            {
        Console.WriteLine(conexcp.Message);
                client.Abort();
        }

            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples

    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            ' Get MSMQ queue name from appsettings in configuration.
            Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")

            ' Create the transacted MSMQ queue for storing dead message if necessary.
            If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
                System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
            End If


            Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
            Try


                ' Create the purchase order.
                Dim po As New PurchaseOrder()
                po.CustomerId = "somecustomer.com"
                po.PONumber = Guid.NewGuid().ToString()

                Dim lineItem1 As New PurchaseOrderLineItem()
                lineItem1.ProductId = "Blue Widget"
                lineItem1.Quantity = 54
                lineItem1.UnitCost = 29.99F

                Dim lineItem2 As New PurchaseOrderLineItem()
                lineItem2.ProductId = "Red Widget"
                lineItem2.Quantity = 890
                lineItem2.UnitCost = 45.89F

                po.orderLineItems = New PurchaseOrderLineItem(1) {}
                po.orderLineItems(0) = lineItem1
                po.orderLineItems(1) = lineItem2

                'Create a transaction scope.
                Using scope As New TransactionScope(TransactionScopeOption.Required)
                    ' Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po)
                    ' Complete the transaction.
                    scope.Complete()
                End Using


                client.Close()
            Catch timeout As TimeoutException
                Console.WriteLine(timeout.Message)
                client.Abort()
            Catch conexcp As CommunicationException
                Console.WriteLine(conexcp.Message)
                client.Abort()
            End Try

            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

В следующем примере показан код для файла конфигурации клиента.

В следующем примере показан код для службы обработки сообщений из очереди недоставленных сообщений.

using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void SubmitPurchaseOrder(PurchaseOrder po);
    }

    // Service class that implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
    public class PurchaseOrderDLQService : IOrderProcessor
    {
        OrderProcessorClient orderProcessorService;
        public PurchaseOrderDLQService()
        {
            orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();

            // Resend the message if timed out.
            if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
                mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
            {
                // Re-send.
                Console.WriteLine("Purchase order Time To Live expired");
                Console.WriteLine("Trying to resend the message");

                // Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po);
                Console.WriteLine("Purchase order resent");
            }
        }

        // Host the service within this EXE console application.
        public static void Main()
        {
            // Create a ServiceHost for the PurchaseOrderDLQService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close();
            }
        }
    }
}

Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples")> _
    Public Interface IOrderProcessor
        <OperationContract(IsOneWay:=True)> _
        Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
    End Interface

    ' Service class that implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.Single, ConcurrencyMode:=ConcurrencyMode.Single, AddressFilterMode:=AddressFilterMode.Any)> _
    Public Class PurchaseOrderDLQService
        Implements IOrderProcessor
        Private orderProcessorService As OrderProcessorClient
        Public Sub New()
            orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()

            ' Resend the message if timed out.
            If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
                ' Re-send.
                Console.WriteLine("Purchase order Time To Live expired")
                Console.WriteLine("Trying to resend the message")

                ' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po)
                Console.WriteLine("Purchase order resent")
            End If
        End Sub

        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Create a ServiceHost for the PurchaseOrderDLQService type.
            Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

В следующем примере показан код для файла конфигурации службы очереди недоставленных сообщений.

См. также