Grouping Queued Messages in a Session

Windows Communication Foundation (WCF) provides a session that allows you to group a set of related messages together for processing by a single receiving application. Messages that are part of a session must be part of the same transaction. Because all messages are part of the same transaction, if one message fails to be processed the entire session is rolled back. Sessions have similar behaviors with regard to dead-letter queues and poison queues. The Time to Live (TTL) property set on a queued binding configured for sessions is applied to the session as a whole. If only some of the messages in the session are sent before the TTL expires, the entire session is placed in the dead-letter queue. Similarly, when messages in a session fail to be sent to an application from the application queue, the entire session is placed in the poison queue (if available).

Message Grouping Example

One example where grouping messages is helpful is when implementing an order-processing application as a WCF service. For instance, a client submits an order to this application that contains a number of items. For each item, the client makes a call to the service, which results in a separate message being sent. It is possible for serve A to receive the first item, and server B to receive the second item. Each time an item is added, the server processing that item has to find the appropriate order and add the item to it, which is highly inefficient. You still run into such inefficiencies with only a single server handling all requests, because the server must keep track of all orders currently being processed and determine which one the new item belongs to. Grouping all requests for a single order greatly simplifies implementation of such an application. The client application sends all items for a single order in a session, so when the service processes the order, it processes the entire session at once. \

Procedures

To set up a service contract to use sessions

  1. Define a service contract that requires a session. Do this with the OperationContractAttribute attribute and by specifying:

    SessionMode=SessionMode.Required  
    
  2. Mark the operations in the contract as one-way, because these methods do not return anything. This is done with the OperationContractAttribute attribute and by specifying:

    [OperationContract(IsOneWay = true)]  
    
  3. Implement the service contract and specify an InstanceContextMode of PerSession. This instantiates the service only once for each session.

    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]  
    
  4. Each service operation requires a transaction. Specify this with the OperationBehaviorAttribute attribute. The operation that completes the transaction should also set TransactionAutoComplete to true.

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]   
    
  5. Configure an endpoint that uses the system-provided NetMsmqBinding binding.

  6. Create a transactional queue using System.Messaging. You can also create the queue by using Message Queuing (MSMQ) or MMC. If you do, create a transactional queue.

  7. Create a service host for the service by using ServiceHost.

  8. Open the service host to make the service available.

  9. Close the service host.

To set up a client

  1. Create a transaction scope to write to the transactional queue.

  2. Create the WCF client using the ServiceModel Metadata Utility Tool (Svcutil.exe) tool.

  3. Place the order.

  4. Close the WCF client.

Example

Description

The following example provides the code for the IProcessOrder service and for a client that uses this service. It shows how WCF uses queued sessions to provide the grouping behavior.

Code for the Service

// Service Code:

using System;

