使用死信队列处理消息传输故障

排队消息可能传送失败。 这些失败的消息将记录在死信队列中。 传送失败可能是由于网络故障、队列已删除、队列已满、身份验证失败或未能准时传送等原因而引起的。

如果接收应用程序未从队列中及时读取排队消息,则这些消息可能会在队列中长时间保留。 这种行为对于时效性较强的消息可能不合适。 时效较强的消息在排队绑定中设置了一个生存时间 (TTL) 属性,该属性指示在这些消息必须过期之前可以在队列中保留多长时间。 过期消息将发送到一个称为死信队列的特殊队列中。 也可以出于其他原因将这些消息放入死信队列,如超出队列配额或由于身份验证失败。

通常,应用程序会编写补偿逻辑从死信队列中读取这些消息及失败的原因。 补偿逻辑取决于失败的原因。 例如,当身份验证失败时,您可以更正附有消息的证书,然后重新发送该消息。 如果传送失败的原因在于已达到目标队列的配额,则您可以在认为配额问题已得到解决时重新尝试发送。

大多数队列系统都有系统范围的死信队列,其中存储来自该系统的所有失败消息。 消息队列 (MSMQ) 提供两个系统范围的死信队列:事务性系统范围死信队列,用于存储未能传送到事务性队列的消息;非事务性系统范围死信队列,用于存储未能传送到非事务性队列的消息。 如果两个客户端要向两个不同的服务发送消息,从而 WCF 中的不同队列将共享要发送的同一个 MSMQ 服务,则系统死信队列中可能会包含混合消息。 这并不总是最佳的。 在某些情况下(如安全),您可能不希望一个客户端从死信队列中读取另一个客户端的消息。 共享死信队列还要求客户端浏览该队列来查找这些客户端所发送的消息,但根据死信队列中的消息数量,这样做的开销可能会极其大。 因此,在 Windows Vista 上的 WCF NetMsmqBindingMsmqIntegrationBinding, 和 MSMQ 提供了一个自定义死信队列(有时称为应用程序特定的死信队列)。

自定义死信队列在共享同一个 MSMQ 服务来发送消息的各客户端之间提供隔离。

在 Windows Server 2003 和 Windows XP 上,Windows Communication Foundation (WCF) 为所有排队客户端应用程序提供系统范围的死信队列。 在 Windows Vista 上,WCF 为每个排队客户端应用程序都提供了一个死信队列。

指定死信队列的用途

死信队列位于发送应用程序的队列管理器中。 该队列中存储已过期或者传输/传送失败的消息。

该绑定具有下列死信队列属性:

从死信队列中读取消息

从死信队列中读取消息的应用程序类似于从应用程序队列中读取消息的 WCF 服务,但存在下列细微差异:

  • 若要从系统的事务性死信队列中读取消息,统一资源标识符 (URI) 必须为以下形式:net.msmq://localhost/system$;DeadXact。

  • 若要从系统的非事务性死信队列中读取消息,URI 必须为以下形式:net.msmq://localhost/system$;DeadLetter。

  • 要从自定义死信队列中读取消息,URI 必须为以下形式:net.msmq://localhost/private/<custom-dlq-name>,其中 custom-dlq-name 是自定义死信队列的名称。

有关如何对队列进行寻址的详细信息,请参阅服务终结点和队列寻址

接收方上的 WCF 堆栈将该服务正在侦听的各个地址与消息上的地址进行匹配。 如果地址匹配,则调度该消息;否则,不调度该消息。 在从死信队列中进行读取时,这种操作可能会引发问题,原因是死信队列中的消息通常将寻址到该服务而非死信队列服务。 因此,从死信队列中进行读取的服务必须安装一个地址筛选器 ServiceBehavior,指示堆栈独立于被寻地址来与队列中的所有消息匹配。 具体而言,您必须将一个带有 ServiceBehavior 参数的 Any 添加到从死信队列中读取消息的服务中。

死信队列中的病毒消息处理

病毒消息处理在死信队列中可用,但有一些条件。 因为在从系统的死信队列中进行读取时,您无法从系统队列创建子队列,所以不能将 ReceiveErrorHandling 设置为 Move。 请注意,如果您正在从自定义死信队列中读取,则可以拥有子队列,因此 Move 对于病毒消息为有效处理。

在将 ReceiveErrorHandling 设置为 Reject 以及从自定义死信队列中进行读取时,病毒消息将放入系统的死信队列。 如果从系统的死信队列中进行读取,则将丢弃(清除)该消息。 在 MSMQ 的系统死信队列中执行拒绝将丢弃(清除)该消息。

示例

