Consumer stops sending messages to DefaultMessageListener ActiveMQ 5.9.1


shaneg
Hi,

We've introduced an HA deployment of ActiveMQ 5.9.1 using LevelDB and ZooKeeper across 3 nodes. After a performance run, our message consumers stopped consuming messages, although message production is still working fine.

We are seeing the dispatched queue count increasing, so it appears that the messages are being dispatched to the consumer but are not being delivered to the listener. This is happening across all the clients of our queues; it's not local to a single queue.

I've observed the following in our log files, which may or may not be connected. The client seems to be continually connecting and reconnecting using the failover transport (FYI, activemq1 is the active node), but I'm not sure if this is expected behaviour:

Line 25527: 2014-09-23 15:13:22,882 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://activemq3.cert.api.hco.com:61616, tcp://activemq2.cert.api.hco.com:61616, tcp://activemq1.cert.api.hco.com:61616], from: [tcp://activemq1.cert.api.hco.com:61616, tcp://activemq2.cert.api.hco.com:61616, tcp://activemq3.cert.api.hco.com:61616]
        Line 25528: 2014-09-23 15:13:22,882 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://activemq3.cert.api.hco.com:61616
        Line 25529: 2014-09-23 15:13:22,884 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.failover.FailoverTransport - Connect fail to: tcp://activemq3.cert.api.hco.com:61616, reason: java.net.ConnectException: Connection refused
        Line 25530: 2014-09-23 15:13:22,884 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://activemq3.cert.api.hco.com:61616
        Line 25531: 2014-09-23 15:13:22,884 [ActiveMQ Task-1] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@75d2eea6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
        Line 25532: 2014-09-23 15:13:22,885 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[unconnected]
        Line 25533: 2014-09-23 15:13:22,885 [ActiveMQ Task-1] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@75d2eea6[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
        Line 25534: 2014-09-23 15:13:22,885 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://activemq2.cert.api.hco.com:61616
        Line 25535: 2014-09-23 15:13:22,886 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.failover.FailoverTransport - Connect fail to: tcp://activemq2.cert.api.hco.com:61616, reason: java.net.ConnectException: Connection refused
        Line 25536: 2014-09-23 15:13:22,887 [ActiveMQ Task-1] DEBUG org.apache.activemq

This is the failover connection string:

activemq.connectionfactory.brokerurl=failover:(tcp://activemq1.cert.api.hco.com:61616,tcp://activemq2.cert.api.hco.com:61616,tcp://activemq3.cert.api.hco.com:61616)
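
To check, independently of the failover transport, which of the three nodes currently accepts TCP connections, a small probe along these lines could be used. This is only a sketch: `BrokerProbe`, `brokers` and `accepts` are hypothetical helper names (not part of ActiveMQ), the 2-second timeout is an arbitrary assumption, and a successful connect only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

class BrokerProbe {

    /** Extracts the broker URIs listed between '(' and ')' in a failover URL. */
    static List<URI> brokers(String failoverUrl) {
        int open = failoverUrl.indexOf('(');
        int close = failoverUrl.indexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("Not a failover:(...) URL: " + failoverUrl);
        }
        List<URI> result = new ArrayList<>();
        for (String part : failoverUrl.substring(open + 1, close).split(",")) {
            result.add(URI.create(part.trim()));
        }
        return result;
    }

    /** Returns true if a plain TCP connection to the broker's host and port succeeds. */
    static boolean accepts(URI broker, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(broker.getHost(), broker.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String url = "failover:(tcp://activemq1.cert.api.hco.com:61616,"
                + "tcp://activemq2.cert.api.hco.com:61616,"
                + "tcp://activemq3.cert.api.hco.com:61616)";
        for (URI broker : brokers(url)) {
            String state = accepts(broker, 2000) ? "accepting connections" : "not reachable";
            System.out.println(broker + " -> " + state);
        }
    }
}
```

Running this from the client host while the consumers are stalled would show whether the "Connection refused" entries in the log match the nodes that are not the active one.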

This is our current Spring config:


    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.connectionfactory.brokerurl}"/>
        <property name="userName" value="${activemq.connectionfactory.brokerUser}"/>
        <property name="password" value="${activemq.connectionfactory.brokerPassword}"/>
       

        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="initialRedeliveryDelay" value="10000"/>
                <property name="maximumRedeliveries" value="2"/>
            </bean>
        </property>
       
    </bean>



    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>



    <bean id="inboundContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="inboundQueue"/>
        <property name="messageListener" ref="inboundQueueListener"/>
        <property name="sessionTransacted" value="true"/>
    </bean>
   

I'd be interested to hear if anyone else has seen similar behaviour after introducing HA. The config above has been in use for about 6 months without any issue, so my assumption thus far is that the issue is HA-related, but of course it may be that it's exposing an issue with our config.

We took a thread dump of the client and this is what we are seeing:

"tckContainer-1" prio=10 tid=0x00007f0479209000 nid=0x59ef waiting on condition [0x00007f04a074b000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d01f3a38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1397)
        at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:279)
        at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:583)
        at sun.reflect.GeneratedMethodAccessor528.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:378)
        at com.sun.proxy.$Proxy148.rollback(Unknown Source)
        at org.springframework.jms.support.JmsUtils.rollbackIfNecessary(JmsUtils.java:235)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.rollbackOnExceptionIfNecessary(AbstractMessageListenerContainer.java:610)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:475)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1094)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:991)
        at java.lang.Thread.run(Thread.java:744)
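
For context on the top frames: the container thread is parked in `ArrayBlockingQueue.take()`, which waits indefinitely for an element, so the `rollback` call cannot return while no response arrives from the broker. Below is a minimal, stdlib-only sketch of that waiting pattern next to a timed alternative. `ResponseWait` and `awaitWithTimeout` are hypothetical names used for illustration and are not ActiveMQ code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ResponseWait {

    /** Waits up to timeoutMillis for a response; returns null if none arrives in time. */
    static String awaitWithTimeout(BlockingQueue<String> responses, long timeoutMillis) {
        try {
            return responses.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no response".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // A one-slot queue standing in for the slot a response would be put into.
        BlockingQueue<String> responses = new ArrayBlockingQueue<>(1);

        // Calling responses.take() here would park this thread until something
        // is offered, which is the state shown in the thread dump.
        System.out.println(awaitWithTimeout(responses, 200)); // prints "null"

        responses.offer("ack");
        System.out.println(awaitWithTimeout(responses, 200)); // prints "ack"
    }
}
```

The difference that matters is that `take()` has no upper bound on the wait, whereas `poll` with a timeout hands control back to the caller so it can log, retry, or fail.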

Thanks

Shane