ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled


jasons
Hello everyone

We are running ActiveMQ 5.14 in embedded mode under JBoss 7 with Java 7, with
100% persistent messaging backed by JournalledJDBCPersistence on an Oracle 12
DB. Durable topic subscription processing is our focus.

Our activemq-broker.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://activemq.apache.org/schema/core
         http://activemq.apache.org/schema/core/activemq-core-5.10.0.xsd">

    <amq:broker id="broker" brokerName="broker" persistent="true"
                schedulerSupport="true" useJmx="true">
        <amq:destinationPolicy>
            <amq:policyMap>
                <amq:policyEntries>
                    <amq:policyEntry topic="EllipseServices"
                                     allConsumersExclusiveByDefault="true" />
                </amq:policyEntries>
            </amq:policyMap>
        </amq:destinationPolicy>
        <amq:destinations>
            <amq:topic physicalName="EllipseServices" />
        </amq:destinations>
        <amq:managementContext>
            <amq:managementContext createConnector="false" />
        </amq:managementContext>
        <amq:persistenceAdapter>
            <amq:journalPersistenceAdapter>
                <amq:persistenceAdapter>
                    <amq:jdbcPersistenceAdapter dataSource="#dataSource"
                                                transactionIsolation="2"
                                                useDatabaseLock="false"
                                                cleanupPeriod="60000" />
                </amq:persistenceAdapter>
                <amq:taskRunnerFactory>
                    <bean class="org.apache.activemq.thread.TaskRunnerFactory" />
                </amq:taskRunnerFactory>
                <amq:journal>
                    <bean class="org.apache.activeio.journal.active.JournalImpl">
                        <constructor-arg index="0">
                            <bean class="java.io.File">
                                <constructor-arg index="0">
                                    <value>./.activemq_data</value>
                                </constructor-arg>
                            </bean>
                        </constructor-arg>
                        <constructor-arg index="1">
                            <value>10</value>
                        </constructor-arg>
                        <constructor-arg index="2">
                            <value>104857600</value>
                        </constructor-arg>
                    </bean>
                </amq:journal>
            </amq:journalPersistenceAdapter>
        </amq:persistenceAdapter>

        <amq:plugins>
            <amq:redeliveryPlugin>
                <amq:redeliveryPolicyMap>
                    <amq:redeliveryPolicyMap>
                        <amq:redeliveryPolicyEntries>
                            <amq:redeliveryPolicy topic="EllipseServices"
                                                  maximumRedeliveries="0" />
                        </amq:redeliveryPolicyEntries>
                        <amq:defaultEntry>
                            <amq:redeliveryPolicy maximumRedeliveries="2"
                                                  initialRedeliveryDelay="5000"
                                                  useExponentialBackOff="true" />
                        </amq:defaultEntry>
                    </amq:redeliveryPolicyMap>
                </amq:redeliveryPolicyMap>
            </amq:redeliveryPlugin>
            <amq:loggingBrokerPlugin logAll="true" />
        </amq:plugins>
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://0.0.0.0:61616" />
            <amq:transportConnector uri="vm://broker" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="jmsConnectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
        <property name="brokerURL" value="vm://broker" />
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

</beans>
 
We think our scenario is pretty basic, so we're fairly sure we're doing
something wrong or misunderstanding something here. Can someone help
explain how and why we're seeing this? We seem to be losing messages,
which of course is NOT good.

Scenario:
Two durable consumers, DC1 and DC2, consume from the same topic (T1) with
different selectors: DC1's selector is table='TEST1' and DC2's is
table='TEST2'. We first send 100 messages for DC1 (stopped) and then 100
messages for DC2 (receiving). The ACTIVEMQ_ACKS.LAST_ACKED_ID of each
subscription ultimately rises to 200, and within an hour all the messages
are deleted. The 100 messages sent to DC1 are recoverable only as long as
the broker does not restart.

In more detail, this is how it actually unfolds:

After 100 messages have been sent for DC1 (with the property
table='TEST1'):

select * from activemq_acks where container = 'topic://EllipseServices' and sub_name='TestListenerSubscriber';

CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  0              0
topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  100            0

Out of JMX we see:
- DC1 (stopped) reports 100 pending messages, as we'd expect.
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100
dequeued and 100 dispatched, as we'd expect.

After 100 messages have been sent for DC2 (with the property
table='TEST2'):

select * from activemq_acks where container = 'topic://EllipseServices' and sub_name='TestListenerSubscriber';

CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  200            0
topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  200            0

Out of JMX we see:
- DC1 (stopped) shows 100 pending, as expected.
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100
dequeued and 100 dispatched, as we'd expect.
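The pattern in the two ACTIVEMQ_ACKS snapshots can be reproduced with a small Java model. It is a sketch, not ActiveMQ code: LastAckModel, Subscription, publish, drain and runScenario are hypothetical names, and the model simply assumes (a) that a message failing a subscription's selector advances that subscription's LAST_ACKED_ID to the message's ID and (b) that an online subscriber's acknowledgement sets LAST_ACKED_ID to the acknowledged ID. Under those two assumptions it yields DC1=0/DC2=100 after the TEST1 batch and 200/200 after the TEST2 batch, with DC1's 100 messages still pending; letting DC1 drain its backlog then sets its value back to 100, matching the later observation that DC1's LAST_ACKED_ID returns to 100 once it consumes its pending messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the LAST_ACKED_ID pattern reported in this thread.
// NOT ActiveMQ code; it only encodes two assumptions:
//   (a) a message that fails a subscription's selector advances that
//       subscription's LAST_ACKED_ID to the message ID;
//   (b) an online subscriber's acknowledgement sets LAST_ACKED_ID to the
//       acknowledged message ID.
class LastAckModel {

    static final class Subscription {
        final String table;                           // value accepted by selector table='...'
        boolean online;                               // true = receiving, false = stopped
        long lastAckedId = 0;                         // stands in for ACTIVEMQ_ACKS.LAST_ACKED_ID
        final List<Long> pending = new ArrayList<>(); // matching but not yet delivered

        Subscription(String table, boolean online) {
            this.table = table;
            this.online = online;
        }
    }

    // Publish one message carrying the given "table" property to every subscription.
    static void publish(long id, String table, List<Subscription> subs) {
        for (Subscription s : subs) {
            if (!s.table.equals(table)) {
                s.lastAckedId = id;   // assumption (a): unmatched message
            } else if (s.online) {
                s.lastAckedId = id;   // assumption (b): delivered and acknowledged
            } else {
                s.pending.add(id);    // stopped subscriber: message waits
            }
        }
    }

    // A stopped subscriber comes back and acknowledges its backlog in order.
    static void drain(Subscription s) {
        s.online = true;
        for (long id : s.pending) {
            s.lastAckedId = id;       // assumption (b) again
        }
        s.pending.clear();
    }

    // Returns [DC1, DC2] after test1Count TEST1 messages followed by test2Count TEST2 messages.
    static List<Subscription> runScenario(int test1Count, int test2Count) {
        Subscription dc1 = new Subscription("TEST1", false); // DC1 is stopped
        Subscription dc2 = new Subscription("TEST2", true);  // DC2 is receiving
        List<Subscription> subs = Arrays.asList(dc1, dc2);
        long id = 0;
        for (int i = 0; i < test1Count; i++) publish(++id, "TEST1", subs);
        for (int i = 0; i < test2Count; i++) publish(++id, "TEST2", subs);
        return subs;
    }

    public static void main(String[] args) {
        List<Subscription> first = runScenario(100, 0);
        System.out.println("after TEST1 batch: DC1=" + first.get(0).lastAckedId
                + " DC2=" + first.get(1).lastAckedId);             // DC1=0 DC2=100
        List<Subscription> second = runScenario(100, 100);
        System.out.println("after TEST2 batch: DC1=" + second.get(0).lastAckedId
                + " DC2=" + second.get(1).lastAckedId
                + " DC1 pending=" + second.get(0).pending.size()); // DC1=200 DC2=200 DC1 pending=100
        drain(second.get(0));
        System.out.println("after DC1 drains: DC1=" + second.get(0).lastAckedId); // DC1=100
    }
}
```

Assumption (a) alone is enough to make one subscription's pointer move on traffic it never receives, which is the cross-talk described in the observations further down.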


Then, after up to 50 minutes (5-minute checkpoint intervals x 10 priorities),
the cleanup task kicks in and issues this statement (apparently to clean up
consumed durable topic messages):

14:25:39,009 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.009 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleaning up old messages.
14:25:39,011 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.011 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=      ( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ_ACKS WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER        AND ACTIVEMQ_ACKS.PRIORITY=?)   )
14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.012 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Deleted 0 old message(s) at priority: 0
14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.013 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleanup done.

All records in the DB are now deleted. We agree that, given the data in the
DB and the predicates of the query, the statement would delete those rows,
but it doesn't seem to be what one would want. Curiously, the cleanup log
message usually says "Deleted 0 old message(s)" (though sometimes it
correctly reports the 100 rows deleted). Regardless, the rows are definitely
all gone from the DB after this log entry (the one with the PRIORITY=0
predicate) appears.

If DC1 starts consuming again, it still receives all its pending messages
(presumably from the in-memory structures managed by the broker). However,
if the broker is restarted at any time before the consumer has received all
its outstanding messages, then all the messages are lost permanently. OOPS!
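The cleanup statement in the log can be modelled in a few lines of Java to show why the undelivered rows disappear. This is a hypothetical sketch that only mirrors the SQL's predicates; it is not ActiveMQ code, and CleanupModel, Msg, Ack, cleanup and scenario are invented names. A row at the given priority is deleted when its ID is at or below the lowest LAST_ACKED_ID among the ACTIVEMQ_ACKS rows for the same container and priority. With both subscriptions at 200, as in the second snapshot, all 200 messages qualify, including DC1's 100 undelivered ones; had DC1's LAST_ACKED_ID stayed at 0, none would. The model does not cover the journal or the "Deleted 0" log discrepancy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical in-memory model of the cleanup statement from the log:
//   DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <= (SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)
//     FROM ACTIVEMQ_ACKS WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER AND ACTIVEMQ_ACKS.PRIORITY=?))
// NOT ActiveMQ code; it only mirrors the SQL's predicates.
class CleanupModel {

    static final class Msg {           // one ACTIVEMQ_MSGS row
        final long id;
        final String container;
        final int priority;

        Msg(long id, String container, int priority) {
            this.id = id;
            this.container = container;
            this.priority = priority;
        }
    }

    static final class Ack {           // one ACTIVEMQ_ACKS row
        final String container;
        final int priority;
        final long lastAckedId;

        Ack(String container, int priority, long lastAckedId) {
            this.container = container;
            this.priority = priority;
            this.lastAckedId = lastAckedId;
        }
    }

    // Removes from msgs, and returns, the IDs the DELETE would remove at this priority.
    static List<Long> cleanup(List<Msg> msgs, List<Ack> acks, int priority) {
        List<Long> deleted = new ArrayList<>();
        for (Iterator<Msg> it = msgs.iterator(); it.hasNext(); ) {
            Msg m = it.next();
            if (m.priority != priority) continue;
            // Correlated subquery: lowest LAST_ACKED_ID for this container and priority.
            OptionalLong min = acks.stream()
                    .filter(a -> a.container.equals(m.container) && a.priority == priority)
                    .mapToLong(a -> a.lastAckedId)
                    .min();
            // min() over no rows is SQL NULL and ID <= NULL is never true: keep the row.
            if (min.isPresent() && m.id <= min.getAsLong()) {
                deleted.add(m.id);
                it.remove();
            }
        }
        return deleted;
    }

    // 200 priority-0 messages on the topic, with the given LAST_ACKED_ID for DC1 and DC2.
    static List<Long> scenario(long dc1LastAck, long dc2LastAck) {
        String topic = "topic://EllipseServices";
        List<Msg> msgs = new ArrayList<>();
        for (long id = 1; id <= 200; id++) msgs.add(new Msg(id, topic, 0));
        List<Ack> acks = Arrays.asList(new Ack(topic, 0, dc1LastAck), new Ack(topic, 0, dc2LastAck));
        return cleanup(msgs, acks, 0);
    }

    public static void main(String[] args) {
        System.out.println("reported state (200/200): " + scenario(200, 200).size() + " deleted"); // 200
        System.out.println("if DC1 had stayed at 0:   " + scenario(0, 200).size() + " deleted");   // 0
    }
}
```

Because the subquery takes the minimum over all subscriptions on the container, a single subscription whose pointer is wrongly advanced is enough to expose every lower-numbered row to deletion.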


Some observations:
1. Consumers on the same topic but with different selectors seem to be able
to increase the LAST_ACKED_ID of subscriptions with a completely different
selector. How or why might that be happening?
2. The system behaves as if the selector has no real bearing on how the
LAST_ACKED_ID row in the ACTIVEMQ_ACKS table is updated.
3. If DC1 is allowed to consume all its pending messages, the LAST_ACKED_ID
for DC1 in the ACTIVEMQ_ACKS table is correctly set back to 100.

Can someone help point to what we're doing wrong that might be causing
this behaviour?

Thanks in advance.


Jason
--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

Tim Bain
What happens if you run the same message stream with only DC1 subscribed? I
suspect that you'll see the same behavior, and that it stems not from
consumer cross-talk but from our improper handling of messages that don't
match the subscription's selector.

Tim

(In reply to jasons, Nov 6, 2017 10:42 PM)

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

jasons
Hi Tim, do you mean improper handling by our consumer code or improper
handling by ActiveMQ when selectors don't match?

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

jasons
I missed the word "our" in your response, which I now take to mean potential
mishandling by ActiveMQ.

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

Tim Bain
Yes, I think it's our bug, but we need to figure out where. I'm hoping that
you can help narrow down the symptoms to make it easier to find and fix
(which increases the odds of it getting fixed, and is therefore in your
best interest).

Tim

(In reply to jasons, Nov 7, 2017 7:29 PM)

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

jasons
Tim, we appreciate any and all help the community can offer, and we'll do
what we can to help ourselves.

You are correct: repeating the test with only DC1 results in the same
behaviour and outcome. Pending messages for DC1 are lost/deleted after
messages arrive on the same topic that don't match DC1's selector; DC1's
LAST_ACKED_ID ultimately advances to the ID of the highest/last message
arriving at the topic. The final message loss occurs only after the
cleanup thread runs for message IDs at that priority.

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

Tim Bain
I think I've found the code that results in this behavior.
DurableTopicSubscription.unmatched() is called when a message fails to
match a selector for a given consumer, and it calls
Destination.acknowledge() which eventually calls
DefaultJDBCAdapter.doSetLastAck(). My suspicion is that this method is
being called even if there are pending messages that have not yet been
dispatched to the consumer because it's offline.
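The suspected code path can be illustrated with a small, hypothetical Java sketch. It does not reproduce DurableTopicSubscription.unmatched(), Destination.acknowledge() or DefaultJDBCAdapter.doSetLastAck(); UnmatchedAckSketch, Sub and both handler methods are invented for illustration. It contrasts the suspected behaviour (an unmatched message moves LAST_ACKED_ID forward regardless of pending messages) with one possible guard, an assumption rather than a confirmed fix, that never moves the pointer past the oldest pending message, using the single-consumer scenario: DC1 offline with messages 1..100 pending, then messages 101..200 that do not match its selector.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch contrasting two ways of handling a message that does not
// match a durable subscription's selector. It is NOT the actual
// DurableTopicSubscription / DefaultJDBCAdapter code.
class UnmatchedAckSketch {

    static final class Sub {
        long lastAckedId = 0;                           // stands in for ACTIVEMQ_ACKS.LAST_ACKED_ID
        final Deque<Long> pending = new ArrayDeque<>(); // undelivered matching messages, oldest first
    }

    // Suspected behaviour: the unmatched message is "acknowledged" unconditionally,
    // so the pointer jumps past anything still pending.
    static void unmatchedSuspected(Sub s, long id) {
        s.lastAckedId = id;
    }

    // Guarded variant (an assumption for illustration, not a confirmed fix):
    // never move the pointer past the oldest message still pending.
    static void unmatchedGuarded(Sub s, long id) {
        long ceiling = s.pending.isEmpty() ? id : Math.min(id, s.pending.peekFirst() - 1);
        s.lastAckedId = Math.max(s.lastAckedId, ceiling);
    }

    // Offline DC1 has messages 1..100 pending; messages 101..200 do not match its selector.
    static long finalLastAck(boolean guarded) {
        Sub dc1 = new Sub();
        for (long id = 1; id <= 100; id++) dc1.pending.addLast(id);
        for (long id = 101; id <= 200; id++) {
            if (guarded) unmatchedGuarded(dc1, id);
            else unmatchedSuspected(dc1, id);
        }
        return dc1.lastAckedId;
    }

    public static void main(String[] args) {
        System.out.println("suspected: " + finalLastAck(false)); // 200: cleanup can now remove 1..100
        System.out.println("guarded:   " + finalLastAck(true));  // 0: rows 1..100 stay above the pointer
    }
}
```

The guard trades loss for retention: while DC1 has a backlog, its pointer no longer releases the rows it ignores, so cleanup keeps more rows until DC1 catches up.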

Would you be able to attach a debugger to your broker (in a non-operational
environment, of course) and set a breakpoint on those methods and confirm
that that is indeed the code path by which the database is getting updated
in your single-consumer scenario that you tested in your most recent
response?

Thanks,
Tim

(In reply to jasons, Wed, Nov 8, 2017 at 11:23 PM)

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

jasons
Tim, thanks for getting back to us. We'll do as you suggested and report
back soon.