Receive Polling-based Data-changed Messages in Oracle Database using the WCF Channel Model

You can configure the Microsoft BizTalk Adapter for Oracle Database to poll an Oracle database table or view for any data changes. To perform such a polling operation, the adapter periodically executes a SQL query against an Oracle table or view followed by an optional PL/SQL code block. The results of the SQL query are then returned by the Oracle Database adapter to your code as a strongly-typed result set in an inbound POLLINGSTMT operation. For more information about the mechanism used to configure and perform polling on an Oracle database using the Oracle Database adapter, see Receive polling-based data-changed messages in Oracle Database adapter. It is strongly recommended that you read this topic before proceeding.

You configure the Oracle Database adapter to poll and Oracle database table or view by setting binding properties on an instance of OracleDBBinding. In the WCF channel model, you then use this binding to build a channel listener from which you can get an IInputChannel channel to receive the POLLINGSTMT operation from the adapter.

For an overview of how to receive operations using an IInputChannel in WCF, see Service Channel-Level Programming.

The sections in this topic provide information to help you perform polling on Oracle database tables and views using the WCF channel model.

Consuming the POLLINGSTMT request message

The adapter invokes the POLLINGSTMT operation on your code to poll the Oracle database. That is, the adapter sends a POLLINGSTMT request message that you receive over an IInputChannel channel shape. The POLLINGSTMT request message contains the result set of the query specified by the PollingStatement binding property. You can consume the POLLINGSTMT message in one of two ways:

  • To consume the message using node-value streaming you must call the WriteBodyContents method on the response message and pass it an XmlDictionaryWriter that implements node-value streaming.

  • To consume the message using node streaming you can call GetReaderAtBodyContents on the response message to get an XmlReader.

    You typically use node-value streaming to consume result sets that contain Oracle LOB data columns.

    For more information about the message structure of the POLLINGSTMT operation, see Message Schemas for the Polling Operations.

    For more information about how the Oracle Database adapter supports streaming on LOB data, see Streaming large object data types in Oracle Database adapter.

    For more information about implementing node-value streaming in your code to support end-to-end streaming of LOB data, see Streaming Oracle Database LOB Data Types Using the WCF Channel Model.

About the Examples Used in this Topic

The example in this topic uses the SCOTT.ACCOUNTACTIVITY table and the SCOTT.ACCOUNT_PKG.PROCESS_ACTIVITY function. A script to generate these artifacts is supplied with the samples. The example performs the following operations:

  • As part of the polling statement, selects all the records from the ACCOUNTACTIVITY table and displays on the console.

  • As part of the post poll statement, the example invokes the PROCESS_ACTIVITY function that moves all the records from ACCOUNTACTIVITY table to ACTIVITYHISTORY table.

  • Subsequent polls on the ACCOUNTACTIVITY table do not return any records. However, if you want the example to return more records as part of the polling operation, you must insert some records in the ACCOUNTACTIVITY table. You can do so by running the more_activity_data.sql script provided with the samples.

    For more information about the samples, see Adapter Samples.

How Do I Poll an Oracle Database Using an IInputChannel?

To poll an Oracle database table or view to receive data-change messages using the WCF channel model, perform the following steps.

To receive data-changed messages using an IInputChannel

  1. Create a Visual C# project in Visual Studio. For this topic, create a console application.

  2. In the Solution Explorer, add reference to Microsoft.Adapters.OracleDB, Microsoft.ServiceModel.Channels, System.ServiceModel, and System.Runtime.Serialization.

  3. Open the Program.cs file and add the following namespaces:

    • Microsoft.Adapters.OracleDB

    • Microsoft.ServiceModel.Channels

    • System.ServiceModel

    • System.ServiceModel.Description

    • System.ServiceModel.Channels

    • System.Xml

    • System.Runtime.Serialization

    • System.IO

    • Microsoft.ServiceModel.Channels.Common

  4. Create an instance of OracleDBBinding and set the binding properties required to configure polling. At a minimum you must set the InboundOperationType, PollingStatement, and PollingInterval binding properties. For this example, you also set the PostPollStatement binding property. For more information about binding properties used to configure polling, see Receive polling-based data-changed messages in Oracle Database adapter.

    OracleDBBinding binding = new OracleDBBinding();  
    binding.InboundOperationType = InboundOperation.Polling;  
    binding.PollingInterval = 30;  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"  
    
  5. Create a binding parameter collection and set the credentials.

    ClientCredentials credentials = new ClientCredentials();  
    credentials.UserName.UserName = "SCOTT";  
    credentials.UserName.Password = "TIGER";  
    
    BindingParameterCollection bindingParams = new BindingParameterCollection();  
    bindingParams.Add(credentials);  
    
  6. Create a channel listener and open it. You create the listener by invoking BuildChannelListener<IInputChannel> method on the OracleDBBinding. You can modify the target namespace for the POLLINGSTMT operation by setting the PollingId property in the connection URI. For more information about the adapter connection URI, see Create the Oracle Database connection URI.

    IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
    listener.Open();  
    
  7. Get an IInputChannel channel by invoking the AcceptChannel method on the listener and open it.

    IInputChannel channel = listener.AcceptChannel();  
    channel.Open();  
    
  8. Invoke Receive on the channel to get the next POLLINGSTMT message from the adapter.

    Message message = channel.Receive();  
    
  9. Consume the result set returned by the POLLINGSTMT operation. You can consume the message using either an XmlReader or an XmlDictionaryWriter.

    XmlReader reader = message.GetReaderAtBodyContents();  
    
  10. Close the channel when you have completed processing the request.

    channel.Close()  
    

    Important

    You must close the channel after you have finished processing the POLLINGSTMT operation. Failure to close the channel may affect the behavior of your code.

  11. Close the listener when you are finished receiving data-changed messages.

    listener.Close()  
    

    Important

    Closing the listener does not close channels created using the listener. You must explicitly close each channel created using the listener.

