MQTT pending: running 2 durable subscribers for the same topic, but messages are not delivered to one of the subscribers after broker restart

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

MQTT pending: running 2 durable subscribers for the same topic, but messages are not delivered to one of the subscribers after broker restart

sonikrish
package activeMQ;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Subscriber implements MqttCallback {
       
         public void messageArrived(String topic, MqttMessage message) throws
Exception
          {
         System.out.println("\nReceived Message is : " + new
String(message.getPayload()));    
          }
         
         public void connectionLost(Throwable arg0) {
                       
                        System.out.println("\nConnection lost");
                }
               
                public void deliveryComplete(IMqttDeliveryToken arg0) {
                       
                        System.out.println("\nDelivery complete");
                }

         public static void main(String arg[]) {
                       
                        Subscriber s= new Subscriber();
                        s.subs();
                }
               
         public void subs(){
                 String topic        = "MQTT_publisher";          
                int qos             = 2;
                String broker       = <url>;
                String clientId     = <ClientID>;
                String username  = <username>;
                String password  = <password>;
               // IMqttMessageListener messageListner = null;
                MemoryPersistence persistence = new MemoryPersistence();
               
                 try {
                    MqttClient sampleClient = new MqttClient(broker, clientId,
persistence);
                    MqttConnectOptions connOpts = new MqttConnectOptions();
                    connOpts.setCleanSession(false);
                    connOpts.setUserName(username);
                    connOpts.setPassword(password.toCharArray());
                    System.out.println("Connecting to broker: "+broker);
                    sampleClient.connect(connOpts);  
                    if(sampleClient.isConnected()==true)
                    {
                    System.out.println("Connected");
                    sampleClient.subscribe(topic, qos);
                   
                    System.out.println("Message subscribed");
                   
                    sampleClient.setCallback(this);
                    sampleClient.connect();
                  }
                   
                    sampleClient.disconnect();
                    System.out.println("Disconnected");
                }
                catch(MqttException me)
                {
                    System.out.println("reason "+me.getReasonCode());
                }
         }
}

Both subscribers have the same properties, except that each uses a different
ClientID. After every broker restart, only one of them receives messages; the
other one does not receive any messages despite being a durable subscriber.

When I log into the ActiveMQ console and look at the subscriber that is not
receiving messages, its Dispatched Queue Size increases along with its Enqueue
Counter, but its Dequeue Counter does not increase.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html
Reply | Threaded
Open this post in threaded view
|

Re: MQTT pending: running 2 durable subscribers for the same topic, but messages are not delivered to one of the subscribers after broker restart

artnaseef
In the console, can you see the connection for the inactive subscriber?

On Wed, Jan 3, 2018 at 12:30 AM, sonikrish <[hidden email]> wrote:

> package activeMQ;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>
> public class Subscriber implements MqttCallback {
>
>          public void messageArrived(String topic, MqttMessage message)
> throws
> Exception
>                 {
>          System.out.println("\nReceived Message is : " + new
> String(message.getPayload()));
>                 }
>
>          public void connectionLost(Throwable arg0) {
>
>                         System.out.println("\nConnection lost");
>                 }
>
>                 public void deliveryComplete(IMqttDeliveryToken arg0) {
>
>                         System.out.println("\nDelivery complete");
>                 }
>
>          public static void main(String arg[]) {
>
>                         Subscriber s= new Subscriber();
>                         s.subs();
>                 }
>
>          public void subs(){
>                  String topic        = "MQTT_publisher";
>                 int qos             = 2;
>                 String broker       = <url>;
>                 String clientId     = <ClientID>;
>                 String username   = <username>;
>                 String password   = <password>;
>                // IMqttMessageListener messageListner = null;
>                 MemoryPersistence persistence = new MemoryPersistence();
>
>                  try {
>                     MqttClient sampleClient = new MqttClient(broker,
> clientId,
> persistence);
>                     MqttConnectOptions connOpts = new MqttConnectOptions();
>                     connOpts.setCleanSession(false);
>                     connOpts.setUserName(username);
>                     connOpts.setPassword(password.toCharArray());
>                     System.out.println("Connecting to broker: "+broker);
>                     sampleClient.connect(connOpts);
>                     if(sampleClient.isConnected()==true)
>                     {
>                     System.out.println("Connected");
>                     sampleClient.subscribe(topic, qos);
>
>                     System.out.println("Message subscribed");
>
>                     sampleClient.setCallback(this);
>                     sampleClient.connect();
>                   }
>
>                     sampleClient.disconnect();
>                     System.out.println("Disconnected");
>                 }
>                 catch(MqttException me)
>                 {
>                     System.out.println("reason "+me.getReasonCode());
>                 }
>          }
> }
>
> Both subscriber is having the same properties and ClientID is different for
> both subscriber. Also, only one of them is getting message after broker
> restart everytime and another one is not getting any messages despite of
> being durable subscriber.
>
> When i am logging into activeMQ console and watching under subscriber for
> one which is not getting msg, its Dispatched Queue Size is increasing along
> with Enqueue Counter but Dequeue Counter  is not increasing.
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-
> f2368404.html
>