standard jms code can not receive message from queue

classic Classic list List threaded Threaded
6 messages Options
zbc
Reply | Threaded
Open this post in threaded view
|

standard jms code can not receive message from queue

zbc
This post was updated on .
please look at following code:

            InitialContext ctx = null;
            QueueConnection con = null;
            QueueSession queueSession = null;
            QueueReceiver receiver = null;
            try {
              Hashtable env = new Hashtable();
              env = new Hashtable();
              env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
              env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
              ctx = new InitialContext(env);
              QueueConnectionFactory fac = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
              con = fac.createQueueConnection();
              queueSession = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
              con.start();
              receiver = queueSession.createReceiver((Queue)ctx.lookup("dynamicQueues/FromReqRespClient"));
              Message result = receiver.receiveNoWait();
              System.out.println(result);
            } catch (Exception e) {
            e.printStackTrace();
            } finally {
                close();
            }


this is standard jms code, but it can NOT receive message from queue, if you add a line "Thread.sleep(500)" before "Message result = receiver.receiveNoWait(); ", it WORKS. this code work well to openJMS, is it a bug? you can try yourself, thanks.
Reply | Threaded
Open this post in threaded view
|

Re: standard jms code can not receive message from queue

ceposta
This is not a bug.

From the JMS API doc:
http://docs.oracle.com/javaee/1.4/api/javax/jms/MessageConsumer.html#receiveNoWait()

"Receives the next message if one is immediately available."
Could be the broker hasn't dispatched the message yet, or it's in flight,
etc, etc. so it's not "immediately available"

If you code for receiveNoWait() you have to understand its ramifications
and plan for it accordingly.




On Thu, Sep 26, 2013 at 1:08 PM, zbc <[hidden email]> wrote:

