messages are not redelivered in activemq-4.0.2

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

messages are not redelivered in activemq-4.0.2

Jeff Tupholme

Hoping for some help in understanding STOMP semantics.
incubator-activemq-4.0.2
My test application sends two messages to queueA, starts a new
connection which subscribes to queueA, then processes each
message in its own transaction.  When a transaction is aborted I
expected its message to be redelivered but this doesn't happen.
Opening a new connection and resubscribing picks up the aborted
message but it is not marked as being redelivered.
In the Java test code for ActiveMQ it appears that a message within a
rolled-back transaction is redelivered to the session almost
immediately  
(./incubator-activemq-4.0.2/src/test/java/org/apache/activemq/usecases/
Trans actionRollbackOrderTest.java).
I noticed that each time my program sends a message ACK ActiveMQ
throws NullPointerException (see below).
At this point I'm not sure how much functionality STOMP is intended to
offer so I might be wrong to report this as a bug.  Before I report
another bug - Is it supposed to be possible to interleave transactions
on a single connection?

Thanks!


Pseudo-code
msg1 = receiver.NextMsg()
msg1.Begin()   -- begin transaction and acknowledge the message
msg1.Abort()   -- abort transaction
msg2 = receiver.NextMsg()
msg2.Begin()   -- begin transaction and acknowledge the message
msg2.Commit()
msg3 = receiver.NextMsg()  -- expecting msg1 to be redelivered

STOMP Protocol Trace
--------------------
CONNECT
user: usr
passcode: password
.
CONNECTED
session:ID:ubuntu-32784-1155902931062-3:163
.
SUBSCRIBE
destination: /queue/queueA
ack: client
.
MESSAGE
destination:/queue/queueA
type:null
reply-to:null
timestamp:1155915641587
priority:0
expires:0
correlation-id:null
message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:1

0
.
MESSAGE
destination:/queue/queueA
type:null
reply-to:null
timestamp:1155915641795
priority:0
expires:0
correlation-id:null
message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:2

1
.
BEGIN
transaction: ba0c02205582f84dfffa90cf75fd2395
.
ACK
message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:1
transaction: ba0c02205582f84dfffa90cf75fd2395
.
ABORT
transaction: ba0c02205582f84dfffa90cf75fd2395
.
BEGIN
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.
ACK
message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:2
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.
COMMIT
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.
CONNECT
user: usr
passcode: password
.
CONNECTED
session:ID:ubuntu-32784-1155902931062-3:163
.
SUBSCRIBE
destination: /queue/queueA
ack: client
.
MESSAGE
destination:/queue/queueA
type:null
reply-to:null
timestamp:1155915641587
priority:0
expires:0
correlation-id:null
message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:1

0
.
MESSAGE
destination:/queue/queueA
type:null
reply-to:null
timestamp:1155915641795
priority:0
expires:0
correlation-id:null
message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:2

1
.
BEGIN
transaction: ba0c02205582f84dfffa90cf75fd2395
.
ACK
message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:1
transaction: ba0c02205582f84dfffa90cf75fd2395
.
ABORT
transaction: ba0c02205582f84dfffa90cf75fd2395
.
BEGIN
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.
ACK
message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:2
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.
COMMIT
transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
.

Java Exception stack trace
--------------------------
Async error occurred: java.lang.NullPointerException
java.lang.NullPointerException
       at  
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(Prefe
tchS ubscription.java:122)
       at  
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractReg
ion. java:233)
       at  
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.
java :362)
       at  
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBrok
er.j ava:176)
       at  
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:
65)
       at  
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:
65)
       at  
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBroker
Filt er.java:78)
       at  
org.apache.activemq.broker.AbstractConnection.processMessageAck(Abstract
Conn ection.java:382)
       at  
org.apache.activemq.command.MessageAck.visit(MessageAck.java:178)
       at  
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection
.jav a:226)
       at  
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConn
ecti on.java:62)
       at  
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorre
lato r.java:91)
       at  
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.
java :63)
       at  
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMoni
tor. java:122)
       at  
org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(
Stom pTransportFilter.java:69)
       at  
org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(Pro
toco lConverter.java:111)
       at  
org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(Protoco
lCon verter.java:227)
       at  
org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommad(Prot
ocol Converter.java:133)
       at  
org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(Stomp
Tran sportFilter.java:59)
       at  
org.apache.activemq.transport.TransportSupport.doConsume(TransportSuppor
t.ja va:87)
       at  
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
127)
       at java.lang.Thread.run(Unknown Source)
2006-08-18 15:05:20,756 [127.0.0.1:57422] DEBUG Service



---------------------------------------------------------------------
To unsubscribe from this list please visit:

    http://xircles.codehaus.org/manage_email

Reply | Threaded
Open this post in threaded view
|

Re: messages are not redelivered in activemq-4.0.2

James Strachan-2
If you look at the Ruby stomp client you'll see it implements
redelivery logic on the client side for transactions. This is
basically to avoid ordering issues; if each time you perform a
rollback you force the broker to redeliver them you fairly soon end up
with your queue reordered. So by default we expect the client to
perform redelivery unless the client closes the consumer forcing the
broker to perform the redelivery.

