Unexpected UnAck'd Messages Piling up in OpenMQ using NMS and STOMP

Ted Squilanti
Hello All,

I have not found much information on integrating with OpenMQ using STOMP and NMS from C# and .NET, so I have been crawling along steadily.

Using the Apache.NMS and Apache.NMS.Stomp DLLs, I have created a system that uses topics instead of queues.

The system runs fine except for a minor annoyance.  As I produce messages, I see them end up as UnAck'd messages on OpenMQ (using imqcmd to list destinations).  I am guessing that the eventual pile-up of these UnAck'd messages is what brings my STOMP client tumbling down with a "java.lang.OutOfMemory: Java heap space" error.

When I first started creating this system, I used a simpler approach and wrote all of my STOMP commands directly to the STOMP client, building the message header and message strings myself and sending them directly via TCP.  With this direct writing to STOMP, I did not see my messages being left in OpenMQ as UnAck'd messages, and the STOMP client never ran out of memory.
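
For readers unfamiliar with what that hand-written approach involves, here is a minimal sketch of assembling STOMP 1.0 frames as strings. It is not the original client code. The class name StompFrames, the Build helper, and the /topic/FOO.BAR destination syntax are assumptions made for illustration; the destination naming the bridge expects may differ.

```csharp
using System;
using System.Text;

public static class StompFrames
{
    // Builds one STOMP 1.0 frame: the command line, one "name:value" line per
    // header, a blank line, the body, and a terminating NUL character.
    public static string Build(string command, string body, params string[] headers)
    {
        StringBuilder sb = new StringBuilder();
        sb.Append(command).Append('\n');
        foreach (string header in headers)
        {
            sb.Append(header).Append('\n');
        }
        sb.Append('\n');                  // blank line separates headers from body
        sb.Append(body ?? string.Empty);
        sb.Append('\0');                  // NUL marks the end of the frame
        return sb.ToString();
    }

    public static void Main()
    {
        // Assumption: "/topic/FOO.BAR" is how the topic is addressed over STOMP.
        string subscribe = Build("SUBSCRIBE", null,
                                 "destination:/topic/FOO.BAR", "ack:auto");
        string send = Build("SEND", "Hello World!",
                            "destination:/topic/FOO.BAR");

        // These are the bytes that would be written to the TCP stream.
        byte[] wire = Encoding.UTF8.GetBytes(send);
        Console.WriteLine("SEND frame is " + wire.Length + " bytes");

        // Show the frames with the NUL terminator made visible.
        Console.WriteLine(subscribe.Replace("\0", "^@"));
        Console.WriteLine(send.Replace("\0", "^@"));
    }
}
```

The "ack:auto" header on SUBSCRIBE is the frame-level counterpart of an auto-acknowledge session: the subscriber does not send ACK frames itself.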

I would go back to that direct writing via TCP, but I don't believe that we should be worrying about things down at the message frame level.

I have made sure that my session is set to auto-acknowledge and have tried making the messages non-persistent.  I want to create durable consumers but, for the moment, have decided to back down to regular consumers until I figure this out.
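
A minimal sketch of those two settings in NMS terms, assuming the Apache.NMS and Apache.NMS.Stomp assemblies are referenced and a STOMP endpoint is listening on localhost:7672. The class name AckSettingsSketch is hypothetical, and this is not the code from the real application; it only shows where the acknowledgement mode and delivery mode are chosen.

```csharp
using System;
using Apache.NMS;
using Apache.NMS.Util;

public class AckSettingsSketch
{
    public static void Main(string[] args)
    {
        // Assumption: STOMP endpoint on localhost:7672.
        IConnectionFactory factory =
            new NMSConnectionFactory(new Uri("stomp:tcp://localhost:7672"));

        using (IConnection connection = factory.CreateConnection())
        // Request auto-acknowledge explicitly instead of relying on the default.
        using (ISession session =
                   connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        {
            IDestination destination =
                SessionUtil.GetDestination(session, "topic://FOO.BAR");

            using (IMessageProducer producer = session.CreateProducer(destination))
            {
                connection.Start();

                // Non-persistent delivery: the broker need not store the message.
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

                producer.Send(session.CreateTextMessage("Hello World!"));
                Console.WriteLine("Sent one non-persistent message to " + destination);
            }
        }
    }
}
```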

I would appreciate any advice that anyone may have, or even the suggestion of a more appropriate forum (possibly a more OpenMQ-centric one).

Thank you,
  Ted Squilanti

***Added on 2010_10_13

O.K.... Here's an example...

I have slightly modified the ActiveMQ Async NMS Example found at https://cwiki.apache.org/NMS/examples.html

I have changed it from queues to topics and from ActiveMQ to STOMP.

The modified code is below.

After compiling the code in VS2008, I:
1.  Set a breakpoint at the line with the comment // !!! Set Breakpoint Here !!!
2.  Start OpenMQ on port 7676
3.  Start STOMP on port 7672
4.  Let this program run in the debugger
5.  When it pauses at the breakpoint, navigate to my OpenMQ mq/bin folder and type:
   imqcmd list dst -b localhost:7676 -u admin -passfile passfile.sample

The following is displayed:

***************************************
Listing all the destinations on the broker specified by:

-------------------------
Host             Primary Port
-------------------------
localhost       7676

--------------------------------------------------------------------------------------------
Name        Type   State    Producers        Consumers        Msgs
                            Total  Wildcard  Total  Wildcard  Count  Remote  UnAck  Avg Size
--------------------------------------------------------------------------------------------
FOO.BAR     Topic  Running  1      0         1      0         1      0       1      340.0
mq.sys.dmq  Queue  Running  0      -         0      -         0      0       0      0.0

Successfully listed destinations.

***************************************

*** NOTICE THE UnAck'd MESSAGE ***.  In my real application these messages are generated quickly, pile up, and, as mentioned above, cause my STOMP client to come tumbling down with a "java.lang.OutOfMemory: Java heap space" error.

=== Code - Begin ===

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ.Test
{

public class TestMain
{

    protected static AutoResetEvent semaphore = new AutoResetEvent(false);
    protected static ITextMessage message = null;
    protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);

    public static void Main(string[] args)
    {

        // Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");
        Uri connecturi = new Uri("stomp:tcp://localhost:7672");
       
        Console.WriteLine("About to connect to " + connecturi);

        IConnectionFactory factory = new NMSConnectionFactory(connecturi);

        using(IConnection connection = factory.CreateConnection())
        using(ISession session = connection.CreateSession())
        {
            // IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
            IDestination destination = SessionUtil.GetDestination(session, "topic://FOO.BAR");

            Console.WriteLine("Using destination: " + destination);

            // Create a consumer and producer
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            using(IMessageProducer producer = session.CreateProducer(destination))
            {
                // Start the connection so that messages will be processed.
                connection.Start();

                // producer.Persistent = true;
                producer.DeliveryMode = MsgDeliveryMode.Persistent;

                producer.RequestTimeout = receiveTimeout;

                consumer.Listener += new MessageListener(OnMessage);

                // Send a message
                ITextMessage request = session.CreateTextMessage("Hello World!");
                request.NMSCorrelationID = "abc";
                request.Properties["NMSXGroupID"] = "cheese";
                request.Properties["myHeader"] = "Cheddar";

                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int) receiveTimeout.TotalMilliseconds, true);
                if(message == null)
                {
                    Console.WriteLine("No message received!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            } // !!! Set Breakpoint Here !!!
        }
    }

    protected static void OnMessage(IMessage receivedMsg)
    {
        message = receivedMsg as ITextMessage;
        Console.WriteLine("Reached OnMessage with message: " + message.Text);
        semaphore.Set();
    }
}
}

=== Code - End ===