Synchronous Request/Reply Service Implementation using MQ and XMS on .NET

January 7, 2011

Abstract

This article demonstrates a way of implementing synchronous services using both IBM WebSphere MQ and XMS (IBM Message Service Client for .NET, the .NET counterpart of JMS, the Java Message Service). We implemented a distributed enterprise integration project for a large national health plan provider (the Client) in a heterogeneous environment where Java and .NET components communicate through an ESB (Enterprise Service Bus), and in doing so gained considerable experience with both MQ and XMS on the .NET platform. Although JMS is a well-established messaging and integration technology on the Java platform, its .NET counterpart, XMS, does not yet seem as mature, and beginners may find it daunting to understand both platforms well enough to build robust, high-performance integration between the two. In this article, I show a practical way of using both the direct WebSphere MQ classes for .NET and the higher-level XMS API to implement the most common service design pattern: the request/reply pattern.

Both MQ and XMS are inherently asynchronous messaging technologies, but they also provide facilities to implement a synchronous request/reply pattern. Request/reply is a simple message exchange mechanism that is also used in web services: the client sends a request and waits until the service returns a reply, while the server keeps listening until it gets a request, handles it, and makes sure that a reply correlated to the request is returned whether the call succeeded or failed. The MQ and XMS samples attached to this article can be used as a starting point for developing more complex services that JMS clients can call, as well as clients that can invoke JMS services.

Prerequisites

IBM WebSphere MQ 7.0.1 or above server
IBM WebSphere MQ Explorer
.NET 3.0 or above
VS.NET 2008 or above

Supported Features

The XMS and MQ samples associated with this article provide the following major features:

Sync Service:

  • Creates a listener thread (supports multiple listener threads)
  • Listens on a given request queue
  • Receives request message
  • Provides template for handling a text message
  • Provides template for handling exceptions
  • Ensures a response message is put to queue in both success and failure cases
  • Correlates response message with request message
  • If the connection is dropped, reconnects to the queue after a given time interval
  • Stops listener thread safely when requested

Sync Client:

  • Sends a text message to the given queue
  • Waits for a response message to be returned using a correlation id
  • Blocks the caller until response is received
  • Supports retries if there are queue errors
  • When the call is complete, closes all queue connections safely

The Queue Interactions to Implement Request/Reply Pattern

[Figure: Queue Interactions]

Although this is not the simplest possible design, it illustrates several points that are important for robust request/reply behavior.

  • Polling happens only between Get(timeout) operations. Each Get(timeout) blocks until a message arrives or the timeout elapses, so the listener receives a message as soon as it appears in the queue rather than at the next poll.
  • Between Get(timeout) operations, the listener checks whether termination has been requested. It is possible to wait for a message indefinitely with no timeout, but the listener could then not be stopped cleanly, which would not be acceptable in a real-life implementation.
  • The same design is used on the client side. In our implementation, the timeout is configurable. If a service call completes in less time than the polling interval, the client receives the reply during its first Get(timeout) and no polling occurs at all.
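The Get(timeout) operation referred to above belongs to the sample's connection helper and is not reproduced in this article. As a rough sketch only, a timed get with the WebSphere MQ classes for .NET might look like the following. The class name TimedGetSketch and its method signature are illustrative assumptions, not the sample's actual code.

```csharp
using IBM.WMQ;

public static class TimedGetSketch
{
    // Hypothetical helper: blocks for up to timeoutMs waiting for a message.
    // Returns null when the wait expires, so the caller can check its stop
    // flag and then call Get again.
    public static MQMessage Get(MQQueue queue, int timeoutMs)
    {
        MQGetMessageOptions options = new MQGetMessageOptions();
        options.Options = MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        options.WaitInterval = timeoutMs;   // milliseconds

        MQMessage message = new MQMessage();
        try
        {
            queue.Get(message, options);
            return message;
        }
        catch (MQException ex)
        {
            // An expired wait is reported as an exception, not as an empty result.
            if (ex.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                return null;
            throw;
        }
    }
}
```

Translating the "no message available" reason code into null keeps the listening loop free of exception handling for the normal idle case.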

Using Queue Connection Objects Properly

Making a queue connection requires that multiple objects be instantiated, used, and disposed of properly. With the MQ API, these are MQQueueManager and MQQueue; with the XMS API, they are IConnectionFactory, IConnection, ISession, IDestination, IMessageConsumer, and IMessageProducer. To hide this complexity, we’ll create helper objects that handle all related objects properly: MQConnection and XMSConnection. To see how these helpers work, refer to the Open() methods in the source code. Following is an extract from the Open() method implementation for the MQ API:

        public void Open(bool output)
        {
            try
            {
                mqQueueManager = CreateQueueManager();
                if (!mqQueueManager.IsConnected)
                    mqQueueManager.Connect();
                mqQueue = mqQueueManager.AccessQueue(queueName, 
                    output ? (MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING)
                    : (MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING)
                    );
            }
            catch (Exception ex)
            {
                Trace.WriteLine("Error opening MQ connection\r\n" + ex.ToString());
                EnsureClosed();
                throw;
            }
        }

It’s important to note that this connection helper object makes sure all wrapped connection objects are closed if an error is encountered during initialization or when the object itself is disposed.
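The XMS helper has more objects to manage. Its Open() method is not reproduced in this article, so the following is only a sketch of the usual XMS sequence for a WebSphere MQ client connection. The class name XmsOpenSketch, its fields, and its parameters are illustrative assumptions.

```csharp
using IBM.XMS;

public class XmsOpenSketch
{
    public IConnection Connection;
    public ISession Session;
    public IDestination Destination;

    // Hypothetical equivalent of the MQ Open() extract, using XMS objects.
    public void Open(string host, int port, string channel,
                     string queueManager, string queueName)
    {
        XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        IConnectionFactory connectionFactory = factoryFactory.CreateConnectionFactory();
        connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, host);
        connectionFactory.SetIntProperty(XMSC.WMQ_PORT, port);
        connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, channel);
        connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
        connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, queueManager);

        try
        {
            Connection = connectionFactory.CreateConnection();
            Session = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Destination = Session.CreateQueue(queueName);
            Connection.Start();   // required before any message can be received
        }
        catch
        {
            // Same rule as the MQ helper: release whatever was created, then rethrow.
            Close();
            throw;
        }
    }

    public void Close()
    {
        if (Session != null) { Session.Close(); Session = null; }
        if (Connection != null) { Connection.Close(); Connection = null; }
    }
}
```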

Writing the Synchronous Listener for the Service

The listener object launches a dedicated thread and continuously polls the request queue to check if there is any incoming request message. When there is a connection error or the connection drops, an outer loop ensures that a new connection is established to the queue after a reconnect interval has elapsed.

When a message is received, the listener calls a handler method. The handler method just processes the contents of the text request and creates a response text. It is possible to develop an application-specific protocol around the message contents, but in these examples we just use raw text messages.

If handling a request throws an exception, the listener ensures that at least an error message is put back to the response queue. If we don’t do this, the client will just keep waiting and eventually time out.

Normally, the listener will always stay in the polling loop to check if there are any new messages to process in the queue. However, for a realistic listener implementation, we also provided safe termination functionality, so that we can stop the listener whenever we want.
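The safe termination described above can be sketched independently of any queue API. The example below is an assumption about how the stop signal might be wired, not an extract from the sample: the class name is hypothetical, and Thread.Sleep stands in for a timed Get on the queue.

```csharp
using System;
using System.Threading;

public class StoppableListenerSketch
{
    private readonly ManualResetEvent stopListenerEvent = new ManualResetEvent(false);
    private Thread listenerThread;
    private int polls;

    // Number of completed wait cycles; exposed only to make the sketch observable.
    public int Polls { get { return polls; } }

    public void Start()
    {
        stopListenerEvent.Reset();
        listenerThread = new Thread(ListenLoop);
        listenerThread.IsBackground = true;
        listenerThread.Start();
    }

    // Signals the loop and waits until the listener thread has actually exited.
    public void Stop()
    {
        stopListenerEvent.Set();
        listenerThread.Join();
    }

    private void ListenLoop()
    {
        // WaitOne(0) tests the stop signal without blocking.
        while (!stopListenerEvent.WaitOne(0))
        {
            Thread.Sleep(50);                 // stand-in for Get(receiveTimeOut)
            Interlocked.Increment(ref polls);
        }
    }
}
```

With this arrangement, Stop() returns after at most one timeout interval, because the loop re-checks the signal after every wait.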

The following code shows the main listening loop using the MQ API:

        private void InnerListenLoop()
        {
            using (mqRequestConnection = new MQConnection(queueManager, channel, requestConnection, 
                                                          requestQueue, userId, password))
            {
                mqRequestConnection.Open(false);
                while (!this.stopListenerEvent.WaitOne(0))
                {
                    MQMessage mqRequest = mqRequestConnection.Get(receiveTimeOut);

                    if (mqRequest != null && mqRequest.MessageLength > 0)
                    {
                        if (mqRequest.Format == MQC.MQFMT_STRING)
                        {
                            HandleRequest(mqRequest);
                        }
                    }
                }
            }
        }

This listener loop is wrapped by an outer reconnect loop that looks like the following:

        private void OnListen()
        {
            // Reconnect if there's any uncaught error that bubbled up to here.
            while (true)
            {
                try
                {
                    InnerListenLoop();
                    return;
                }
                catch (MQException mqEx)
                {
                    if (mqEx.Reason == MQC.MQRC_CONNECTION_BROKEN)
                    {
                        Trace.WriteLine(string.Format("Connection was broken.  Reconnecting in {0} ms", 
                                                      reconnectInterval));
                        Thread.Sleep(reconnectInterval);
                    }
                    else
                        throw;
                }
            }
        }

The outer loop keeps the listener running across broken connections: when the connection is lost, it waits for the reconnect interval and then starts the inner loop again. Any other exception that bubbles up to this point is rethrown and ends the listener. Under normal circumstances, HandleRequest() will already have handled the exceptions specific to processing the request. The main HandleRequest() method implementation using the MQ API is:

        private void HandleRequest(MQMessage mqRequest)
        {
            string request = mqRequest.ReadString(mqRequest.MessageLength);

            try
            {
                string response = HandleRequest(request);       // Design a request and response contract
                // Put a response back
                PutResponse(response, mqRequest.MessageId);
            }
            catch(Exception ex)
            { 
                // If there is any handling error, put an error response back
                string response = HandleException(ex);
                PutResponse(response, mqRequest.MessageId);      // Design a fault contract
            }
        }

This method performs four tasks: it extracts the text message, dispatches it for processing, puts the response back, and handles any processing exception. In the case of an exception, it delegates to the HandleException() method, which is responsible only for logging the error and returning the text to be put in the response message.

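Following is the PutResponse() overload that puts the finished response message to the queue using the MQ API:

        private void PutResponse(MQMessage mqResponse)
        {
            // Use request queue manager to reply back.
            // Note: the five-argument AccessQueue overload expects a dynamic queue name and an
            // alternate user id in its last two positions, not a user id and password, so the
            // two-argument overload is used here.
            using (MQQueue mqQueue = mqRequestConnection.MQQueueManager.AccessQueue(this.responseQueue, 
                                     MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING))
            {
                mqQueue.Put(mqResponse);
            }
        }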
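HandleRequest() calls PutResponse() with the response text and the request's MessageId, while the listing above takes a finished MQMessage. The step in between is not reproduced in the article. A minimal sketch of how the reply message might be built is shown below; the class and method names are hypothetical, and the choice of message type and persistence is an assumption based on the points made later in this article.

```csharp
using IBM.WMQ;

public static class ReplyMessageSketch
{
    // Hypothetical helper: builds a text reply correlated with the request.
    public static MQMessage CreateReply(string responseText, byte[] requestMessageId)
    {
        MQMessage reply = new MQMessage();
        reply.Format = MQC.MQFMT_STRING;                // plain text payload
        reply.MessageType = MQC.MQMT_REPLY;
        reply.CorrelationId = requestMessageId;         // lets the client find its reply
        reply.Persistence = MQC.MQPER_NOT_PERSISTENT;   // replies are transient
        reply.WriteString(responseText);
        return reply;
    }
}
```

Copying the request's MessageId into the reply's CorrelationId is what allows the client to select its own response from a shared queue.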

The listener class is configured by settings in the app.config. You can therefore change the queue configuration, manage some additional settings, and run the test.

Both XMS and MQ listeners are structured in the same way. Although there are some minor differences, once you understand one, you’ll understand the other, too. This is possible because we have encapsulated all queue connection API interaction within our connection helper classes.

Writing the Synchronous Client

We used a similar approach on the client side. A ServiceRequestor object performs the steps needed to make a service call safely: connect, put, get, retry if necessary, and close.

Here’s how the service requestor is used:

                MQServiceRequestor requestor = new MQServiceRequestor();
                try
                {
                    string response = requestor.SyncRequest(request);
                    Console.WriteLine(response);
                }
                catch(Exception ex)
                {
                    Console.WriteLine(string.Format("Error receiving response: {0}", 
                                                    ex.ToString()));
                }

The same class is called XMSServiceRequestor in the sample using the XMS API, and is used the same way.

The following code shows the SyncRequest() method implementation, followed by the SendRequest() method that puts the request message to the queue:

        public string SyncRequest(string message)
        {
            MQMessage mqRequest = CreateRequest();
            mqRequest.WriteString(message);
            MQMessage mqResponse = SyncRequest(mqRequest);
            return mqResponse.ReadString(mqResponse.MessageLength);
        }

        public void SendRequest(MQMessage mqMessage)
        {
            try
            {
                // Open queue manager
                using (mqRequestConnection = new MQConnection(queueManager, channel, requestConnection, 
                                                              requestQueue, userId, password))
                {
                    mqRequestConnection.Open(true);
                    // Open queue
                    mqRequestConnection.Put(mqMessage);
                }
            }
            catch(Exception ex)
            {
                Trace.TraceError(string.Format("Error occurred while putting message to queue\r\n{0}", 
                                               ex.ToString()));
                throw;
            }
        }

After a request is sent, the requestor waits for the response message, matching the original message’s id against the response’s CorrelationId:

        public MQMessage ReceiveReply(byte[] messageId, int retryCount)
        {
            int retrials = 0;
            while (true)
            {
                try
                {
                    return ReceiveReply(messageId);
                }
                catch (MQException mqEx)
                {
                    if (retrials >= retryCount)
                    {
                        throw;
                    }

                    Thread.Sleep(this.reconnectInterval);
                    retrials++;
                }
            }
        }

It also implements a retry pattern to provide resilience against temporary queue disconnections.
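The single-attempt ReceiveReply(messageId) that the retry loop calls is not shown in the article. As a sketch under that caveat, a correlated get with the MQ classes for .NET could be written as follows. The class name and parameters are illustrative assumptions.

```csharp
using IBM.WMQ;

public static class CorrelatedGetSketch
{
    // Hypothetical helper: waits up to timeoutMs for the reply whose
    // CorrelationId equals the MessageId of the request that was sent.
    public static MQMessage ReceiveReply(MQQueue responseQueue,
                                         byte[] requestMessageId,
                                         int timeoutMs)
    {
        MQMessage reply = new MQMessage();
        reply.CorrelationId = requestMessageId;   // value to match on

        MQGetMessageOptions options = new MQGetMessageOptions();
        options.Options = MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        options.MatchOptions = MQC.MQMO_MATCH_CORREL_ID;
        options.WaitInterval = timeoutMs;         // milliseconds

        // Throws MQException (reason MQRC_NO_MSG_AVAILABLE) if no matching
        // reply arrives in time; other replies on the queue are left alone.
        responseQueue.Get(reply, options);
        return reply;
    }
}
```

Because the match is on the correlation id only, several clients can share one response queue without consuming each other's replies.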

Running the Samples

When you start debugging (F5) in VS.NET, it will launch both the client and the server applications.

[Figure: Launching applications]

To send a request, type some text in the command line after ‘Enter request:’. The client will put a message to the queue, wait for the service to process it, and display the response once it is received.

[Figure: Sending request]

The service just responds with a message that includes the original message that we sent, confirming that it actually received the request message.

Both the MQ and XMS samples will give the exact same result if there is no exception. Of course, if an exception is thrown, it will be specific to the API that was used.

Important Points to Consider When Implementing Request/Reply Pattern on MQ/XMS

All replies must be correlated by a message id so that the client can find the exact match for the response it’s waiting for on a common queue. It is also possible to create a temporary reply queue rather than using a dedicated message queue. Our XMS sample actually uses this technique:

textMessage.JMSReplyTo = xmsSession.CreateTemporaryQueue()
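To put that line in context, here is a sketch of a complete XMS round trip over a temporary reply queue. It assumes an ISession on a connection that has already been started. The class name and parameters are hypothetical and are not taken from the sample.

```csharp
using IBM.XMS;

public static class XmsTempQueueSketch
{
    // Hypothetical helper: sends a text request and waits for the reply on a
    // temporary queue that only this session's connection can read from.
    // Returns null if no reply arrives within timeoutMs.
    public static string Request(ISession session, IDestination requestQueue,
                                 string text, long timeoutMs)
    {
        IDestination replyQueue = session.CreateTemporaryQueue();

        ITextMessage request = session.CreateTextMessage(text);
        request.JMSReplyTo = replyQueue;   // tells the service where to answer

        IMessageProducer producer = session.CreateProducer(requestQueue);
        IMessageConsumer consumer = session.CreateConsumer(replyQueue);
        try
        {
            producer.Send(request);
            ITextMessage reply = consumer.Receive(timeoutMs) as ITextMessage;
            return reply == null ? null : reply.Text;
        }
        finally
        {
            consumer.Close();
            producer.Close();
        }
    }
}
```

With a private temporary queue, the reply needs no selector to be found, although setting the correlation id on the reply is still good practice.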

Since the request/reply pattern relies on the exchange of messages in the same logical transaction, those messages are actually transient in nature and should not persist in the queue. We achieve this by setting

 mqMessage.Persistence = MQC.MQPER_NOT_PERSISTENT

or

xmsMessage.JMSDeliveryMode = DeliveryMode.NonPersistent

It’s also a good idea to set an expiry time on messages to prevent congestion caused by the accumulation of unprocessed messages in the queue.

Unknown request messages and messages that cannot be processed can be forwarded to a dead-letter queue. Our sample code does not demonstrate this, but it is a simple enhancement.
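One detail to watch when setting the expiry is the unit. The MQ message descriptor expresses Expiry in tenths of a second, whereas timeouts in the samples are in milliseconds. The helper below is a hypothetical convenience, not part of the sample, that converts between the two and rounds up so that a short timeout never becomes zero.

```csharp
using System;

public static class ExpirySketch
{
    // Converts a timeout in milliseconds to the tenths-of-a-second unit
    // used by the MQ Expiry field, rounding up to at least one tenth.
    public static int ToMqExpiry(int timeoutMs)
    {
        if (timeoutMs <= 0)
            throw new ArgumentOutOfRangeException("timeoutMs");
        return Math.Max(1, (timeoutMs + 99) / 100);
    }
}
```

With the MQ API this could be applied as mqMessage.Expiry = ExpirySketch.ToMqExpiry(receiveTimeOut). In XMS the corresponding setting is the time to live on the message producer, which is expressed in milliseconds.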

Finally, creating a protocol around the text message content would be a good idea to be able to receive multiple different types of requests and dispatch them to corresponding service handlers.
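As an illustration of such a protocol, the sketch below assumes a trivial wire format of the form "TYPE|payload" and routes each request to a registered handler. The format, the class name, and the "OK|" and "ERROR|" response prefixes are all assumptions made for this example. It uses Func, so it needs .NET 3.5 or later.

```csharp
using System;
using System.Collections.Generic;

public class RequestDispatcherSketch
{
    private readonly Dictionary<string, Func<string, string>> handlers =
        new Dictionary<string, Func<string, string>>(StringComparer.OrdinalIgnoreCase);

    // Associates a request type with the handler that processes its payload.
    public void Register(string requestType, Func<string, string> handler)
    {
        handlers[requestType] = handler;
    }

    // Parses "TYPE|payload", invokes the matching handler, and always
    // returns a response text so that the client is never left waiting.
    public string Dispatch(string request)
    {
        int separator = request.IndexOf('|');
        if (separator < 0)
            return "ERROR|Malformed request";

        string requestType = request.Substring(0, separator);
        string payload = request.Substring(separator + 1);

        Func<string, string> handler;
        if (!handlers.TryGetValue(requestType, out handler))
            return "ERROR|Unknown request type: " + requestType;

        return "OK|" + handler(payload);
    }
}
```

A dispatcher like this could sit behind the text-based HandleRequest(string) method, leaving the queue handling code unchanged.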

Conclusion

It is possible to implement the request/reply pattern using the XMS and MQ APIs. This may be a quick and easy way to achieve communication between .NET and JMS or other platforms that may be using MQ. The provided listener code can be used as a starting point to develop robust services that work on MQ.

Our samples demonstrate that we can encapsulate most of the API-specific code and separate the listener code. By employing a provider model, it might be possible to entirely abstract the queue access functionality and use a single listener code for both types of APIs.
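To make the provider idea concrete, here is one possible shape for it. Every name in this sketch is hypothetical. An MQ-backed and an XMS-backed class would each implement IQueueProvider, and the in-memory provider exists only so that the listener logic can be exercised without a queue manager.

```csharp
using System;
using System.Collections.Generic;

// API-neutral message: just the fields the listener logic needs.
public class QueueTextMessage
{
    public string Id;
    public string CorrelationId;
    public string Text;
}

// Hypothetical provider contract; MQ and XMS would each supply an implementation.
public interface IQueueProvider
{
    QueueTextMessage Get(int timeoutMs);   // returns null if nothing arrived
    void Put(QueueTextMessage message);
}

public class ProviderNeutralListener
{
    private readonly IQueueProvider requests;
    private readonly IQueueProvider responses;
    private readonly Func<string, string> handler;

    public ProviderNeutralListener(IQueueProvider requests, IQueueProvider responses,
                                   Func<string, string> handler)
    {
        this.requests = requests;
        this.responses = responses;
        this.handler = handler;
    }

    // Handles at most one request; returns false if none was available.
    public bool ProcessOne(int timeoutMs)
    {
        QueueTextMessage request = requests.Get(timeoutMs);
        if (request == null)
            return false;

        string text;
        try { text = handler(request.Text); }
        catch (Exception ex) { text = "ERROR: " + ex.Message; }

        QueueTextMessage reply = new QueueTextMessage();
        reply.CorrelationId = request.Id;   // correlate the reply with the request
        reply.Text = text;
        responses.Put(reply);
        return true;
    }
}

// Stand-in provider for trying out the listener without MQ or XMS.
public class InMemoryQueueProvider : IQueueProvider
{
    private readonly Queue<QueueTextMessage> queue = new Queue<QueueTextMessage>();

    public QueueTextMessage Get(int timeoutMs)
    {
        return queue.Count > 0 ? queue.Dequeue() : null;
    }

    public void Put(QueueTextMessage message)
    {
        queue.Enqueue(message);
    }
}
```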

Download the source code for the article.
