Oracle - delivered Messages remain in ACTIVEMQ_MSGS


Oracle - delivered Messages remain in ACTIVEMQ_MSGS

Heike Potter
We are running an application on CentOS with Tomcat 8.0.28, Java 1.8 and ActiveMQ 5.13.4.
For persistence we are using Oracle 11.2.

We have 3 queues with several producers and consumers.

From time to time, messages remain in the ACTIVEMQ_MSGS table in the Oracle database even though they have been consumed correctly (we get correct results).
These messages are never removed from the ACTIVEMQ_MSGS table. The problem shows up when we restart ActiveMQ,
because these messages are then redelivered.

We checked the ActiveMQ log file; the enqueue and dequeue counters for the queues are identical.
What surprises us is the following logged DELETE command:
---
2017-06-09 10:24:58,281 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter      - Checkpoint started.
2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter         - Cleaning up old messages.
2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter             - Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=      ( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ_ACKS WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER        AND ACTIVEMQ_ACKS.PRIORITY=?)   )
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter             - Deleted 0 old message(s) at priority: 0
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter         - Cleanup done.
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter      - Checkpoint done.
---
Although there are rows in ACTIVEMQ_MSGS, there are none in the ACTIVEMQ_ACKS table, so if I understand it correctly, the statement above can never remove any records.
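
To make that reasoning concrete, here is a minimal in-memory sketch of the cleanup condition. It is only a model, not ActiveMQ code: the class and method names are made up, and it ignores the CONTAINER and PRIORITY columns. It relies on standard SQL behaviour, where min() over zero rows yields NULL and a comparison such as ID <= NULL never matches.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical in-memory model of the logged cleanup DELETE; not ActiveMQ code.
final class CleanupModel {
    /**
     * Counts the rows the DELETE would remove: those with ID <= min(LAST_ACKED_ID).
     * With no ack rows, SQL's min() yields NULL and "ID <= NULL" is never true,
     * which is modelled here by the empty OptionalLong.
     */
    static long deletableCount(List<Long> messageIds, List<Long> lastAckedIds) {
        OptionalLong minAcked = lastAckedIds.stream().mapToLong(Long::longValue).min();
        if (!minAcked.isPresent()) {
            return 0; // empty ACTIVEMQ_ACKS: the comparison matches nothing
        }
        long threshold = minAcked.getAsLong();
        return messageIds.stream().filter(id -> id <= threshold).count();
    }
}
```

With an empty ack list the count is always zero, whatever is in the message list, which matches the "Deleted 0 old message(s)" line in the log.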

Here is our configuration file:
---
  <broker brokerName="KWG_BROKER"  useJmx="true" xmlns="http://activemq.apache.org/schema/core">

    <persistenceFactory>
        <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${activemq.data}/activemq-data" dataSource="#oracle-ds"/>
    </persistenceFactory> 


    <transportConnectors>
        <transportConnector uri="tcp://<ourhost>:8016?transport.trace=false&amp;transport.soTimeout=60000&amp;keepAlive=true"/>
    </transportConnectors>

  </broker>

    <bean id="oracle-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
     <property name="driverClassName" value="oracle.jdbc.OracleDriver"/>
     <property name="url" value="jdbc:oracle:thin:@<ourhost2>:1521/<ourdbname>"/>
     <property name="username" value="benutzer"/>
     <property name="password" value="passwort"/>
     <property name="poolPreparedStatements" value="true"/>
   </bean>
   
---

And a code snippet:
---
    public void onMessage(final Message message, final Session session) {
        try {
            _log.debug("OmqListener.On Message begin");
            ObjectMessage om = (ObjectMessage) message;
            Object obj = om.getObject();
            if (obj instanceof OutMessage) {
                OutMessage ome = (OutMessage) obj;
                String messtype = ome.getTyp().toString();
                String oid = message.getStringProperty(Constants.OID);
                String erstellungsdatum = message.getStringProperty(Constants.EINSTELLUNGSDATUM);
                _log.debug(new StringBuilder("Message received type=").append(messtype)
                        .append("; OID=").append(oid)
                        .append("; creation date=").append(erstellungsdatum)
                        .append("; logging=").append(ome.isLogging()).toString());
                _ausgangsService.performOutmessage(ome, _propholder.isAntwortLoeschenAusDb());
                _log.info("onMessage: number of attempts = " + ome.getAnzahlVersuche());
            }
            _log.debug("OmqListener.On Message end");
        } catch (Throwable e) {
            // Log and swallow the error, then wait for renewed delivery attempts (timer!)
            _log.error("Error!", e);
        }
    }
---

When we restart ActiveMQ, what options do I have for filtering out, in my program, the messages that have already been delivered?
I could delete messages from ACTIVEMQ_MSGS before we restart ActiveMQ, but how can I distinguish old messages (already delivered) from those that still need to be delivered?

Thanks!
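
One possible way to filter in the program is to have the consumer remember which messages it has already processed and skip repeats. The sketch below is only an illustration under assumptions: it presumes the OID string property is unique per message, and the class and method names are hypothetical. It keeps the seen OIDs in memory, so a real deployment would have to persist them (for example in a database table) to survive an application restart.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; the class and method names are illustrative only.
final class ProcessedOidFilter {
    // OIDs that have already been handled. In-memory only: this set is lost
    // when the application itself restarts.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true the first time an OID is seen, false for a repeat. */
    boolean markIfNew(String oid) {
        return processed.add(oid);
    }
}
```

In the listener, the idea would be to call markIfNew(oid) before performOutmessage and return early when it yields false. Note that marking before processing means a message whose processing fails would also be skipped on redelivery, so the mark may need to be undone or set only after success.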


Re: Oracle - delivered Messages remain in ACTIVEMQ_MSGS

gtully
That DELETE statement is for durable topic subscribers. For queues, a
delete is done when the message is acked. Does your application acknowledge
receipt of the message after delivery, or is the session auto ack?

On Thu, 10 Aug 2017 at 23:02 Heike Potter <[hidden email]> wrote:

> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Oracle-delivered-Messages-remain-in-ACTIVEMQ-MSGS-tp4729613.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>