
TL;DR

When subscribing to the same topic tree with multiple clients at the same time, not all clients receive the retained messages as expected!

in detail / use case

In a real project, several applications subscribe to the same MQTT topic (with a wildcard) at nearly the same time, because they are started in parallel. The topic tree contains about 500 retained messages (each in its own sub topic level), which all applications are expected to receive (they subscribe with QoS 1).
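To make the wildcard subscription concrete, here is a minimal sketch of how an MQTT topic filter covers a topic tree. It is plain Java and not part of the repro; the class name `TopicFilterSketch`, the method `matches`, and the sub topic `config/42` are made up for illustration. It assumes a well-formed filter (`#` only as the last level) and ignores the special handling of topics starting with `$`.

```java
class TopicFilterSketch {

    /**
     * Returns true if the topic name is covered by the topic filter.
     * '+' matches exactly one level, '#' matches the parent level
     * and any number of levels below it.
     */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // covers everything from this level down
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level does not match
            }
        }
        return f.length == t.length; // no wildcard tail: depth must be equal
    }

    public static void main(String[] args) {
        String filter = "mqtt/client/showcase/mutliThreadSubscription/#";
        // Hypothetical sub topic holding one retained message.
        System.out.println(matches(filter, "mqtt/client/showcase/mutliThreadSubscription/config/42"));
        System.out.println(matches(filter, "mqtt/client/showcase/other"));
    }
}
```

With a filter like this, every retained message below the common prefix should be delivered to each subscriber once the subscription is acknowledged, which is why a subscriber that receives nothing at all is surprising.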

Besides the "configuration" messages, data topics are also subscribed over the same MQTT connection. No persisted state is required (or wanted) here, so the application instances connect with cleanSession=true.

As far as I understand, it would be sufficient for each application instance to connect with a fixed clientId, since cleanSession=true should avoid any state handling. But to be really sure that no state is considered, a unique MQTT clientId is generated for each connect.

observed behavior

Unfortunately, not all application instances get the retained messages. Some get no messages at all from the topic, regardless of how long the subscription lasts. I first thought that the maxInflight (client side) or max_queued_messages (server side) configuration might be the reason, but after increasing both to 500,000 I no longer think they are behind the failure.

reproduction as test

Therefore I created this GitHub project with a repro. The repro contains a unit test class MqttSubscriptionTest with the test method multiThreadSubscriptionTest. When this test is executed, 1000 retained messages are first published in the @BeforeClass method. After that, 10 instances of a MqttSubscriber class, which implements the IMqttMessageListener and Runnable interfaces, are instantiated and executed. Each MqttSubscriber instance runs in its own thread with its own MqttClient instance and subscribes to the topic tree with the retained messages. This is logged to the console as follows:

----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'

The test waits for some time and then validates the subscriptions. Each subscriber is expected to have received the 1000 retained messages:

----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true

Most subscribers received the expected 1000 messages in a very short time (a few hundred ms). But some (here Subscriber-7 and Subscriber-8) did not receive a single message (the duration is 0 because they never finished). The situation does not improve when the subscribers are given more time to receive the messages. They just won't get them.

I have no idea why this happens. No error messages are shown on the MQTT broker or client side. Any help would be very useful to me, because I depend on the reliable delivery of retained messages.

Repro on GitHub: FrVaBe/MQTT/mqtt-client-showcase/

  • I tested with a local EMQ and HiveMQ broker. If you want to run the test you need to run a broker on your machine at localhost:1883 or change the configuration in the test class.
  • I use the Eclipse Paho Java MQTT client.
  • I subscribe with cleanSession=true because I do not want to have any state (the connection is used to subscribe to several topics and the delivery of missed messages is not wanted)

2 Answers


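The HiveMQ folks were kind enough to have a look at the problem. They suspect that the cause lies in the Paho client and in the use of IMqttMessageListener in the subscription. There is an issue, #432, that describes a suspected race condition.

Lesson learned: better use MqttCallback instead of IMqttMessageListener.

answered 2018-02-20T14:33:54.797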

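You probably cannot use cleanSession(true) when connecting; see the explanation here: http://www.steves-internet-guide.com/mqtt-retained-messages-example/

I tested with Mosquitto, whose internal message queue holds only 100 messages by default.

HiveMQ has a max queued messages setting.

The EMQ config has something similar: http://emqtt.io/docs/v2/config.html#mqtt-message-queue

I fixed the sample code in the repo. Works for me.

Edit: I just tested it with EMQ (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest) and it works fine.

Basically, it comes down to cleanSession: it must be false. Apart from that, the wait states in your test code are bad. They are too short on my machine. Use a latch or some other real synchronization mechanism.

answered 2018-02-13T22:11:59.647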
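As a rough illustration of the latch suggestion, here is a minimal sketch in plain Java that waits on a CountDownLatch instead of sleeping for a fixed time. It does not use Paho; the names `LatchedSubscriberSketch`, `MessageCounter`, `onMessage` and `runDemo` are invented for this example, and a background thread stands in for the broker delivering messages. In a real test, `onMessage` would be called from the client's message callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchedSubscriberSketch {

    /** Hypothetical stand-in for a subscriber: counts down once per received message. */
    static final class MessageCounter {
        private final CountDownLatch remaining;

        MessageCounter(int expectedMessages) {
            this.remaining = new CountDownLatch(expectedMessages);
        }

        // Would be invoked from the message callback in a real subscriber.
        void onMessage(String topic, byte[] payload) {
            remaining.countDown();
        }

        // Blocks until all expected messages arrived or the timeout elapsed.
        boolean awaitAll(long timeoutMs) throws InterruptedException {
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Simulates delivery of 'delivered' messages and waits for 'expected' of them. */
    static boolean runDemo(int expected, int delivered, long timeoutMs) {
        MessageCounter counter = new MessageCounter(expected);
        ExecutorService fakeBroker = Executors.newSingleThreadExecutor();
        fakeBroker.submit(() -> {
            for (int i = 0; i < delivered; i++) {
                counter.onMessage("demo/topic/" + i, new byte[0]);
            }
        });
        try {
            return counter.awaitAll(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            fakeBroker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("all delivered: " + runDemo(1000, 1000, 2000));
        System.out.println("none delivered: " + runDemo(1000, 0, 200));
    }
}
```

The test thread returns as soon as the last message arrives and only waits for the full timeout when messages are actually missing, so a generous timeout costs nothing on fast machines and avoids false failures on slow ones.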