On 8/18/06, Jeff Tupholme <[hidden email]> wrote:

>
> Hoping for some help in understanding STOMP semantics.
> incubator-activemq-4.0.2
> My test application sends two messages to queueA, starts a new
> connection which subscribes to queueA, then processes each
> message in its own transaction.  When a transaction is aborted I
> expected its message to be redelivered but this doesn't happen.
> Opening a new connection and resubscribing picks up the aborted
> message but it is not marked as being redelivered.
> In the Java test code for ActiveMQ it appears that a message within a
> rolled-back transaction is redelivered to the session almost
> immediately
> (./incubator-activemq-4.0.2/src/test/java/org/apache/activemq/usecases/
> Trans actionRollbackOrderTest.java).
> I noticed that each time my program sends a message ACK ActiveMQ
> throws NullPointerException (see below).
> At this point I'm not sure how much functionality STOMP is intended to
> offer so I might be wrong to report this as a bug.  Before I report
> another bug - Is it supposed to be possible to interleave transactions
> on a single connection?
>
> Thanks!
>
>
> Pseudo-code
> msg1 = receiver.NextMsg()
> msg1.Begin()   -- begin transaction and acknowledge the message
> msg1.Abort()   -- abort transaction
> msg2 = receiver.NextMsg()
> msg2.Begin()   -- begin transaction and acknowledge the message
> msg2.Commit()
> msg3 = receiver.NextMsg()  -- expecting msg1 to be redelivered
>
> STOMP Protocol Trace
> --------------------
> CONNECT
> user: usr
> passcode: password
> .
> CONNECTED
> session:ID:ubuntu-32784-1155902931062-3:163
> .
> SUBSCRIBE
> destination: /queue/queueA
> ack: client
> .
> MESSAGE
> destination:/queue/queueA
> type:null
> reply-to:null
> timestamp:1155915641587
> priority:0
> expires:0
> correlation-id:null
> message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:1
>
> 0
> .
> MESSAGE
> destination:/queue/queueA
> type:null
> reply-to:null
> timestamp:1155915641795
> priority:0
> expires:0
> correlation-id:null
> message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:2
>
> 1
> .
> BEGIN
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> ACK
> message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:1
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> ABORT
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> BEGIN
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
> ACK
> message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:2
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
> COMMIT
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
> CONNECT
> user: usr
> passcode: password
> .
> CONNECTED
> session:ID:ubuntu-32784-1155902931062-3:163
> .
> SUBSCRIBE
> destination: /queue/queueA
> ack: client
> .
> MESSAGE
> destination:/queue/queueA
> type:null
> reply-to:null
> timestamp:1155915641587
> priority:0
> expires:0
> correlation-id:null
> message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:1
>
> 0
> .
> MESSAGE
> destination:/queue/queueA
> type:null
> reply-to:null
> timestamp:1155915641795
> priority:0
> expires:0
> correlation-id:null
> message-id:ID:ubuntu-32784-1155902931062-3:160:-1:1:2
>
> 1
> .
> BEGIN
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> ACK
> message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:1
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> ABORT
> transaction: ba0c02205582f84dfffa90cf75fd2395
> .
> BEGIN
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
> ACK
> message-id: ID:ubuntu-32784-1155902931062-3:160:-1:1:2
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
> COMMIT
> transaction: 3d4bf8b18ca395e9aaa63a7853f3d3af
> .
>
> Java Exception stack trace
> --------------------------
> Async error occurred: java.lang.NullPointerException
> java.lang.NullPointerException
>        at
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(Prefe
> tchS ubscription.java:122)
>        at
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractReg
> ion. java:233)
>        at
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.
> java :362)
>        at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBrok
> er.j ava:176)
>        at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:
> 65)
>        at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:
> 65)
>        at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBroker
> Filt er.java:78)
>        at
> org.apache.activemq.broker.AbstractConnection.processMessageAck(Abstract
> Conn ection.java:382)
>        at
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:178)
>        at
> org.apache.activemq.broker.AbstractConnection.service(AbstractConnection
> .jav a:226)
>        at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConn
> ecti on.java:62)
>        at
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorre
> lato r.java:91)
>        at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.
> java :63)
>        at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMoni
> tor. java:122)
>        at
> org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(
> Stom pTransportFilter.java:69)
>        at
> org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(Pro
> toco lConverter.java:111)
>        at
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(Protoco
> lCon verter.java:227)
>        at
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommad(Prot
> ocol Converter.java:133)
>        at
> org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(Stomp
> Tran sportFilter.java:59)
>        at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSuppor
> t.ja va:87)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
> 127)
>        at java.lang.Thread.run(Unknown Source)
> 2006-08-18 15:05:20,756 [127.0.0.1:57422] DEBUG Service
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe from this list please visit:
>
>     http://xircles.codehaus.org/manage_email
>
>


--

James
-------
http://radio.weblogs.com/0112098/

---------------------------------------------------------------------
To unsubscribe from this list please visit:

    http://xircles.codehaus.org/manage_email