using System.ServiceModel.Channels;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
using System.Text;
using System.Collections.Generic;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract. 
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode=SessionMode.Required)]
    public interface IOrderTaker
    {
        [OperationContract(IsOneWay = true)]
        void OpenPurchaseOrder(string customerId);

        [OperationContract(IsOneWay = true)]
        void AddProductLineItem(string productId, int quantity);

        [OperationContract(IsOneWay = true)]
        void EndPurchaseOrder();
    }

    // Define the Purchase Order Line Item
    public class PurchaseOrderLineItem
    {
        static Random r = new Random(137);

        string ProductId;
        float UnitCost;
        int Quantity;

        public PurchaseOrderLineItem(string productId, int quantity)
        {
            this.ProductId = productId;
            this.Quantity = quantity;
            this.UnitCost = r.Next(10000);
        }

        public override string ToString()
        {
            String displayString = "Order LineItem: " + Quantity + " of " + ProductId + " @unit price: $" + UnitCost + "\n";
            return displayString;
        }

        public float TotalCost
        {
            get { return UnitCost * Quantity; }
        }
    }

    // Define Purchase Order
    public class PurchaseOrder
    {
        string PONumber;
        string CustomerId;
        LinkedList<PurchaseOrderLineItem> orderLineItems = new LinkedList<PurchaseOrderLineItem>();

        public PurchaseOrder(string customerId)
        {
            this.CustomerId = customerId;
            this.PONumber = Guid.NewGuid().ToString();
        }

        public void AddProductLineItem(string productId, int quantity)
        {
            orderLineItems.AddLast(new PurchaseOrderLineItem(productId, quantity));
        }

        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
        }

        public string Status
        {
            get
            {
                return "Pending";
            }
        }

        public override string ToString()
        {
            StringBuilder strbuf = new StringBuilder("Purchase Order: " + PONumber + "\n");
            strbuf.Append("\tCustomer: " + CustomerId + "\n");
            strbuf.Append("\tOrderDetails\n");

            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                strbuf.Append("\t\t" + lineItem.ToString());
            }

            strbuf.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            strbuf.Append("\tOrder status: " + Status + "\n");
            return strbuf.ToString();
        }
    }


    // Service class which implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class OrderTakerService : IOrderTaker
    {
        PurchaseOrder po;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void OpenPurchaseOrder(string customerId)
        {
            Console.WriteLine("Creating purchase order");
            po = new PurchaseOrder(customerId);
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void AddProductLineItem(string productId, int quantity)
        {
            po.AddProductLineItem(productId, quantity);
            Console.WriteLine("Product " + productId + " quantity " + quantity + " added to purchase order");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void EndPurchaseOrder()
        {
            Console.WriteLine("Purchase Order Completed");
            Console.WriteLine();
            Console.WriteLine(po.ToString());
        }


        // Host the service within this EXE console application.
        public static void Main()
        {
            // Get MSMQ queue name from app settings in configuration
            string queueName = ConfigurationManager.AppSettings["queueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);


            // Get the base address that is used to listen for WS-MetaDataExchange requests
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderTakerService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(OrderTakerService), new Uri(baseAddress)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close(); 
            }
        }
    }
}
' Service Code:

Imports System
Imports System.ServiceModel.Channels
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions
Imports System.Text
Imports System.Collections.Generic

Namespace Microsoft.ServiceModel.Samples
	' Define a service contract. 
	<ServiceContract(Namespace := "http://Microsoft.ServiceModel.Samples", SessionMode:=SessionMode.Required)> _
	Public Interface IOrderTaker
		<OperationContract(IsOneWay := True)> _
		Sub OpenPurchaseOrder(ByVal customerId As String)

		<OperationContract(IsOneWay := True)> _
		Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)

		<OperationContract(IsOneWay := True)> _
		Sub EndPurchaseOrder()
	End Interface

	' Define the Purchase Order Line Item
	Public Class PurchaseOrderLineItem
		Private Shared r As New Random(137)

		Private ProductId As String
		Private UnitCost As Single
		Private Quantity As Integer

		Public Sub New(ByVal productId As String, ByVal quantity As Integer)
			Me.ProductId = productId
			Me.Quantity = quantity
			Me.UnitCost = r.Next(10000)
		End Sub

		Public Overrides Function ToString() As String
			Dim displayString As String = "Order LineItem: " & Quantity & " of " & ProductId & " @unit price: $" & UnitCost + Constants.vbLf
			Return displayString
		End Function

		Public ReadOnly Property TotalCost() As Single
			Get
				Return UnitCost * Quantity
			End Get
		End Property
	End Class

	' Define Purchase Order
	Public Class PurchaseOrder
		Private PONumber As String
		Private CustomerId As String
		Private orderLineItems As New LinkedList(Of PurchaseOrderLineItem)()

		Public Sub New(ByVal customerId As String)
			Me.CustomerId = customerId
			Me.PONumber = Guid.NewGuid().ToString()
		End Sub

		Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)
			orderLineItems.AddLast(New PurchaseOrderLineItem(productId, quantity))
		End Sub

		Public ReadOnly Property TotalCost() As Single
			Get
                Dim totalCost_Renamed As Single = 0
                For Each lineItem In orderLineItems
                    totalCost_Renamed += lineItem.TotalCost
                Next lineItem
				Return totalCost_Renamed
			End Get
		End Property

		Public ReadOnly Property Status() As String
			Get
				Return "Pending"
			End Get
		End Property

		Public Overrides Function ToString() As String
			Dim strbuf As New StringBuilder("Purchase Order: " & PONumber & Constants.vbLf)
			strbuf.Append(Constants.vbTab & "Customer: " & CustomerId & Constants.vbLf)
			strbuf.Append(Constants.vbTab & "OrderDetails" & Constants.vbLf)

            For Each lineItem In orderLineItems
                strbuf.Append(Constants.vbTab + Constants.vbTab + lineItem.ToString())
            Next lineItem

			strbuf.Append(Constants.vbTab & "Total cost of this order: $" & TotalCost + Constants.vbLf)
			strbuf.Append(Constants.vbTab & "Order status: " & Status + Constants.vbLf)
			Return strbuf.ToString()
		End Function
	End Class


	' Service class which implements the service contract.
	' Added code to write output to the console window
	<ServiceBehavior(InstanceContextMode:=InstanceContextMode.PerSession)> _
	Public Class OrderTakerService
		Implements IOrderTaker
		Private po As PurchaseOrder

		<OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := False)> _
		Public Sub OpenPurchaseOrder(ByVal customerId As String) Implements IOrderTaker.OpenPurchaseOrder
			Console.WriteLine("Creating purchase order")
			po = New PurchaseOrder(customerId)
		End Sub

		<OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := False)> _
		Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer) Implements IOrderTaker.AddProductLineItem
			po.AddProductLineItem(productId, quantity)
			Console.WriteLine("Product " & productId & " quantity " & quantity & " added to purchase order")
		End Sub

		<OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := True)> _
		Public Sub EndPurchaseOrder() Implements IOrderTaker.EndPurchaseOrder
			Console.WriteLine("Purchase Order Completed")
			Console.WriteLine()
			Console.WriteLine(po.ToString())
		End Sub


		' Host the service within this EXE console application.
		Public Shared Sub Main()
			' Get MSMQ queue name from app settings in configuration
			Dim queueName As String = ConfigurationManager.AppSettings("queueName")

			' Create the transacted MSMQ queue if necessary.
			If (Not MessageQueue.Exists(queueName)) Then
				MessageQueue.Create(queueName, True)
			End If


			' Get the base address that is used to listen for WS-MetaDataExchange requests
			Dim baseAddress As String = ConfigurationManager.AppSettings("baseAddress")

			' Create a ServiceHost for the OrderTakerService type.
			Using serviceHost As New ServiceHost(GetType(OrderTakerService), New Uri(baseAddress))
				' Open the ServiceHostBase to create listeners and start listening for messages.
				serviceHost.Open()

				' The service can now be accessed.
				Console.WriteLine("The service is ready.")
				Console.WriteLine("Press <ENTER> to terminate service.")
				Console.WriteLine()
				Console.ReadLine()

				' Close the ServiceHostBase to shutdown the service.
				serviceHost.Close()
			End Using
		End Sub
	End Class