> please look at following code:
>
>             InitialContext ctx = null;
>             QueueConnection con = null;
>             QueueSession queueSession = null;
>             QueueReceiver receiver = null;
>             try {
>               Hashtable env = new Hashtable();
>               env = new Hashtable();
>               env.put(Context.INITIAL_CONTEXT_FACTORY,
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>               env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
>               ctx = new InitialContext(env);
>               QueueConnectionFactory fac =
> (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
>               con = fac.createQueueConnection();
>               queueSession = con.createQueueSession(false,
> Session.AUTO_ACKNOWLEDGE);
>               con.start();
>               receiver =
>
> queueSession.createReceiver((Queue)ctx.lookup("dynamicQueues/FromReqRespClient"));
>               Message result = receiver.receiveNoWait();
>               System.out.println(result);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             } finally {
>                 close();
>             }
>
>
> this is standard jms code, but it can NOT receive message from queue, if
> you
> add a line "Thread.sleep(500)" before "Message result =
> receiver.receiveNoWait(); ", it WORKS. this code work well to openJMS, is
> it
> a bug? you can try yourself, thanks.
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/standard-jms-code-can-not-receive-message-from-queue-tp4671868.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
zbc
Reply | Threaded
Open this post in threaded view
|

Re: standard jms code can not receive message from queue

zbc
This post was updated on .
Thanks, but i put a message in the queue, i can see it in the console, then i run the code repeatedly, but it still there, in fact, i have been retrieving it for 1 week, i don't believe the broke need a couple of week to dispatch it!!
Reply | Threaded
Open this post in threaded view
|

Re: standard jms code can not receive message from queue

ceposta
Put together a unit test that shows this and we will take a look.

On Thursday, September 26, 2013, zbc wrote:

> Thanks, but i put a message in the queue, i can see it in the console,
> then i
> run the code repeatedly, but it still there, in fact, i have been
> retrieving
> if for 1 week, i don't believe the broke need a couple of week to dispatch
> it!!
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/standard-jms-code-can-not-receive-message-from-queue-tp4671868p4671873.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>


--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
zbc
Reply | Threaded
Open this post in threaded view
|

Re: standard jms code can not receive message from queue

zbc
This post was updated on .
In reply to this post by zbc
please run following test:



import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import junit.framework.Assert;

import org.junit.Before;
import org.junit.Test;

public class test {
        /**
         * put a test message to queue before test case run.
         */
        @Before
        public void setup() {
                InitialContext ctx = null;
                QueueConnection con = null;
                QueueSession queueSession = null;
                QueueSender sender = null;
                try {
                        Hashtable env = new Hashtable();
                        env = new Hashtable();
                        env.put(Context.INITIAL_CONTEXT_FACTORY,
                                        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
                        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
                        ctx = new InitialContext(env);
                        QueueConnectionFactory fac = (QueueConnectionFactory) ctx
                                        .lookup("ConnectionFactory");
                        con = fac.createQueueConnection();
                        queueSession = con.createQueueSession(false,
                                        Session.AUTO_ACKNOWLEDGE);
                        con.start();
                        sender = queueSession.createSender((Queue) ctx
                                        .lookup("dynamicQueues/queue1"));
                        sender.send(queueSession.createTextMessage("test message"));
                } catch (Exception e) {
                        e.printStackTrace();
                } finally {
                        try {
                                sender.close();
                                queueSession.close();
                                con.close();
                                ctx.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (NamingException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                }
        }

        /**
         * the receiver must sleep enough time after it is created in order to
         * receive the message in the queue.
         */
        @Test
        public void test1() {
                InitialContext ctx = null;
                QueueConnection con = null;
                QueueSession queueSession = null;
                QueueReceiver receiver = null;
                try {
                        Hashtable env = new Hashtable();
                        env = new Hashtable();
                        env.put(Context.INITIAL_CONTEXT_FACTORY,
                                        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
                        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
                        ctx = new InitialContext(env);
                        QueueConnectionFactory fac = (QueueConnectionFactory) ctx
                                        .lookup("ConnectionFactory");
                        con = fac.createQueueConnection();
                        queueSession = con.createQueueSession(false,
                                        Session.AUTO_ACKNOWLEDGE);
                        con.start();
                        receiver = queueSession.createReceiver((Queue) ctx
                                        .lookup("dynamicQueues/queue1"));

                        // in order to receive the message from queue, must sleep enough
                        // time
                        Thread.sleep(500);
                        // the message can be received successfully after sleep.
                        Message result = receiver.receiveNoWait();
                        Assert.assertNotNull(result);
                } catch (Exception e) {
                        e.printStackTrace();
                } finally {
                        try {
                                receiver.close();
                                queueSession.close();
                                con.close();
                                ctx.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (NamingException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                }
        }

        /**
         * the receiver never get the message IMMEDIATELY after it is created.
         *
         * @throws Exception
         *             to junit.
         */
        @Test
        public void test2() throws Exception {
                // sleep enough time to make sure the test message is really put into
                // the queue.
                Thread.sleep(5000);

                InitialContext ctx = null;
                QueueConnection con = null;
                QueueSession queueSession = null;
                QueueReceiver receiver = null;
                try {
                        Hashtable env = new Hashtable();
                        env = new Hashtable();
                        env.put(Context.INITIAL_CONTEXT_FACTORY,
                                        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
                        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");

                        // repeatedly try to receive message, but you NEVER get it!!
                        for (int i = 0; i < 100; i++) {
                                ctx = new InitialContext(env);
                                QueueConnectionFactory fac = (QueueConnectionFactory) ctx
                                                .lookup("ConnectionFactory");
                                con = fac.createQueueConnection();
                                queueSession = con.createQueueSession(false,
                                                Session.AUTO_ACKNOWLEDGE);
                                con.start();
                                receiver = queueSession.createReceiver((Queue) ctx
                                                .lookup("dynamicQueues/queue1"));
                                // no sleep after the receiver is created
                                // Thread.sleep(500);
                                Message result = receiver.receiveNoWait();
                                // you CAN NOT get the message in the queue.
                                Assert.assertNull(result);
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                } finally {
                        try {
                                receiver.close();
                                queueSession.close();
                                con.close();
                                ctx.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (NamingException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                }
        }

}

Reply | Threaded
Open this post in threaded view
|

Re: standard jms code can not receive message from queue

ceposta
You must not rely on receiveNoWait to always return a message. What the
method does is check whether any messages have been dispatched to the
consumer AND are *waiting on the consumer's prefetch* This is not the case
in your test above as it takes a few cycles for the broker to dispatch over
your network to get to the consumer's prefetch.

Please do a search of the list ahead of time as this has been succinctly
answered before by Tim:

http://activemq.2283324.n4.nabble.com/consumer-receiveNoWait-question-td4656435.html#a4656615

This behavior follows the JMS spec, but I realize different providers may
have implemented this differently, so if you try with prefetch == 0 on the
consumer side, it should actively poll the broker ... which is probably
what you're looking for.



On Fri, Sep 27, 2013 at 7:38 AM, zbc <[hidden email]> wrote:

> please run following test:
>
> *package com.conceptwave.servicedesigner;
>
> import java.util.Hashtable;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueReceiver;
> import javax.jms.QueueSender;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> import junit.framework.Assert;
>
> import org.junit.Before;
> import org.junit.Test;
>
> public class test {
>         /**
>          * put a test message to queue before test case run.
>          */
>         @Before
>         public void setup() {
>                 InitialContext ctx = null;
>                 QueueConnection con = null;
>                 QueueSession queueSession = null;
>                 QueueSender sender = null;
>                 try {
>                         Hashtable env = new Hashtable();
>                         env = new Hashtable();
>                         env.put(Context.INITIAL_CONTEXT_FACTORY,
>
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>                         env.put(Context.PROVIDER_URL,
> "tcp://localhost:61616");
>                         ctx = new InitialContext(env);
>                         QueueConnectionFactory fac =
> (QueueConnectionFactory) ctx
>                                         .lookup("ConnectionFactory");
>                         con = fac.createQueueConnection();
>                         queueSession = con.createQueueSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>                         con.start();
>                         sender = queueSession.createSender((Queue) ctx
>                                         .lookup("dynamicQueues/queue1"));
>                         sender.send(queueSession.createTextMessage("test
> message"));
>                 } catch (Exception e) {
>                         e.printStackTrace();
>                 } finally {
>                         try {
>                                 sender.close();
>                                 queueSession.close();
>                                 con.close();
>                                 ctx.close();
>                         } catch (JMSException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         } catch (NamingException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         }
>                 }
>         }
>
>         /**
>          * the receiver must sleep enough time after it is created in
> order to
>          * receive the message in the queue.
>          */
>         @Test
>         public void test1() {
>                 InitialContext ctx = null;
>                 QueueConnection con = null;
>                 QueueSession queueSession = null;
>                 QueueReceiver receiver = null;
>                 try {
>                         Hashtable env = new Hashtable();
>                         env = new Hashtable();
>                         env.put(Context.INITIAL_CONTEXT_FACTORY,
>
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>                         env.put(Context.PROVIDER_URL,
> "tcp://localhost:61616");
>                         ctx = new InitialContext(env);
>                         QueueConnectionFactory fac =
> (QueueConnectionFactory) ctx
>                                         .lookup("ConnectionFactory");
>                         con = fac.createQueueConnection();
>                         queueSession = con.createQueueSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>                         con.start();
>                         receiver = queueSession.createReceiver((Queue) ctx
>                                         .lookup("dynamicQueues/queue1"));
>
>                         // in order to receive the message from queue,
> must sleep enough
>                         // time
>                         Thread.sleep(500);
>                         // the message can be received successfully after
> sleep.
>                         Message result = receiver.receiveNoWait();
>                         Assert.assertNotNull(result);
>                 } catch (Exception e) {
>                         e.printStackTrace();
>                 } finally {
>                         try {
>                                 receiver.close();
>                                 queueSession.close();
>                                 con.close();
>                                 ctx.close();
>                         } catch (JMSException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         } catch (NamingException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         }
>                 }
>         }
>
>         /**
>          * the receiver never get the message IMMEDIATELY after it is
> created.
>          *
>          * @throws Exception
>          *             to junit.
>          */
>         @Test
>         public void test2() throws Exception {
>                 // sleep enough time to make sure the test message is
> really put into
>                 // the queue.
>                 Thread.sleep(5000);
>
>                 InitialContext ctx = null;
>                 QueueConnection con = null;
>                 QueueSession queueSession = null;
>                 QueueReceiver receiver = null;
>                 try {
>                         Hashtable env = new Hashtable();
>                         env = new Hashtable();
>                         env.put(Context.INITIAL_CONTEXT_FACTORY,
>
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>                         env.put(Context.PROVIDER_URL,
> "tcp://localhost:61616");
>
>                         // repeatedly try to receive message, but you
> NEVER get it!!
>                         for (int i = 0; i < 100; i++) {
>                                 ctx = new InitialContext(env);
>                                 QueueConnectionFactory fac =
> (QueueConnectionFactory) ctx
>
> .lookup("ConnectionFactory");
>                                 con = fac.createQueueConnection();
>                                 queueSession =
> con.createQueueSession(false,
>                                                 Session.AUTO_ACKNOWLEDGE);
>                                 con.start();
>                                 receiver =
> queueSession.createReceiver((Queue) ctx
>
> .lookup("dynamicQueues/queue1"));
>                                 // no sleep after the receiver is created
>                                 // Thread.sleep(500);
>                                 Message result = receiver.receiveNoWait();
>                                 // you CAN NOT get the message in the
> queue.
>                                 Assert.assertNull(result);
>                         }
>                 } catch (Exception e) {
>                         e.printStackTrace();
>                 } finally {
>                         try {
>                                 receiver.close();
>                                 queueSession.close();
>                                 con.close();
>                                 ctx.close();
>                         } catch (JMSException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         } catch (NamingException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         }
>                 }
>         }
>
> }*
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/standard-jms-code-can-not-receive-message-from-queue-tp4671868p4671901.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta