Consume messages randomly

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Consume messages randomly

pascal
Hi,

I make some tests in transaction mode and randomly the consumer return null
while there are still some messages left.
One time it cosume all messages, one time 200, one time 400.
I use the consumer receive method with a timeout. I try to increase the
timeout but nothing change
If I do a commit after each receive it cosume all messages.

I make a test with receive method without timeout and It work, but I cannot
use it because my program must respond quickly.

This is my source test code:

====================================================
Sender:
====================================================
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;


public class JMSSender {
        protected Queue queue;
        protected String queueName = "AAAAA";
        protected String url = "tcp://localhost:61616";

        protected int ackMode = Session.AUTO_ACKNOWLEDGE;

        public static void main(String[] args) {
                JMSSender msgReceiver = new JMSSender();
                msgReceiver.run();
        }

        public void run() {
                QueueConnection connection = null;
                QueueSession session = null;
                TextMessage myMessage = null;
                QueueSender queueSender = null;
                int nbMsg = 1050;
                int i = 0;

                try {      
                        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
                        connection = (QueueConnection)connectionFactory.createConnection();
                        connection.start();
                        session = connection.createQueueSession(true, 0);
                        queue = session.createQueue(queueName);
                        queueSender = session.createSender(queue);

                        myMessage = session.createTextMessage();

                        for (i = 0; i < nbMsg; i++)
                        {
                                myMessage.setText(i+":0 10             ");
                                queueSender.send(myMessage);
                        }

                        System.out.println("SEND:" + nbMsg);

                } catch (Exception e) {
                } finally {
                        try {
                                session.commit();
                                session.close();
                                connection.close();
                        } catch (JMSException e) {
                                e.printStackTrace();
                        }
                }
        }
}

====================================================
Receiver:
====================================================

package com;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;



public class JMSReceiver {
        protected Queue queue;
        protected String queueName = "AAAAA";
        protected String url = "tcp://localhost:61616";

        protected int ackMode = Session.AUTO_ACKNOWLEDGE;

        public static void main(String[] args) {
                JMSReceiver msgReceiver = new JMSReceiver();
                msgReceiver.run();
        }

        public void run() {
                QueueConnection connection = null;
                QueueSession session = null;
                MessageConsumer consumer = null;

                try {
                        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
                        connection = (QueueConnection)connectionFactory.createConnection();
                        connection.start();
                        consumer = null;
                        session = connection.createQueueSession(true, 0);
                        queue = session.createQueue(queueName);
                        consumer = session.createReceiver(queue);

                        Message message;
                        int compteur = 0;
                        int WaitTime=1000;

                        while(true)
                        {
                                message = consumer.receive();
                                //message = consumer.receiveNoWait();
                                System.out.println("RECEIVE:" + compteur);
                                if (message == null)
                                {
                                        break;
                                }
                                compteur++;
                        }
                        System.out.println("RECEIVE:" + compteur);
                } catch (Exception e) {
                } finally {
                        try {
                                session.commit();
                                consumer.close();
                                session.close();
                                connection.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }

                }
        }
}

====================================================

PS: the sender and the receiver are not running at the same time.

Thanks for your help




--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html