Quantcast

Experiencing constant reconnects with paho client v1.1.1 and broker Active MQ 5.14.3

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Experiencing constant reconnects with paho client v1.1.1 and broker Active MQ 5.14.3

MartinEden
Hi everyone,

I am experiencing constant reconnects from my paho MQTT subscriber and
publisher clients running inside a JVM on my Mac talking to a Active MQ
instance running in Virtual BOX also on my Mac.

The protocol is MQTT.

It is an integration test where I am trying to test a simple app that runs
two paho mqtt clients: a subscriber and a publisher. The subscriber
subscribes to a topic, reads the messages via callback, processes them and
then passes them to the publisher that will publish them to another topic.
So this main app has 2 paho clients, separately instantiated like so (Scala
code):

subscriber:
Code: [Select all <https://www.eclipse.org/forums/#>] [Show/ hide
<https://www.eclipse.org/forums/#>]

    val mqttClient = new MqttClient(brokerUrl, "sub", new
MqttDefaultFilePersistence())
    val options = new MqttConnectOptions()
    options.setCleanSession(false)
    options.setAutomaticReconnect(true)
    client.setCallback(callback)
    client.connect(options)
    client.subscribe(topic, 1)



publisher:
Code: [Select all <https://www.eclipse.org/forums/#>] [Show/ hide
<https://www.eclipse.org/forums/#>]

    val mqttClient = new MqttClient(brokerUrl, "pub", new MemoryPersistence())
    val options = new MqttConnectOptions()
    options.setMaxInflight(maxInFlight)
    options.setAutomaticReconnect(true)
    mqttClient.connect(options)
    mqttClient.publish(key, new MqttMessage(value))



The point of the integration test is to verify that by enabling persistent
session on the subscriber client (options.setCleanSession(false) and new
MqttDefaultFilePersistence() above), we can consume the messages published
to the topic during app downtime.

The test is simple:
- start main app.
- use a 3rd paho client (publisher) that publishes 1 message to the input
topic.
- use a 4th paho client (subscriber) to check the expected message was
published back to the output topic by the main app.
- kill app
- use the 3rd publisher paho client to publish 1 more message in the input
topic.
- start app.
- use the 4th paho client (subscriber) to check the expected message was
published back to the output topic by the main app.

The 3rd and 4th clients are instantiated by the test like so:
val mqttClient = new MqttClient(mqttBrokerUrl, MqttClient.generateClientId,
new MemoryPersistence)

What happens is that all the messages get through as expected BUT the
subscriber paho client, after the second start of the main app,
continuously spits out:

Code: [Select all <https://www.eclipse.org/forums/#>] [Show/ hide
<https://www.eclipse.org/forums/#>]

2017-04-27 22:59:55 ERROR MQTTSource:64 - Connection lost for
MqttClient with id [sub]; going to reconnect
Connection lost (32109) - java.io.EOFException
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readByte(DataInputStream.java:267)
        at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116)
        ... 1 more



The first time the app is started this does not happen! Only the second
time!

Doing some digging around it seems people suggest this happens when you
have two clients with the same id. This is not the case as all the ids are
different. This is shown also in the Active MQ broker UI:
Name Remote Address Active Slow
paho1493328011500000000 tcp://192.168.99.1:58421 true false
sub tcp://192.168.99.1:59426 true false
pub tcp://192.168.99.1:59427 true false
paho1493328011993000000 tcp://192.168.99.1:58424 true false

"sub" and "pub" being the ids for the paho clients in main app.
"paho1493328011500000000" and "paho1493328011993000000" being the randomly
generated ids for the paho clients 3 and 4 in the test.

Furthermore, the Active MQ server logs continuously print lines like these:
Code: [Select all <https://www.eclipse.org/forums/#>] [Show/ hide
<https://www.eclipse.org/forums/#>]

2017-04-27 21:44:48,487 | WARN  | Stealing link for clientId sub From
Connection Transport Connection to: tcp://192.168.99.1:61766 |
org.apache.activemq.broker.region.RegionBroker | ActiveMQ Transport:
tcp:///192.168.99.1:61768@1883
2017-04-27 21:44:48,832 | WARN  | Stealing link for clientId pub From
Connection Transport Connection to: tcp://192.168.99.1:61767 |
org.apache.activemq.broker.region.RegionBroker | ActiveMQ Transport:
tcp:///192.168.99.1:61769@1883
2017-04-27 21:44:49,497 | WARN  | Stealing link for clientId sub From
Connection Transport Connection to: tcp://192.168.99.1:61768 |
org.apache.activemq.broker.region.RegionBroker
...



So it seems there is constant link stealing between the paho clients in the
main app with ids "sub" and "pub".

Can anyone shed light on this behavior? Why is it happening? It does not
seem normal. Why does it happen if the ids are different? How can I prevent
it?

Sorry for the long message. Any help is much appreciated.

M
Loading...