Example

The following example shows how to configure the Oracle Database adapter to poll Oracle database tables and views and receive the POLLLINGSTMT operation using the WCF channel model. The result set returned in the POLLINGSTMT operation is written to the console by using an XmlReader.

using System;  
using System.Collections.Generic;  
using System.Text;  

// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces  
using System.ServiceModel;  
using System.ServiceModel.Description;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  

// Add this namespace for channel model  
using System.ServiceModel.Channels;  

using System.Xml;  
using System.Runtime.Serialization;  
using System.IO;  

// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions  
using Microsoft.ServiceModel.Channels.Common;  

namespace OraclePollingCM  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            Uri connectionUri = new Uri("oracleDB://ADAPTER/");  

            IChannelListener<IInputChannel> listener = null;  
            IInputChannel channel = null;  

            // set timeout to receive POLLINGSTMT message  
            TimeSpan messageTimeout = new TimeSpan(0, 0, 30);  

            Console.WriteLine("Sample Started");  

            try  
            {  
                // Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the           
                // PollingStatement,and the PostPollStatement.  
                OracleDBBinding binding = new OracleDBBinding();  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingInterval = 30;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  

                // Create a binding parameter collection and set the credentials  
                ClientCredentials credentials = new ClientCredentials();  
                credentials.UserName.UserName = "SCOTT";  
                credentials.UserName.Password = "TIGER";  

                BindingParameterCollection bindingParams = new BindingParameterCollection();  
                bindingParams.Add(credentials);  

                Console.WriteLine("Opening listener");  
                // get a listener  from the binding  
                listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
                listener.Open();  

                Console.WriteLine("Opening channel");  
                // get a channel from the listener  
                channel = listener.AcceptChannel();  
                channel.Open();  

                Console.WriteLine("Channel opened -- waiting for polled data");  
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);  

                // Poll five times with the specified message timeout   
                // If a timeout occurs polling will be aborted  
                for (int i = 0; i < 5; i++)  
                {  
                    Console.WriteLine("Polling: " + i);  
                    Message message = null;  
                    XmlReader reader = null;  
                    try  
                    {  
                        //Message is received so process the results  
                        message = channel.Receive(messageTimeout);  
                    }  
                    catch (System.TimeoutException toEx)  
                    {  
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);  
                        continue;  
                    }  

                    // Get the query results using an XML reader  
                    try  
                    {  
                        reader = message.GetReaderAtBodyContents();  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine("Exception :" + ex);  
                        throw;  
                    }  

                    // Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console  
                    Console.WriteLine("\nPolling data received for request number {0}", i+1);  
                    Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");  

                    while (reader.Read())  
                    {  
                        if (reader.IsStartElement())  
                        {  
                            switch (reader.Name)  
                            {  
                                case "POLLINGSTMTRECORD":  
                                    Console.Write("\n");  
                                    break;  

                                case "TID":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  

                                case "ACCOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
                                case "AMOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  

                                case "TRANSDATE":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  

                                default:  
                                    break;  
                            }  
                        }  
                    }  

                    // return the cursor  
                    Console.WriteLine();  

                    // close the reader  
                    reader.Close();  

                    //            To save the polling data to a file you can REPLACE the code above with the following  
                    //  
                    //            XmlDocument doc = new XmlDocument();  
                    //            doc.Load(reader);  
                    //            using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))  
                    //            {  
                    //                doc.WriteTo(writer);  
                    //            }  
                    message.Close();  
                }  

                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");  
                Console.ReadLine();  
            }  
            catch (TargetSystemException tex)  
            {  
                Console.WriteLine("Exception occurred on the Oracle Database");  
                Console.WriteLine(tex.InnerException.Message);  
            }  
            catch (ConnectionException cex)  
            {  
                Console.WriteLine("Exception occurred connecting to the Oracle Database");  
                Console.WriteLine(cex.InnerException.Message);  
            }  
            catch (Exception ex)  
            {  
                Console.WriteLine("Exception is: " + ex.Message);  
                if (ex.InnerException != null)  
                {  
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);  
                }  
            }  
            finally  
            {  
                // IMPORTANT: close the channel and listener to stop polling  
                if (channel != null)  
                {  
                    if (channel.State == CommunicationState.Opened)  
                        channel.Close();  
                    else  
                        channel.Abort();  
                }  

                if (listener != null)  
                {  
                    if (listener.State == CommunicationState.Opened)  
                        listener.Close();  
                    else  
                        listener.Abort();  
                }  
            }  
        }  
    }  
}  

See Also

Develop Oracle Database applications using the WCF Channel Model