Consumer not able to consume messages from queue

xabhi
Hi,
I have a session-aware message listener inside a DMLC (Spring's DefaultMessageListenerContainer) with 5 concurrent consumers consuming messages from a queue. Every time the listener receives a message, it creates a producer on the same session object and sends a message to a topic.

This setup runs fine for around 15 days and then suddenly the consumers stop consuming messages from the queue and this continues until the broker is restarted.

I think my issue may be related to this http://activemq.2283324.n4.nabble.com/ActiveMQ-message-dequeuing-hangs-td4681366.html but I am not sure.

Where should I start looking for the possible issues?

I am using Spring 4.0.5, ActiveMQ 5.10, and ActiveMQ's pooled connection factory.

Thanks,
Abhi

Re: Consumer not able to consumer messages from queue

artnaseef
I would look at a stack trace inside the application: try to find the consumer threads and see whether they are active and, if so, what they are doing.  If all is normal, they should be waiting to receive a message from the ActiveMQ transport.
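
For reference, running the JDK's `jstack <pid>` against the client process produces a full thread dump. The same information can be pulled from inside the JVM, roughly as in the minimal sketch below. The class name `ConsumerThreadDump` is hypothetical, and the `DefaultMessageListenerContainer` filter is only an assumption about how the listener threads are named in this application; adjust it to the actual thread names.

```java
import java.util.Map;

public class ConsumerThreadDump {

    // Returns the name, state, and stack of every live thread whose name
    // contains nameFilter. An empty filter matches all threads.
    static String dump(String nameFilter) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().contains(nameFilter)) {
                continue;
            }
            out.append(thread.getName()).append(" [").append(thread.getState()).append("]\n");
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed filter: the listener threads carry the container's name.
        String filter = args.length > 0 ? args[0] : "DefaultMessageListenerContainer";
        System.out.print(dump(filter));
    }
}
```

A consumer thread that is idle but healthy would typically show up in a waiting state inside a receive call; a thread stuck in a send or blocked on a lock is the thing to look for.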

Also, check the broker logs for messages.  Anything related to producer flow control helps.

Does a restart of the consumer application correct the problem, or is a broker restart absolutely necessary?

Re: Consumer not able to consumer messages from queue

xabhi
Thanks for the reply. I don't have the stack trace with me currently, and I have not been able to reproduce the issue.
On my broker I log hourly destination stats, and for this particular queue I observed that message consumption doesn't stop immediately; rather, it tapers off over time.

[destName: queue://app.heartBeat | enqueueCount: 160014 | dequeueCount: 160014 | dispatchCount: 160014 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 134217728 | avgEnqueueTimeMs: 23540.421475620882 | maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1182958991087 | totalMsgSize: 169153727] – all messages consumed till now.

[destName: queue://app.heartBeat | enqueueCount: 160408 | dequeueCount: 160274 | dispatchCount: 160274 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 134 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 134217728 | avgEnqueueTimeMs: 23502.23844790796 | maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1288090369558 | totalMsgSize: 169571918] – messages consumed 260/394, held - 134

[destName: queue://app.heartBeat | enqueueCount: 160697 | dequeueCount: 160462 | dispatchCount: 160462 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 235 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 134217728 | avgEnqueueTimeMs: 24854.09577345415 | maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1415956738458 | totalMsgSize: 169879483] – messages consumed 188/289

[destName: queue://app.heartBeat | enqueueCount: 160979 | dequeueCount: 160462 | dispatchCount: 160462 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 517 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 134217728 | avgEnqueueTimeMs: 24854.09577345415 | maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1541691773461 | totalMsgSize: 170179621] – messages consumed 0/282

After this, no messages are consumed. Another observation: the usual heartbeat rate is 492 messages/hour, but during this period it gradually decreases to 282.
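
The per-hour figures quoted next to the snapshots above can be derived mechanically from the stats lines. The following is a minimal sketch, assuming the `name: value | name: value` layout shown above; the class and method names (`DestStats`, `parse`, `delta`) are made up for illustration, and the two sample lines are abbreviated copies of the last two snapshots.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DestStats {

    // Matches "name: value" pairs separated by '|' inside the bracketed stats line.
    private static final Pattern FIELD = Pattern.compile("(\\w+): ([^|\\]]+)");

    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(line);
        while (m.find()) {
            fields.put(m.group(1), m.group(2).trim());
        }
        return fields;
    }

    static long count(Map<String, String> stats, String field) {
        return Long.parseLong(stats.get(field));
    }

    // Messages accepted by the broker but not yet dequeued.
    static long backlog(Map<String, String> stats) {
        return count(stats, "enqueueCount") - count(stats, "dequeueCount");
    }

    // Summarizes what happened between two consecutive snapshots.
    static String delta(String earlier, String later) {
        Map<String, String> a = parse(earlier);
        Map<String, String> b = parse(later);
        long enqueued = count(b, "enqueueCount") - count(a, "enqueueCount");
        long dequeued = count(b, "dequeueCount") - count(a, "dequeueCount");
        return "enqueued=" + enqueued + " dequeued=" + dequeued + " backlog=" + backlog(b);
    }

    public static void main(String[] args) {
        String third = "[destName: queue://app.heartBeat | enqueueCount: 160697 | dequeueCount: 160462 | msgsHeld: 235]";
        String fourth = "[destName: queue://app.heartBeat | enqueueCount: 160979 | dequeueCount: 160462 | msgsHeld: 517]";
        System.out.println(delta(third, fourth));
    }
}
```

In these snapshots the computed backlog matches the reported msgsHeld value, and dispatchCount never runs ahead of dequeueCount, which suggests the broker is not dispatching the held messages at all rather than dispatching them to consumers that fail to acknowledge.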

Does this behavior point in any direction I should look into?

In my producer, I create a consumer and wait for the acknowledgement using a consumer.receive() call with a 60s timeout. On the listener side, I send back the acknowledgement for the heartbeat by creating a producer on the same underlying session that I get in the session-aware listener. I am using selectors for this kind of heartbeat messaging and acknowledgement (with persistent messages).

I have one question: the producer closes the session under which it sent the heartbeat message once it fails to receive an acknowledgement (maybe due to a timeout). What will the behavior of the session-aware listener on the consumer side be (which will use the same session to send back the ack)?

As for the restart: I never tried restarting the client app, as that takes around 15-20 minutes, and a broker restart does the trick.

Thanks,
Abhi

Re: Consumer not able to consumer messages from queue

xabhi
Please help, I am not able to get anywhere with this issue.

At least tell me what more information to collect and what to look out for the next time this happens, so that it can help debug or resolve the issue.

Thanks,
Abhi

Re: Consumer not able to consumer messages from queue

Tim Bain
You say you're doing request-response with a 60-second timeout on the
response.  If some of those responses took longer than 60 seconds, I'd
expect that that particular response message wouldn't be removed from the
response queue, and after some number of them I'd expect that the response
queue would be full of messages that were never going to be consumed and
producer flow control would kick in for the response queue.  Depending on
how you've implemented your producer, this might or might not prevent it
from publishing any more requests.  At a minimum, you should make sure
you've thought about what should happen to messages that are not consumed
due to the 60-second timeout, even if this doesn't turn out to be the root
cause for your problem...

Do you have any messages in the broker logs about producer flow control?
And do you have any way to find out how many messages were in the response
queue at the time this all went down?

Also, are you sure you're closing all of your consumers properly?
Selectors are expensive for a broker to evaluate, so if it's still
evaluating whether to deliver each message to every single consumer that's
ever connected, that might explain the slowdown you see (though it doesn't
explain why it completely stops processing data, so this may not be it).

On Mon, Jan 12, 2015 at 4:03 AM, xabhi <[hidden email]> wrote:


Re: Consumer not able to consumer messages from queue

artnaseef
Good point - when using this type of selector scenario, using message TTLs or some other means of making sure messages are consumed is important.

As mentioned earlier, grabbing a stack trace may be telling, although it may be a little more difficult to understand since the DMLC is involved.

The impact of closing the session that was created by the DMLC is a good question as well.  Unless there is some documentation from the Spring DMLC that shows it is safe to do, or the code has been read to ensure so, I would avoid doing so.  Also, closing the session seems odd to me: if there is a problem with the connection, closing the connection would be necessary and closing the session would be unlikely to fix it.  If there is a problem within the single session, that is odd.

One thing to note - having producers and consumers sharing the same connection can lead to a deadlock when producer-flow-control kicks in, depending on the messaging pattern of the clients involved.

Re: Consumer not able to consumer messages from queue

xabhi
Thanks for the reply.

I have disabled producer flow control on both topics and queues in my broker configuration and I have message TTL specified on broker side.

The destination on which heartbeats are sent is a queue on which 5 concurrent consumers are listening, and the reply-to destination is a topic on which there are multiple subscribers.

When all this went down the topic destination enqueue count was not increasing.

I am pasting the code for consumer and producer:

Producer:

public boolean publish()
{
    String message = "Heartbeat message";
    boolean responseReceived = false;

    Connection connection = null;
    Session session = null;

    try
    {
      connection = myJmsTemplate.getConnectionFactory().createConnection();
      session    = connection.createSession(transacted, ackMode);

      String correlationId = null;
      Long   timeStamp     = System.currentTimeMillis();
      Random random        = new Random(timeStamp);

      Integer randomPart = random.nextInt(Integer.MAX_VALUE);
      Long    threadId   = Thread.currentThread().getId();
      correlationId      = threadId + "_" + timeStamp + "_" + randomPart;

      String messageSelector = "JMSCorrelationID='" + correlationId + "'";
      MessageConsumer responseConsumer = session.createConsumer(receiveDestination, messageSelector);
      connection.start();

      // send a text message to broker
      myJmsTemplate.send(sendDestination, new SimpleTextMessageCreator(message, receiveDestination, correlationId));

      LOG.debug("Waiting for message with " + messageSelector + " for " + DEFAULT_TIMEOUT + " ms");

      // check for response from broker, DEFAULT_TIMEOUT is 60s.
      TextMessage responseMessage = (TextMessage) responseConsumer.receive(DEFAULT_TIMEOUT);
      if (responseMessage != null)
      {
          if (!responseMessage.getJMSCorrelationID().equals(correlationId)) {
              String errorMsg =
                  "Invalid correlation id in response message!!! " +
                  "Expected : " + correlationId +
                  " but received : " + responseMessage.getJMSCorrelationID();

              LOG.error(errorMsg);
              responseReceived = false;
          }
          else {
              LOG.debug("Received the response back: " + responseMessage.getText());
              LOG.debug("Correlation id of response message : " + responseMessage.getJMSCorrelationID());
              responseReceived = true;
          }
      }
    }
    catch (JMSException e)
    {
      LOG.error("Error interacting with broker", e);
    }
    catch (Throwable t) {
      LOG.warn("Publish encountered unknown exception.", t);
    }
    finally {
      JmsUtil.closeConnection(connection, session, this.getClass().getName());
    }
    return responseReceived;
}
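
A side note on the correlation ID in publish(): because the Random is seeded with the current timestamp, two calls in the same millisecond get the same random part, so uniqueness rests on the thread ID and timestamp alone, and two client JVMs could in principle produce the same ID. This is unlikely to be related to the stall, but a UUID avoids the question. The sketch below is only an illustration; `CorrelationIds`, `newId`, and `selectorFor` are hypothetical names, not part of the posted code.

```java
import java.util.UUID;

public class CorrelationIds {

    // A random UUID does not depend on thread ID or clock resolution.
    static String newId() {
        return UUID.randomUUID().toString();
    }

    // JMS selectors use SQL-92 style string literals: single quotes,
    // with any embedded single quote doubled.
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID='" + correlationId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String id = newId();
        System.out.println(selectorFor(id));
    }
}
```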


Listener/Consumer:

public class HeartBeatListener implements SessionAwareMessageListener
{

     private final Log LOG = LogFactory.getLog(this.getClass());

     @Override
     public void onMessage(Message message, Session session) throws JMSException
     {
         if (!(message instanceof TextMessage)) {
             throw new IllegalArgumentException("Message must be of type TextMessage: " + message);
         }

         String replyTextMessage = "Heartbeat Ack.";

         try
         {
             TextMessage textMessage = (TextMessage) message;
             String msg = textMessage.getText();

             LOG.debug("Received heart beat message : " + msg);

             // Send the response to the destination specified by the
             // 'JMSReplyTo' field of the received message.
             Destination responseDest = message.getJMSReplyTo();
             if (responseDest != null)
             {
                 LOG.debug("Sending response to destination" + responseDest.toString());

                 // Setup a message producer for the above destination
                 MessageProducer producer = session.createProducer(responseDest);
                 TextMessage responseMessage = session.createTextMessage(replyTextMessage);

                 responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());

                 // Send the response back
                 producer.send(responseMessage);
                 LOG.debug("Heart Beat Response Sent: " + responseMessage);
             }
         }
         catch (JMSException e)
         {
             LOG.error("Error while processing the message " + message, e);
         }
     }
}
This listener is used in a DMLC.
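
One thing that may be worth checking in the listener above: the MessageProducer created in onMessage() is never closed. If the container keeps reusing the same session across messages (an assumption about how this DMLC is configured), each heartbeat could leave one more open producer behind on that session. The sketch below shows the create/send/close pattern with try/finally. `Producer` and `FakeSession` are hypothetical stand-ins for javax.jms.MessageProducer and javax.jms.Session, reduced to the calls the listener uses so that the example compiles without a JMS jar; it illustrates the pattern and is not a diagnosis of the stall.

```java
import java.util.ArrayList;
import java.util.List;

public class ProducerCloseSketch {

    // Hypothetical stand-in for javax.jms.MessageProducer.
    interface Producer {
        void send(String text);
        void close();
    }

    // Hypothetical stand-in for javax.jms.Session that tracks open producers.
    static class FakeSession {
        final List<Producer> open = new ArrayList<>();
        final List<String> sent = new ArrayList<>();

        Producer createProducer() {
            Producer producer = new Producer() {
                public void send(String text) { sent.add(text); }
                public void close() { open.remove(this); }
            };
            open.add(producer);
            return producer;
        }
    }

    // Mirrors the posted listener: the producer is created and then abandoned.
    static void replyLeaky(FakeSession session, String text) {
        session.createProducer().send(text);
    }

    // Same reply, but the producer is always closed, even if send() throws.
    static void replyClosing(FakeSession session, String text) {
        Producer producer = session.createProducer();
        try {
            producer.send(text);
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) {
        FakeSession leaky = new FakeSession();
        FakeSession closing = new FakeSession();
        for (int i = 0; i < 1000; i++) {
            replyLeaky(leaky, "Heartbeat Ack.");
            replyClosing(closing, "Heartbeat Ack.");
        }
        System.out.println("open producers without close: " + leaky.open.size());
        System.out.println("open producers with close:    " + closing.open.size());
    }
}
```

In the real listener the equivalent change would be wrapping producer.send(responseMessage) in try/finally and calling producer.close() in the finally block.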

Hi art,
Here the session is created on the producer side (in the publish() function), and on the listener side a session-aware listener is used. My question was: what happens if the session on the publisher side is closed before the session-aware listener is even called, or while it is executing onMessage()?

Please tell me if I am doing anything wrong here.

Thanks for all the help.
-Abhi

Re: Consumer not able to consumer messages from queue

Tim Bain
I can't speak to the question about closing the session (I use Camel to
interact with ActiveMQ, so I don't directly deal with the session), but
even if that explains the increasing msgsHeld value, it doesn't explain the
producer slowing down and eventually stopping its sends.  So there might be
multiple problems here.

What code calls your publish() method periodically?  And can you find out
(e.g. by taking the thread dump Art referenced) whether you're blocked on
the send() call or somewhere else when it all comes to a halt?

Also, I was under the impression that it wasn't possible to set message TTL
on the broker and that it could only be set by the producer; can you
provide a reference to the documentation for the setting you're using?
On Jan 13, 2015 1:03 AM, "xabhi" <[hidden email]> wrote:


Re: Consumer not able to consumer messages from queue

xabhi
The producer is not blocked, as the msgsHeld value keeps increasing; it's the consumer that stops consuming messages.

>>Also, I was under the impression that it wasn't possible to set message TTL
on the broker and that it could only be set by the producer;

Yes, sorry about that; it can be set only by the producer. But you can achieve the same effect by overriding the send method of the BrokerPluginSupport class to set the message TTL when the broker receives a message (which we do in cases where the producer hasn't specified a TTL).

<amq:plugins>
    <amq:discardingDLQBrokerPlugin dropAll="true"/>

    <amq:loggingBrokerPlugin logAll="false" logMessageEvents="false"
        logConnectionEvents="true" logConsumerEvents="false"
        logProducerEvents="false"/>

    <!-- This is the modified plugin extending org.apache.activemq.broker.BrokerPluginSupport. -->
    <bean id="ActiveMQPlugin"
        class="amq.tools.ActiveMQPlugin">
    </bean>
</amq:plugins>

Thanks,
Abhi

Re: Consumer not able to consumer messages from queue

Tim Bain
Mmm, I misunderstood what you'd written about where the bottleneck was
occurring.  Based on what you've said, it sounds like maybe your cursor is
getting full of messages for which no consumer exists (maybe because
they're in a session that was already closed?) and therefore the broker
can't pull out any of the messages matching the selectors of consumers that
are actually connected.  That's pure speculation on my part, but it might
be what's going on.  But it can only explain the problem if message TTL
wasn't being set (or was being set to a very large value, such that your
messages weren't expiring fast enough).  Can you browse the queue and
inspect the messages on it and confirm that the JMSExpiration header really
is set and that it's set to a value that's what you expect?  If it's set,
then my guess is probably wrong, but if it's not, then fix the code that's
trying to set it and see if this problem disappears...
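
When inspecting the messages, it helps to remember how the header is defined: JMSExpiration is an absolute timestamp in milliseconds since the epoch (send time plus time-to-live), and a value of 0 means the message never expires. The sketch below interprets a value read via message.getJMSExpiration() or copied from a queue browser; `ExpirationCheck` and `describe` are hypothetical names used only for illustration.

```java
public class ExpirationCheck {

    // jmsExpiration: the JMSExpiration header value (ms since the epoch, 0 = never expires).
    // nowMillis: the current time on the same clock, e.g. System.currentTimeMillis().
    static String describe(long jmsExpiration, long nowMillis) {
        if (jmsExpiration == 0L) {
            return "never expires";
        }
        long remaining = jmsExpiration - nowMillis;
        return remaining > 0
                ? "expires in " + remaining + " ms"
                : "expired " + (-remaining) + " ms ago";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long twoHours = 2L * 60 * 60 * 1000;
        System.out.println(describe(0L, now));             // no TTL was applied
        System.out.println(describe(now + twoHours, now)); // a TTL of two hours, just sent
    }
}
```

Held messages that report "never expires", or an expiration far in the future, would be consistent with the TTL not being applied the way the plugin intends.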

On Tue, Jan 13, 2015 at 10:08 AM, xabhi <[hidden email]> wrote:



Re: Consumer not able to consumer messages from queue

xabhi
Hi,
A similar issue happened again in my ActiveMQ broker setup: the broker didn't deliver any messages to a queue consumer for about an hour. I could see messages being held on the broker, and the count kept increasing during this time, although other destinations were working fine, with messages being enqueued and dequeued. The TTL for messages on this queue was set to 2 hours.

I am not able to explain this behavior in my setup. When does the broker hold messages, and what else should I be looking for? After an hour or so, all the messages were delivered and the consumer started receiving messages again.

Please help!

Thanks,
Abhi