End Namespace

Code for the Client

using System;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            //Create a transaction scope.
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                // Create a proxy with given client endpoint configuration
                OrderTakerClient client = new OrderTakerClient("OrderTakerEndpoint");
                try
                {
                    // Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com");
                    Console.WriteLine("Purchase Order created");

                    // Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget");
                    client.AddProductLineItem("Blue Widget", 10);

                    Console.WriteLine("Adding 23 quantities of red widget");
                    client.AddProductLineItem("Red Widget", 23);

                    // Close the purchase order
                    Console.WriteLine("Closing the purchase order");
                    client.EndPurchaseOrder();
                    client.Close();
                }
                catch (CommunicationException ex)
                {
                    client.Abort();
                }
                // Complete the transaction.
                scope.Complete();
            }
            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}

Imports System
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
	'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

	'Client implementation code.
	Friend Class Client
		Shared Sub Main()
			'Create a transaction scope.
			Using scope As New TransactionScope(TransactionScopeOption.Required)
				' Create a proxy with given client endpoint configuration
				Dim client As New OrderTakerClient("OrderTakerEndpoint")
				Try
					' Open a purchase order
					client.OpenPurchaseOrder("somecustomer.com")
					Console.WriteLine("Purchase Order created")

					' Add product line items
					Console.WriteLine("Adding 10 quantities of blue widget")
					client.AddProductLineItem("Blue Widget", 10)

					Console.WriteLine("Adding 23 quantities of red widget")
					client.AddProductLineItem("Red Widget", 23)

					' Close the purchase order
					Console.WriteLine("Closing the purchase order")
					client.EndPurchaseOrder()
					client.Close()
				Catch ex As CommunicationException
					client.Abort()
				End Try
				' Complete the transaction.
				scope.Complete()
			End Using
			Console.WriteLine()
			Console.WriteLine("Press <ENTER> to terminate client.")
			Console.ReadLine()
		End Sub
	End Class
End Namespace

See Also

Sessions and Queues
Queues Overview