BrokerService.acknowledge() behavior

Matt Pavlovich-2
I have a class, loaded as a broker plugin, that implements BrokerService.acknowledge(), but I'm getting "Standard" acknowledgements for almost all consumption scenarios, even rollback. If I roll back a message, the ack type on the MessageAck passed to acknowledge() is still "Standard", yet the message isn't consumed (as expected). The Javadocs indicate that "Standard" tells the broker to remove the message.

This doesn't seem consistent. Am I missing something?

Is this the correct place to plug in to the broker to detect when a message is acknowledged vs. rolled back, etc.?


BrokerService.acknowledge() impl:
  @Override
  public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
    ….
    // Log which kind of ack the broker received
    if (ack.isDeliveredAck())
      logger.info("Delivered");
    else if (ack.isExpiredAck())
      logger.info("Expired");
    else if (ack.isIndividualAck())
      logger.info("Individual");
    else if (ack.isPoisonAck())
      logger.info("Poison");
    else if (ack.isRedeliveredAck())
      logger.info("Redelivered");
    else if (ack.isUnmatchedAck())
      logger.info("Unmatched");
    else if (ack.isStandardAck())
      logger.info("Standard");
    else
      logger.info("Unknown AckType: " + ack.getAckType());
    …

Client:
  // Transacted session: acks only take effect on commit
  Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
  MessageConsumer consumer = session.createConsumer(session.createQueue("QUEUE.IN"));
  TextMessage message = (TextMessage) consumer.receive(2000);

  // Drain the queue, logging each message received
  while (message != null) {
    logger.info("--- [testTransactedReceiveRollback] Receiving test message : " + message.getText() + " JMS Message ID: " + message.getJMSMessageID());
    message = (TextMessage) consumer.receive(2000);
  }
  // Roll back, so everything received above should be redelivered
  session.rollback();
  closeResources(connection, session, null, consumer);
  ….

Thanks,
Matt Pavlovich | Founding Partner | Media Driver
512.284.4330 | [hidden email]
810 Hesters Crossing, Suite 165, Round Rock, TX 78681

Re: BrokerService.acknowledge() behavior

gtully
The ack comes back, but it is pending transaction completion. The ack is
discarded if the transaction rolls back. The operations associated with the
ack are tied to the broker-side transaction via synchronizations in the
transaction broker and message stores.
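
The behavior described above can be illustrated with a small, self-contained model. This is only a sketch of the idea (a "Standard" ack arrives, is parked against its transaction, and is applied on commit or dropped on rollback); it is not ActiveMQ's implementation. The class `PendingAckSketch`, the nested `TxBroker`, and all of its methods are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PendingAckSketch {

    /** Hypothetical stand-in for a broker that defers transactional acks. */
    static class TxBroker {
        // IDs of messages currently held on the queue.
        private final Set<String> queue = new LinkedHashSet<>();
        // Acks received inside a transaction, awaiting commit or rollback.
        private final Map<String, List<String>> pendingAcks = new HashMap<>();

        void enqueue(String messageId) {
            queue.add(messageId);
        }

        // Called when a "standard" ack arrives; txId is null outside a transaction.
        void acknowledge(String txId, String messageId) {
            if (txId == null) {
                queue.remove(messageId); // non-transacted: remove immediately
            } else {
                // transacted: remember the ack, but leave the message in place
                pendingAcks.computeIfAbsent(txId, k -> new ArrayList<>()).add(messageId);
            }
        }

        // Commit applies every ack recorded for the transaction.
        void commit(String txId) {
            List<String> acks = pendingAcks.remove(txId);
            if (acks != null) {
                queue.removeAll(acks);
            }
        }

        // Rollback discards the recorded acks; the messages stay on the queue.
        void rollback(String txId) {
            pendingAcks.remove(txId);
        }

        boolean contains(String messageId) {
            return queue.contains(messageId);
        }
    }

    public static void main(String[] args) {
        TxBroker broker = new TxBroker();
        broker.enqueue("m1");
        broker.enqueue("m2");

        broker.acknowledge("tx1", "m1"); // the ack is seen here, as "standard"
        broker.rollback("tx1");          // ...but it is discarded on rollback
        System.out.println("after rollback, m1 on queue: " + broker.contains("m1"));

        broker.acknowledge("tx2", "m2");
        broker.commit("tx2");            // the ack takes effect only now
        System.out.println("after commit, m2 on queue: " + broker.contains("m2"));
    }
}
```

In an actual plugin, the analogous hook points would likely be the Broker interface's commitTransaction() and rollbackTransaction() methods, together with the transaction ID carried on the MessageAck; check the Javadocs for your ActiveMQ version before relying on this.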

On 30 September 2014 21:24, Matt Pavlovich <[hidden email]> wrote:

> [...]


--
http://redhat.com
http://blog.garytully.com