Limited XA transaction size?

Viliam Durina
Hi all,

I'm trying to use XA transactions. I produce 1,000 messages into a queue
using an auto-ack session, then try to consume them in an XA transaction,
but only 200 messages are received.

The output of the following test is:
java.lang.AssertionError:
Expected :1000
Actual   :200

Why is that? Is that expected?

Viliam

-- Test code:

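import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.junit.Assert.assertEquals;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.junit.ClassRule;
import org.junit.Test;

public class JmsXaTest {
    @ClassRule
    public static EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    private XAConnectionFactory cf =
            new ActiveMQXAConnectionFactory(broker.getVmURL());

    @Test
    public void test() throws Exception {
        // produce 1k items to the queue
        try (
            Connection conn = ((ConnectionFactory) cf).createConnection();
            Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
            MessageProducer producer =
                    session.createProducer(session.createQueue("queue"))
        ) {
            for (int i = 0; i < 1_000; i++) {
                producer.send(session.createTextMessage("msg-" + i));
            }
        }

        // try to consume all items in an XA transaction
        XAConnection conn = cf.createXAConnection();
        conn.start();
        XASession sess = conn.createXASession();
        Xid xid1 = new MyXid(1);
        sess.getXAResource().start(xid1, XAResource.TMNOFLAGS);
        MessageConsumer cons1 = sess.createConsumer(sess.createQueue("queue"));
        int count = 0;
        // stop once no message arrives within 3 seconds
        for (; cons1.receive(3000) != null; count++) {
            System.out.println(count);
        }
        assertEquals(1_000, count);
    }

    // minimal Xid whose global transaction id is a single byte
    private static class MyXid implements Xid {
        private byte[] gtrid;

        public MyXid(int val) {
            gtrid = new byte[]{(byte) (val)};
        }

        @Override
        public int getFormatId() {
            return 1;
        }

        @Override
        public byte[] getGlobalTransactionId() {
            return gtrid;
        }

        @Override
        public byte[] getBranchQualifier() {
            return new byte[1];
        }
    }
}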

Re: Limited XA transaction size?

Viliam Durina
I just found out that this also happens with non-XA transactions and in
CLIENT_ACKNOWLEDGE mode: only up to 200 messages are received. I'm working
on a stream processing engine, and we need to consume messages for a certain
time and acknowledge them afterwards. I couldn't find a config option to
increase this limit or make it unbounded.

So the question is: is there a way to increase the limit on unacknowledged
messages in a session?

Viliam

On Wed, 4 Dec 2019 at 16:12, Viliam Durina <[hidden email]> wrote:

> Hi all,
>
> I'm trying to use XA transactions. I produce 1,000 messages into a queue
> using an auto-ack session, then try to consume them in an XA transaction,
> but only 200 messages are received.

Re: Limited XA transaction size?

Viliam Durina
I found the setting: it's `maxPageSize`, see here:
https://activemq.apache.org/per-destination-policies
Its description doesn't make it clear to me that it also limits the number
of messages in a session.

Now another question: is it safe to set it to Integer.MAX_VALUE? What could
be the consequences?
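
For reference, a minimal sketch of raising `maxPageSize` programmatically on an embedded broker. The class name `MaxPageSizeConfig`, the method `createBroker`, and the example value 1000 are illustrative assumptions, not taken from this thread; the sketch installs the policy as the broker-wide default entry rather than for one named queue. It does not settle whether Integer.MAX_VALUE is safe: a larger page size presumably lets the broker hold more messages in memory at once, so a bounded value is used here.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class MaxPageSizeConfig {

    // Builds an unstarted, non-persistent broker whose default destination
    // policy uses the given maxPageSize (hypothetical helper for illustration).
    public static BrokerService createBroker(int maxPageSize) throws Exception {
        PolicyEntry entry = new PolicyEntry();
        entry.setMaxPageSize(maxPageSize);

        // The default entry applies to every destination that has no
        // more specific policy entry of its own.
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(entry);

        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setDestinationPolicy(policyMap);
        return broker;
    }
}
```

`MaxPageSizeConfig.createBroker(1000)` returns a broker that still needs a connector and a call to `start()` before clients can connect.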

Viliam

On Wed, 18 Dec 2019 at 12:01, Viliam Durina <[hidden email]> wrote:

> I just found out that this also happens with non-XA transactions and in
> CLIENT_ACKNOWLEDGE mode: only up to 200 messages are received. I'm working
> on a stream processing engine, and we need to consume messages for a certain
> time and acknowledge them afterwards. I couldn't find a config option to
> increase this limit or make it unbounded.
>
> So the question is: is there a way to increase the limit on unacknowledged
> messages in a session?

Re: Limited XA transaction size?

Viliam Durina
I investigated further: the issue only reproduces if the messages are
already in the queue. Whenever a new message is produced, a new batch
of 200 messages is delivered to the consumer. This now looks like a bug to me.

Viliam

Code to reproduce: initially, 200 of the pre-existing messages are received.
When one more message is added, another 200 are received. When one more is
added after that, all the remaining messages are received.

import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
import static javax.jms.Session.DUPS_OK_ACKNOWLEDGE;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsTest3 {

    public static final String BROKER_URL = "tcp://localhost:12354";

    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.addConnector(BROKER_URL);
        broker.start();

        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(BROKER_URL);

        int initialMessages = 1000;
        int additionalMessages = 10;

        // pre-insert initial messages
        try (
                Connection connection = cf.createConnection();
                Session session = connection.createSession(true, 0);
                MessageProducer producer =
                        session.createProducer(session.createQueue("queue"))
        ) {
            for (int i = 0; i < initialMessages; i++) {
                producer.send(session.createTextMessage("msg-" + i));
            }
            session.commit();
        }

        // start the consumer in a thread
        Thread consumerThread = new Thread(() -> {
            try (
                    Connection connection = cf.createConnection();
                    Session session =
                            connection.createSession(false, CLIENT_ACKNOWLEDGE);
                    MessageConsumer consumer =
                            session.createConsumer(session.createQueue("queue"))
            ) {
                connection.start();
                // messages are never acknowledged, so all of them stay pending
                for (long count = 0; count < initialMessages + additionalMessages; count++) {
                    consumer.receive();
                    // count is zero-based; print the number received so far
                    System.out.println("received so far: " + (count + 1));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        consumerThread.start();

        // start the producer in another thread
        Thread producerThread = new Thread(() -> {
            try (
                    Connection connection = cf.createConnection();
                    Session session =
                            connection.createSession(false, DUPS_OK_ACKNOWLEDGE);
                    MessageProducer producer =
                            session.createProducer(session.createQueue("queue"))
            ) {
                // add one message per second
                for (int i = 0; i < additionalMessages; i++) {
                    Thread.sleep(1000);
                    producer.send(session.createTextMessage("msg-" + i));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        producerThread.start();

        producerThread.join();
        consumerThread.join();
        broker.stop();
    }
}

On Wed, 18 Dec 2019 at 13:35, Viliam Durina <[hidden email]> wrote:

> I found the setting: it's `maxPageSize`, see here:
> https://activemq.apache.org/per-destination-policies
> Its description doesn't make it clear to me that it also limits the number
> of messages in a session.
>
> Now another question: is it safe to set it to Integer.MAX_VALUE? What
> could be the consequences?

Re: Limited XA transaction size?

Viliam Durina
I created an issue: https://issues.apache.org/jira/browse/AMQ-7369

On Wed, 18 Dec 2019 at 14:22, Viliam Durina <[hidden email]> wrote:

> I investigated further: the issue only reproduces if the messages are
> already in the queue. Whenever a new message is produced, a new batch
> of 200 messages is delivered to the consumer. This now looks like a bug to me.