multi-consumer listening the same queue only one consumer can recieve message

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

multi-consumer listening the same queue only one consumer can recieve message

ladygaga
This post has NOT been accepted by the mailing list yet.

I was currently using NMS to develop application based ActiveMQ(5.6).

We have several consumers(exe) trying to recieving massgaes from the same queue(not topic). While all the messages just all go to one consumer though I have make the consumer to sleep for seconds after recieving a message. By the way, we don't want the consumers recieving the same messages other consumers have recieved.

It is mentioned in the official website that we should set Prefetch Limit to decide how many messages can be streamed to a consumer at any point in time. And it can both be configured and coded.

One way I tried is to code using PrefetchPolicy class binding the ConnectionFactory class like bellow.

PrefetchPolicy poli = new PrefetchPolicy();
poli.QueuePrefetch = 0;
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
fac.PrefetchPolicy = poli;
using (IConnection con = fac.CreateConnection())
{
   using (ISession se = con.CreateSession())
   {
       IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue);
       using (IMessageConsumer consumer = se.CreateConsumer(queue1))
       {
          con.Start();
          while (true)
          {
              ITextMessage message = consumer.Receive() as ITextMessage;
              Thread.Sleep(2000);
              if (message != null)
              {
                Task.Factory.StartNew(() => extractAndSend(message.Text));   //do something
              }
              else
              {
                Console.WriteLine("No message received~");
              }
            }
        }
   }
}

But no matter what prefetch value I set the behavior of the consumers stay the same as before.

And I've tried the second way tying to get the result, namely configure the server conf file. I change the activemq.xml of the server like bellow.
<destinationPolicy>
    <policyMap>
      <policyEntries>
            <policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" />
            <policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb">
                <dispatchPolicy >
                        <roundRobinDispatchPolicy />
                </dispatchPolicy>
                <subscriptionRecoveryPolicy>
                    <noSubscriptionRecoveryPolicy />
                </subscriptionRecoveryPolicy>
            </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

But though I've set the dispatchpolicy the messages still go to one consumer.

I want to know that:
    Whether this behavior can be achieved by just configuring the server xml file to enable all the consumers recieve messages from one queue?  If so, how to configure this and what is wrong with my configuration? If not, how can I use codes to achieve the goal?
Thanks.