[activemq-user] 500ms delay between send and receive

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-user] 500ms delay between send and receive

Matt Pryor
Hi there,

Thanks for taking the time to read this. This is a duplicate of another
email that had the wrong title, sorry about that.

I have a problem whereby there is always a 500ms (sometimes 1500ms)
delay between sending a message to a Queue and the consumer receiving it.

I am using ActiveMQ 3.2.1 - please see the attached Java class to
illustrate the problem. Essentially the main method does the following

1) Create a new message - set the "timesent" property to be the current
timestamp
2) Send this message to a Queue
3) Message is consumed asynchronously by onMessage() - a new message is
created with the "timereceived" property set, which is the new current
timestamp. This message is sent back to the queue
4) Main method receives this message
5) Print differences between timestamps

This process is repeated a number of times.

Between parts 2 and 3 there is a consistent 500-1500ms bottleneck. I
cannot find out where this is occurring and wondered if you could shed
and light on the problem. Incidentally, if I place a sleep in between
each loop (after line 154), the wait time is reduced by that amount -
for example Thread.sleep(150) reduces the time between the produce and
consume to 350ms, Thread.sleep(300) reduces it to 200ms, etc.

Thanks very much

Matt Pryor






import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.sql.SQLException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.springframework.util.Log4jConfigurer;

public class QueueTest implements MessageListener {

        javax.jms.Connection jmsConnection;
        static final String QUEUE_NAME = "myTestQueue";
        MessageConsumer consumer;
        MessageProducer producer;
        Session jmsSession, jmsSession2;
        Queue onDemandTaskQueue;
       
        void createJMSClient() {
                boolean fail=true;
                synchronized (this) {
                        while (fail) {
                                try {
                                        jmsConnection = getMQConnection();
                                        jmsConnection.setExceptionListener(new ExceptionListener() {
                                                public void onException(JMSException arg0) {
                                                        createJMSClient();
                                                }
                                        });
                                        jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // asynch consumer
                                       
                                        jmsSession2 = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // synch consumers
                                       
                                        onDemandTaskQueue = jmsSession.createQueue(QUEUE_NAME);

                                        MessageConsumer qr = jmsSession.createConsumer(onDemandTaskQueue, "initialrequest=true");
                                        producer = jmsSession.createProducer(onDemandTaskQueue);
                                        qr.setMessageListener(this);
                                       
                                        fail=false;
                                        notifyAll();
                                       
                                } catch (JMSException e) {
                                } catch (SQLException e) {
                                }
                                if (fail) {
                                        try {
                                                Thread.sleep(1000);
                                        } catch (InterruptedException ie) {}
                                }
                        }
                }
        }
       
       
        public void onMessage(Message m) {
                long d = System.currentTimeMillis();
                int id = 0;
                try {
                        long time_sent = m.getLongProperty("timesent");
                        id = m.getIntProperty("xid");
                        m = jmsSession.createMessage();
                        m.setLongProperty("timereceived", System.currentTimeMillis());
                        m.setLongProperty("timesent", time_sent);
                        m.setIntProperty("xid", id);
                        producer.send(m, DeliveryMode.NON_PERSISTENT, 1, 0);
                       
                       
                } catch (JMSException jmse) {
                        jmse.printStackTrace();
                }

        }
       
        public static void startEmbeddedBroker () throws JMSException {
                try {
                        Log4jConfigurer.initLogging("./res/activemq/config/log4j.properties");
                } catch (FileNotFoundException nfe) {}
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
                factory.setUseEmbeddedBroker(true);
                String inetAdd = null;
                try {
                        inetAdd = InetAddress.getLocalHost().getHostAddress();
                } catch (IOException e){
                        throw new JMSException("Unable to determine local IP address: "+e.getMessage());
                }
               
                factory.setBrokerURL("tcp://"+inetAdd+":8088");
                final Connection connection = factory.createConnection();
                connection.start();

                Runtime.getRuntime().addShutdownHook(new Thread() {
                        public void run () {
                                try {
                                        connection.close();
                                } catch (Exception e) {}
                        }
                });
        }
       
        public synchronized static Connection getMQConnection() throws JMSException, SQLException {
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
                factory.setBrokerURL("tcp://localhost:8088");
                ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
                connection.start();
                return connection;
        }
       
       
        public static void main(String[] args) throws Exception {
               
                /*
                 * Send a plain message with the current time stamp set as the timesent property.
                 * The QueueReceiver creates another message with the receive timestamp.
                 * This message is then sent to the queue again, and consumed in the main method.
                 *
                 */
               
               
                final QueueTest qt = new QueueTest();
                startEmbeddedBroker();
                qt.createJMSClient();
                for (int x=0; x < 50; x++) {
                        long d = System.currentTimeMillis();
                        int xid = x;
                        MessageProducer mp = qt.jmsSession2.createProducer(qt.onDemandTaskQueue);
                        MessageConsumer mc = qt.jmsSession2.createConsumer(qt.onDemandTaskQueue, "xid="+xid+" and timereceived > 0");
                        Message m = qt.jmsSession2.createMessage();
                        m.setLongProperty("timesent",d);
                        m.setIntProperty("xid", xid);
                        m.setBooleanProperty("initialrequest", true);
                        mp.send(m, DeliveryMode.NON_PERSISTENT, 1, 0);
                        m = mc.receive(); m.acknowledge();
                        long tr = m.getLongProperty("timereceived");
                        long tn = System.currentTimeMillis();
                        System.out.println(
                                        "A > B: " + (tr - d) + ", B > A: "+(tn-tr)+", A > B > A: "+(tn-d)
                        );
                }
        }
       
}