ActiveMQ Redelivery not working with new transaction in consumer

vedion
Hi,

I have an ActiveMQ setup with redelivery configured on the client side. With a
simple consumer it works as expected with the configuration below:

```
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

...
    @Bean
    public ConnectionFactory atomikosConnectionFactoryBean() {
        String mqUrl = System.getenv("MQ_URL");
        AtomikosConnectionFactoryBean atomikos = new AtomikosConnectionFactoryBean();
        atomikos.setLocalTransactionMode(false);
        atomikos.setMaxPoolSize(10);
        atomikos.setUniqueResourceName("QUEUE_BROKER");

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(4);
        redeliveryPolicy.setBackOffMultiplier(10);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
        ActiveMQXAConnectionFactory xaConnectionFactoryBean = new ActiveMQXAConnectionFactory(
                System.getenv("MQ_USERNAME"), System.getenv("MQ_PASSWORD"), mqUrl);
        xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
        xaConnectionFactoryBean.setNonBlockingRedelivery(true);
        atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
        return atomikos;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setErrorHandler(new EHealthEventErrorHandler());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setDestinationResolver(new EHealthDestinationResolver());
        factory.setSessionTransacted(true);
        return factory;
    }

    @Bean(autowire = Autowire.BY_TYPE)
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(atomikosConnectionFactoryBean());
        jmsTemplate.setDestinationResolver(new EHealthDestinationResolver());
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    ...
```

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) throws Exception {
    // Always fail, to force a rollback and trigger redelivery.
    throw new Exception();
}
```


So the above works as expected and the message is redelivered with the
ExponentialBackOff strategy.
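
To make the expected schedule concrete, here is a minimal standalone sketch of an exponential backoff calculation using the values from the configuration above (1000 ms initial delay, multiplier 10, at most 4 redeliveries, 86400000 ms cap). It is a simplified model, not ActiveMQ's `RedeliveryPolicy` code; the class and method names are made up for illustration, and it ignores collision avoidance.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, standalone model of an exponential redelivery schedule.
// This is NOT ActiveMQ's RedeliveryPolicy; all names here are illustrative.
class BackoffScheduleSketch {

    // Returns the delay (in ms) before each redelivery attempt.
    static List<Long> delays(long initialDelayMs, double multiplier,
                             long maxDelayMs, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            // Grow the delay for the next attempt, never exceeding the cap.
            delay = Math.min((long) (delay * multiplier), maxDelayMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the configuration in the post.
        System.out.println(delays(1000L, 10.0, 86_400_000L, 4));
    }
}
```

Under this model the four redeliveries would be spaced 1 s, 10 s, 100 s and 1000 s apart, so the 24-hour cap is never reached with only 4 attempts.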

But it goes wrong when the message consumer (onMessageReceived) calls a
method on another class that sends a message to a different queue in a new
transaction.
In that case the message is not redelivered if the exception is thrown after
the new transaction has been committed, e.g.:

```
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class FooClass {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createInNewTransaction() {
        sendMessageToAnotherQueue();
    }
}
```

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) throws Exception {
    // The inner transaction commits first; the failure happens afterwards.
    fooClass.createInNewTransaction();
    throw new Exception();
}
```


The stack trace below shows that
org.apache.activemq.TransactionContext.synchronizations is set to null when
the message is sent in the new transaction.
TransactionContext.synchronizations contains the ActiveMQMessageConsumer
that was used to receive the message, and it is needed for the redelivery
after the exception is thrown. Once it has been cleared, the message is not
redelivered:
<http://activemq.2283324.n4.nabble.com/file/t379855/Sync_nulled.png>

```
private void afterRollback() throws JMSException {
        if (synchronizations == null) {
            return;
        }
        ...
}
```
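
The effect of that early return can be shown with a small toy model. This is only a sketch under the assumptions described in this post: `ToyTransactionContext` and its methods are hypothetical stand-ins, not the real `org.apache.activemq.TransactionContext`, and the "consumer" is just a callback that counts how often it would schedule a redelivery.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names) of a transaction context that notifies
// registered callbacks on rollback, mirroring the early return shown above.
class SyncLossSketch {

    static class ToyTransactionContext {
        private List<Runnable> synchronizations;

        void addSynchronization(Runnable s) {
            if (synchronizations == null) {
                synchronizations = new ArrayList<>();
            }
            synchronizations.add(s);
        }

        // Models what the post observes when the session is re-enlisted.
        void reset() {
            synchronizations = null;
        }

        void afterRollback() {
            if (synchronizations == null) {
                return; // nobody left to notify, so nothing is redelivered
            }
            for (Runnable s : synchronizations) {
                s.run();
            }
            synchronizations = null;
        }
    }

    // Returns how many redeliveries the consumer callback scheduled.
    static int redeliveriesAfterRollback(boolean resetBeforeRollback) {
        int[] scheduled = {0};
        ToyTransactionContext ctx = new ToyTransactionContext();
        ctx.addSynchronization(() -> scheduled[0]++); // consumer registers itself
        if (resetBeforeRollback) {
            ctx.reset(); // inner transaction reuses the same session
        }
        ctx.afterRollback(); // outer transaction rolls back
        return scheduled[0];
    }

    public static void main(String[] args) {
        System.out.println(redeliveriesAfterRollback(false));
        System.out.println(redeliveriesAfterRollback(true));
    }
}
```

In this model the callback fires once when nothing touches the list before the rollback, and never when the list is cleared first, which matches the missing redelivery described above.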


It is the method
com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse()
that detects that the transaction context is different and throws an
exception, which is caught in SessionHandleState.notifyBeforeUse():
```
TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction currentTx)
        throws InvalidSessionHandleStateException, UnexpectedTransactionContextException
{

    if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
        //OOPS! we are being used a different tx context than the one expected...

        //TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise.

        String msg = "The connection/session object is already enlisted in a (different) transaction.";
        if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
        throw new UnexpectedTransactionContextException();
    }

    //tx context is still the same -> no change in state required
    return null;
}
```


Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is
called, which ends up clearing TransactionContext.synchronizations.
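
The detect-then-re-enlist sequence described above can be sketched as a toy state holder. This is not Atomikos code: `ToySessionState`, `UnexpectedContextException` and the string transaction ids are hypothetical, and the re-enlistment is reduced to swapping the remembered transaction id.

```java
// Toy model (hypothetical names) of the "different transaction" check and
// the re-enlistment that follows it. This is NOT Atomikos code.
class EnlistCheckSketch {

    static class UnexpectedContextException extends Exception {}

    static class ToySessionState {
        private String enlistedTx;
        private int reEnlistCount = 0;

        ToySessionState(String enlistedTx) {
            this.enlistedTx = enlistedTx;
        }

        // Throws if the session is used under another transaction.
        private void checkEnlistBeforeUse(String currentTx)
                throws UnexpectedContextException {
            if (currentTx == null || !currentTx.equals(enlistedTx)) {
                throw new UnexpectedContextException();
            }
        }

        // Catches the mismatch and switches to a fresh enlistment.
        void notifyBeforeUse(String currentTx) {
            try {
                checkEnlistBeforeUse(currentTx);
            } catch (UnexpectedContextException e) {
                enlistedTx = currentTx;
                reEnlistCount++;
            }
        }

        String enlistedTx() { return enlistedTx; }

        int reEnlistCount() { return reEnlistCount; }
    }

    public static void main(String[] args) {
        ToySessionState state = new ToySessionState("outer");
        state.notifyBeforeUse("outer"); // same tx: nothing changes
        state.notifyBeforeUse("inner"); // REQUIRES_NEW tx: re-enlisted
        System.out.println(state.enlistedTx() + " " + state.reEnlistCount());
    }
}
```

The point of the sketch is only that the mismatch is handled silently: the caller never sees the exception, so any state tied to the outer enlistment can be lost without an error being reported.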

There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse():
"//TODO check: what if subtransaction? Possible solution: ignore if
serial_jta mode, error otherwise."

I have a subtransaction and have
"com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky
to have hit something that is not supported yet?


Versions used: "org.springframework:spring-jms:5.1.10.RELEASE",
"com.atomikos:transactions:5.0.3",
"org.apache.activemq:activemq-client:5.15.10"

I have tried bumping to the newest versions, but it made no difference.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html
Re: ActiveMQ Redelivery not working with new transaction in consumer

tabish121@gmail.com

Please direct support questions to the ActiveMQ Users mailing list, as
this list is for discussion of the development of the broker itself.

On 3/28/20 5:17 AM, vedion wrote:

> [...]


--
Tim Bish