[activemq-user] message-driven pojo

[activemq-user] message-driven pojo

David Ma
Hi,

I wonder if anyone has encountered the following problem. I am using a message-driven POJO with ActiveMQ and Spring. Only the first message sent by the producer is consumed; all subsequent messages are not picked up by the POJO. After I restart the broker, all the previously unacknowledged messages are picked up, but the same problem occurs again (i.e., only the first message is consumed, etc.).

Please let me know how to fix it if you had a similar problem.

Thanks.

Re: [activemq-user] message-driven pojo

James Strachan-2
On 28 Nov 2005, at 13:05, David Ma wrote:

> Hi,
>
>   I wonder if anyone has encountered the following problem. I am
> using message-driven pojo with activemq and spring. Only the
> first message sent by the producer is consumed; all subsequent
> messages are not picked up by the pojo. After I restart the
> broker, all the previously un-acknowledged messages are picked up,
> but the same problem occurs again (i.e., only the first message
> is consumed, etc.).
>
>   Please let me know how to fix it if you had a similar problem.

I've never experienced this before; is there any chance there's a  
consumer process sucking up messages but never acknowledging them?  
BTW are you using topics or queues?

James
-------
http://radio.weblogs.com/0112098/

Re: [activemq-user] message-driven pojo

David Ma

I am using a queue.

If the POJO is invoked, it prints an acknowledgement message to the log file. However, this is not happening.
 
Re: [activemq-user] message-driven pojo

David Ma
In reply to this post by James Strachan-2
This is the message I see when everything goes well:
 
  Received ACTIVEMQ_OBJECT_MESSAGE: id = 0 ActiveMQMessage{ ,  jmsMessageID = null, bodyAsBytes =  org.activemq.io.util.ByteArray@b7ea5c, readOnlyMessage = true,  jmsClientID = 'ID:XP-DMa-4903-1132611353588-223:0' , jmsCorrelationID =  'null' , jmsDestination = vitech.Queue, jmsReplyTo = null,  jmsDeliveryMode = 2, jmsRedelivered = false, jmsType = 'null' ,  jmsExpiration = 0, jmsPriority = 4, jmsTimestamp = 1132612846495,  properties = null, readOnlyProperties = true, entryBrokerName =  'ID:dch-dev-batch-3046-1132611315687-0:0' , entryClusterName =  'default' , consumerNos = [0], transactionId = 'null' , xaTransacted =  false, consumerIdentifer = 'ID:dch-dev-batch-2786-1133190649640-8:0' ,  messageConsumed = false, transientConsumed = false, sequenceNumber = 0,  deliveryCount = 1, dispatchedFromDLQ = false, messageAcknowledge =  org.activemq.ActiveMQSession@7b7bee, jmsMessageIdentity = null,  producerKey = ID:XP-DMa-4903-1132611353588-229: }  ActiveMQObjectMessage{ object = null
 
 
  Do you see anything wrong in it?
 
  Thanks

Re: [activemq-user] message-driven pojo

Matt Pryor
In reply to this post by James Strachan-2
Hi there,

Thanks for taking the time to read this.

I have a problem whereby there is always a 500ms (sometimes 1500ms)
delay between sending a message to a Queue and the consumer receiving it.

I am using ActiveMQ 3.2 - please see the attached Java class, which
illustrates the problem. Essentially the main method does the following:

1) Create a new message - set the "timesent" property to be the current
timestamp
2) Send this message to a Queue
3) Message is consumed asynchronously by onMessage() - a new message is
created with the "timereceived" property set, which is the new current
timestamp. This message is sent back to the queue
4) Main method receives this message
5) Print differences between timestamps

This process is repeated a number of times.

Between steps 2 and 3 there is a consistent 500-1500ms bottleneck. I
cannot find out where this is occurring and wondered if you could shed
any light on the problem. Incidentally, if I place a sleep in between
each loop (after line 154), the wait time is reduced by that amount -
for example Thread.sleep(150) reduces the time between the produce and
consume to 350ms, Thread.sleep(300) reduces it to 200ms, etc.
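
The sleep figures above fit a simple pattern: delay plus sleep always comes to about 500ms (0 + 500, 150 + 350, 300 + 200). That is what one would expect if something wakes up on a fixed period of roughly 500ms and a message has to wait for the next wake-up. The sketch below is only that arithmetic in plain Java. The class and method names are made up for illustration, the 500ms period is an assumption read off the numbers in this post, and none of it is ActiveMQ code or a claim about where the delay actually comes from.

```java
/** Toy model of a fixed-period wake-up; hypothetical, not ActiveMQ code. */
public class PollingDelayModel {

    /**
     * Wait seen by a message that is sent sleepMs after the previous wake-up,
     * assuming wake-ups happen every periodMs.
     */
    static long expectedDelay(long periodMs, long sleepMs) {
        long offsetIntoPeriod = sleepMs % periodMs; // how far into the current period the send lands
        return periodMs - offsetIntoPeriod;         // time left until the next wake-up
    }

    public static void main(String[] args) {
        long assumedPeriodMs = 500; // assumption taken from the figures reported in the post
        long[] sleeps = {0, 150, 300};
        for (long sleepMs : sleeps) {
            System.out.println("sleep " + sleepMs + " ms -> expected delay "
                    + expectedDelay(assumedPeriodMs, sleepMs) + " ms");
        }
    }
}
```

With these inputs the model gives 500, 350 and 200ms, which matches the reported figures. It says nothing about the occasional 1500ms delay.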

Thanks very much

Matt Pryor




import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.sql.SQLException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.springframework.util.Log4jConfigurer;

public class QueueTest implements MessageListener {

        javax.jms.Connection jmsConnection;
        static final String QUEUE_NAME = "myTestQueue";
        MessageConsumer consumer;
        MessageProducer producer;
        Session jmsSession, jmsSession2;
        Queue onDemandTaskQueue;
       
        void createJMSClient() {
                boolean fail=true;
                synchronized (this) {
                        while (fail) {
                                try {
                                        jmsConnection = getMQConnection();
                                        jmsConnection.setExceptionListener(new ExceptionListener() {
                                                public void onException(JMSException arg0) {
                                                        createJMSClient();
                                                }
                                        });
                                        jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // asynch consumer
                                       
                                        jmsSession2 = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // synch consumers
                                       
                                        onDemandTaskQueue = jmsSession.createQueue(QUEUE_NAME);

                                        MessageConsumer qr = jmsSession.createConsumer(onDemandTaskQueue, "initialrequest=true");
                                        producer = jmsSession.createProducer(onDemandTaskQueue);
                                        qr.setMessageListener(this);
                                       
                                        fail=false;
                                        notifyAll();
                                       
                                } catch (JMSException e) {
                                        e.printStackTrace(); // report the failure; the loop retries after a pause
                                } catch (SQLException e) { e.printStackTrace(); }
                                if (fail) {
                                        try {
                                                Thread.sleep(1000);
                                        } catch (InterruptedException ie) {}
                                }
                        }
                }
        }
       
       
        public void onMessage(Message m) {
                long d = System.currentTimeMillis();
                int id = 0;
                try {
                        long time_sent = m.getLongProperty("timesent");
                        id = m.getIntProperty("xid");
                        m = jmsSession.createMessage();
                        m.setLongProperty("timereceived", System.currentTimeMillis());
                        m.setLongProperty("timesent", time_sent);
                        m.setIntProperty("xid", id);
                        producer.send(m, DeliveryMode.NON_PERSISTENT, 1, 0);
                       
                       
                } catch (JMSException jmse) {
                        jmse.printStackTrace();
                }

        }
       
        public static void startEmbeddedBroker () throws JMSException {
                try {
                        Log4jConfigurer.initLogging("./res/activemq/config/log4j.properties");
                } catch (FileNotFoundException nfe) {}
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
                factory.setUseEmbeddedBroker(true);
                String inetAdd = null;
                try {
                        inetAdd = InetAddress.getLocalHost().getHostAddress();
                } catch (IOException e){
                        throw new JMSException("Unable to determine local IP address: "+e.getMessage());
                }
               
                factory.setBrokerURL("tcp://"+inetAdd+":8088");
                final Connection connection = factory.createConnection();
                connection.start();

                Runtime.getRuntime().addShutdownHook(new Thread() {
                        public void run () {
                                try {
                                        connection.close();
                                } catch (Exception e) {}
                        }
                });
        }
       
        public synchronized static Connection getMQConnection() throws JMSException, SQLException {
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
                factory.setBrokerURL("tcp://localhost:8088");
                ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
                connection.start();
                return connection;
        }
       
       
        public static void main(String[] args) throws Exception {
               
                /*
                 * Send a plain message with the current time stamp set as the timesent property.
                 * The QueueReceiver creates another message with the receive timestamp.
                 * This message is then sent to the queue again, and consumed in the main method.
                 *
                 */
               
               
                final QueueTest qt = new QueueTest();
                startEmbeddedBroker();
                qt.createJMSClient();
                for (int x=0; x < 50; x++) {
                        long d = System.currentTimeMillis();
                        int xid = x;
                        MessageProducer mp = qt.jmsSession2.createProducer(qt.onDemandTaskQueue);
                        MessageConsumer mc = qt.jmsSession2.createConsumer(qt.onDemandTaskQueue, "xid="+xid+" and timereceived > 0");
                        Message m = qt.jmsSession2.createMessage();
                        m.setLongProperty("timesent",d);
                        m.setIntProperty("xid", xid);
                        m.setBooleanProperty("initialrequest", true);
                        mp.send(m, DeliveryMode.NON_PERSISTENT, 1, 0);
                        m = mc.receive(); m.acknowledge();
                        long tr = m.getLongProperty("timereceived");
                        long tn = System.currentTimeMillis();
                        System.out.println(
                                        "A > B: " + (tr - d) + ", B > A: "+(tn-tr)+", A > B > A: "+(tn-d)
                        );
                }
        }
       
}