下面的示例演示如何创建死信队列以及如何使用该队列来处理过期消息。 该示例基于如何:与 WCF 终结点交换排队消息中示例。 下面的示例演示如何将客户端代码写入将死信队列用于每个应用程序的订单处理服务。 该示例还显示如何处理死信队列中的消息。

下面是为每个应用程序指定死信队列的客户端的代码。

using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            // Get MSMQ queue name from appsettings in configuration.
            string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];

            // Create the transacted MSMQ queue for storing dead message if necessary.
            if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
                System.Messaging.MessageQueue.Create(deadLetterQueueName, true);

            OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
        try
            {	

                // Create the purchase order.
                PurchaseOrder po = new PurchaseOrder();
                po.CustomerId = "somecustomer.com";
                po.PONumber = Guid.NewGuid().ToString();

                PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
                lineItem1.ProductId = "Blue Widget";
                lineItem1.Quantity = 54;
                lineItem1.UnitCost = 29.99F;

                PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
                lineItem2.ProductId = "Red Widget";
                lineItem2.Quantity = 890;
                lineItem2.UnitCost = 45.89F;

                po.orderLineItems = new PurchaseOrderLineItem[2];
                po.orderLineItems[0] = lineItem1;
                po.orderLineItems[1] = lineItem2;

                //Create a transaction scope.
                using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
                {
                    // Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po);
                    // Complete the transaction.
                    scope.Complete();
                }

                client.Close();
            }
            catch(TimeoutException timeout)
            {
        Console.WriteLine(timeout.Message);
                client.Abort();
        }
            catch(CommunicationException conexcp)
            {
        Console.WriteLine(conexcp.Message);
                client.Abort();
        }

            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples

    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            ' Get MSMQ queue name from appsettings in configuration.
            Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")

            ' Create the transacted MSMQ queue for storing dead message if necessary.
            If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
                System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
            End If


            Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
            Try


                ' Create the purchase order.
                Dim po As New PurchaseOrder()
                po.CustomerId = "somecustomer.com"
                po.PONumber = Guid.NewGuid().ToString()

                Dim lineItem1 As New PurchaseOrderLineItem()
                lineItem1.ProductId = "Blue Widget"
                lineItem1.Quantity = 54
                lineItem1.UnitCost = 29.99F

                Dim lineItem2 As New PurchaseOrderLineItem()
                lineItem2.ProductId = "Red Widget"
                lineItem2.Quantity = 890
                lineItem2.UnitCost = 45.89F

                po.orderLineItems = New PurchaseOrderLineItem(1) {}
                po.orderLineItems(0) = lineItem1
                po.orderLineItems(1) = lineItem2

                'Create a transaction scope.
                Using scope As New TransactionScope(TransactionScopeOption.Required)
                    ' Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po)
                    ' Complete the transaction.
                    scope.Complete()
                End Using


                client.Close()
            Catch timeout As TimeoutException
                Console.WriteLine(timeout.Message)
                client.Abort()
            Catch conexcp As CommunicationException
                Console.WriteLine(conexcp.Message)
                client.Abort()
            End Try

            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

下面是客户端配置文件的代码。

下面是死信队列中的服务处理消息的代码。

using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void SubmitPurchaseOrder(PurchaseOrder po);
    }

    // Service class that implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
    public class PurchaseOrderDLQService : IOrderProcessor
    {
        OrderProcessorClient orderProcessorService;
        public PurchaseOrderDLQService()
        {
            orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();

            // Resend the message if timed out.
            if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
                mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
            {
                // Re-send.
                Console.WriteLine("Purchase order Time To Live expired");
                Console.WriteLine("Trying to resend the message");

                // Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po);
                Console.WriteLine("Purchase order resent");
            }
        }

        // Host the service within this EXE console application.
        public static void Main()
        {
            // Create a ServiceHost for the PurchaseOrderDLQService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close();
            }
        }
    }
}

Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples")> _
    Public Interface IOrderProcessor
        <OperationContract(IsOneWay:=True)> _
        Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
    End Interface

    ' Service class that implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.Single, ConcurrencyMode:=ConcurrencyMode.Single, AddressFilterMode:=AddressFilterMode.Any)> _
    Public Class PurchaseOrderDLQService
        Implements IOrderProcessor
        Private orderProcessorService As OrderProcessorClient
        Public Sub New()
            orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()

            ' Resend the message if timed out.
            If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
                ' Re-send.
                Console.WriteLine("Purchase order Time To Live expired")
                Console.WriteLine("Trying to resend the message")

                ' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po)
                Console.WriteLine("Purchase order resent")
            End If
        End Sub

        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Create a ServiceHost for the PurchaseOrderDLQService type.
            Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

下面是死信队列服务配置文件的代码。

另请参阅