One producer, two consumers receiving on the same queue: doesn't work

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

One producer, two consumers receiving on the same queue: doesn't work

ren0909
I have the following setup: one producer and two consumers, all on the same queue.
Sometimes consumer0 and consumer1 ARE both able to receive messages consecutively,
but most of the time only consumer0 receives half of the messages, and
consumer1 always receives a null value.
Why is this the case? Is one of my settings wrong?
(Everything runs on one Windows machine: the two brokers run in two cmd windows,
and a third cmd window is used to run mvn spring-boot:run.)

SpringApplication.run(ArtemisApplication.class, args);
               
                InitialContext initialContext = null;
               
                Connection connection0 = null;

                Connection connection1 = null;

                try {
initialContext = new InitialContext();
                       
                        Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");

                 // Step 3. Look-up a JMS Connection Factory object from JNDI on
server 0
                ConnectionFactory connectionFactory = (ConnectionFactory)
initialContext.lookup("ConnectionFactory");
                 
                        Connection conn = connectionFactory.createConnection("admin", "admin");
                       
                        Thread.sleep(5000);

                        connection0 = connectionFactory.createConnection("admin", "admin");
                                               
                        connection1 = connectionFactory.createConnection("admin", "admin");

                        connection0.start();

                        connection1.start();

                        Session session0 = connection0.createSession(false,
Session.AUTO_ACKNOWLEDGE);

                        Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);

                        MessageConsumer consumer0 = session0.createConsumer(queue);

                        MessageConsumer consumer1 = session1.createConsumer(queue);

                        MessageProducer producer0 = session0.createProducer(queue);

                        final int numMessages = 18;

                        for (int i = 0; i < numMessages; i++) {

                                TextMessage message0 = session0.createTextMessage("Queue message: " +
i);

                                producer0.send(message0);
                                System.out.println(message0.getText());

                        }

                        for (int i = 0; i < 9; i++) {

                                TextMessage received0 = (TextMessage) consumer0.receive(5000);

                                if (received0 == null) {
                                        System.out.println("consumer0:null");
                                        //throw new IllegalStateException("Message is null!");
                                }else {
                                        System.out.println("consumer0:" + received0.getText());
                                                       
                                }

                                TextMessage received1 = (TextMessage) consumer1.receive(5000);

                                if (received1 == null) {
                                        System.out.println("consumer1:null");
                                        //throw new IllegalStateException("Message is null!");
                                }else {
                                        System.out.println("consumer1:"+received1.getText());
                                }
                               
                        }




<connectors>
         <connector name="netty-connector">tcp://0.0.0.0:61616</connector>
         <!-- the other broker uses port 61617 instead of 61616 -->

      </connectors>

     
      <acceptors>
         <acceptor name="netty-acceptor">tcp://0.0.0.0:61616</acceptor>
      </acceptors>

     
      <broadcast-groups>
         <broadcast-group name="my-broadcast-group">
            <group-address>231.7.7.7</group-address>
            <group-port>9876</group-port>
            <broadcast-period>100</broadcast-period>
            <connector-ref>netty-connector</connector-ref>
         </broadcast-group>
      </broadcast-groups>

      <discovery-groups>
         <discovery-group name="my-discovery-group">
            <group-address>231.7.7.7</group-address>
            <group-port>9876</group-port>
            <refresh-timeout>1000</refresh-timeout>
         </discovery-group>
      </discovery-groups>

      <cluster-connections>
         <cluster-connection name="my-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
         </cluster-connection>
      </cluster-connections>

        <address-settings>
         
         <address-setting match="#">
            <redistribution-delay>0</redistribution-delay>
         </address-setting>
      </address-settings>



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html