
We're having problems with one of our MQTT subscribers in Spring Integration (4.0.3.RELEASE running on Tomcat 7 with the Paho MQTT Client 0.4.0).

The problem is with a subscriber on a heavily used topic (lots of messages). The devices sending the messages to that topic are field devices connected over GPRS.

Spring Integration and the broker (Mosquitto) are running on the same server.

The problem seems to appear after doing a couple of redeploys on Tomcat without restarting the server. When it occurs, restarting the Tomcat instance fixes it for a while.
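
For reference, the subscribers are plain Spring Integration MQTT inbound channel adapters, one client per topic. The sketch below is not our exact configuration, just a minimal illustration of how such a subscriber is wired; the broker URL and channel name are placeholders, while the client id and topic filter are the ones for vdm-dev-live seen in the logs below:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;

@Configuration
public class LiveDataSubscriberConfig {

    // Channel the adapter hands inbound messages to; the real downstream
    // handler is omitted here.
    @Bean
    public MessageChannel liveDataChannel() {
        return new DirectChannel();
    }

    // One adapter (i.e. one Paho client) per subscription; client id and
    // topic filter match the broker logs, the broker URL is a placeholder.
    @Bean
    public MqttPahoMessageDrivenChannelAdapter liveDataAdapter() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "tcp://localhost:1873",
                        "vdm-dev-live",
                        "vdm/+/+/+/liveData");
        adapter.setQos(1);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(liveDataChannel());
        return adapter;
    }
}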

Here is the chain of events (from the Mosquitto logs; the vdm-dev-live subscriber is the problematic one):

When Spring Integration starts we see all subscribers connecting to their various topics:

1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-live (c1, k60).
1409645645: Sending CONNACK to vdm-dev-live (0)
1409645645: Received SUBSCRIBE from vdm-dev-live
1409645645:     vdm/+/+/+/liveData (QoS 1)
1409645645: Sending SUBACK to vdm-dev-live
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmReq
1409645645:     vdm/+/+/+/firmware/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-cfgReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-cfgReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-cfgReq
1409645645:     vdm/+/+/+/config/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-cfgReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmStat (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmStat (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmStat
1409645645:     vdm/+/+/firmware/status (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmStat

We see messages going back and forth:

1409645646: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))

We see ping requests from the various subscribers:

1409645705: Received PINGREQ from vdm-dev-update
1409645705: Sending PINGRESP to vdm-dev-update
1409645705: Received PINGREQ from vdm-dev-live
1409645705: Sending PINGRESP to vdm-dev-live
1409645705: Received PINGREQ from vdm-dev-fmReq
1409645705: Sending PINGRESP to vdm-dev-fmReq
1409645705: Received PINGREQ from vdm-dev-cfgReq
1409645705: Sending PINGRESP to vdm-dev-cfgReq
1409645705: Received PINGREQ from vdm-dev-fmStat
1409645705: Sending PINGRESP to vdm-dev-fmStat

But all of a sudden we see this:

1409645776: Socket error on client vdm-dev-live, disconnecting.

From that point on, the subscriber is dead. We don't see any ping requests and it no longer handles any messages from that topic. At the broker level everything is fine, because I have debug-logging subscribers (using NodeJS) and I can see that those subscribers are still handling messages from the topic just fine (so the problem is at the subscriber level).

In the Tomcat logs we also see:

Sep 02, 2014 10:16:05 AM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: vdm-dev-live: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,409,645,705,714 lastInboundActivity=1,409,645,755,075

But Paho does not do any cleanup/restart for this subscriber.

I also see this in the Tomcat logs:

SEVERE: The web application [/vdmapp] appears to have started a thread named [MQTT Snd: vdm-dev-live] but has failed to stop it. This is very likely to create a memory leak.

I also noticed a lot of threads for this subscriber being stuck at shutdown.

"MQTT Snd: vdm-dev-live" daemon prio=10 tid=0x00007f1b44781800 nid=0x3061 in Object.wait() [0x00007f1aa7bfa000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1258)
    - locked <0x00000007ab13e218> (a java.lang.Thread)
    at java.lang.Thread.join(Thread.java:1332)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.stop(CommsReceiver.java:77)
    - locked <0x00000007ab552730> (a java.lang.Object)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:294)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:154)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:131)
    at java.lang.Thread.run(Thread.java:722)

Any idea what is causing this and how to prevent it?


2 Answers


Following up on my comments on @Artem's answer...

The Paho client appears to be deadlocked. See line 573 of the Gist; the Snd thread is waiting for the Rec thread to terminate. At line 586, the Rec thread is blocked because the inbound queue is full (10). For all the instances that look like this, there is no Call thread, so the queue-full condition will never be cleared. Notice that at line 227 the three threads are working fine (presumably a reconnect after the redeploy?).

With the dead threads, there is no Call thread.

I think the problem is in the Paho client - in the CommsCallback.run() method there is a catch on Throwable which shuts down the connection, but because the queue is full the Rec thread is never notified (and so never cleans up). So it appears that delivering a message threw an exception and, with the queue full, this results in the deadlock.

The Paho client needs a fix, but in the meantime we can figure out what the exception is.

If the exception is downstream of the inbound gateway, you should see a log...

        logger.error("Unhandled exception for " + message.toString(), e);

Since this log is generated in MqttCallback.messageArrived(), if you are not seeing such an error, the problem may be in the Paho client itself.

The exception handling in CommsCallback looks like this...

        } catch (Throwable ex) {
            // Users code could throw an Error or Exception e.g. in the case
            // of class NoClassDefFoundError
            // @TRACE 714=callback threw exception
            log.fine(className, methodName, "714", null, ex);
            running = false;
            clientComms.shutdownConnection(null, new MqttException(ex));
        }

(This is where they should call spaceAvailable.notifyAll() to wake up the (dying) Rec thread.)
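
A sketch of what that might look like - this is not an actual Paho patch, just the shape of the fix being suggested, assuming the same spaceAvailable monitor that the Rec thread waits on when the inbound queue is full:

        } catch (Throwable ex) {
            // Users code could throw an Error or Exception e.g. in the case
            // of class NoClassDefFoundError
            // @TRACE 714=callback threw exception
            log.fine(className, methodName, "714", null, ex);
            running = false;
            // Suggested addition: wake a Rec thread that may be blocked
            // waiting for space in the full inbound queue, so it can observe
            // the shutdown instead of waiting forever (which is what deadlocks
            // shutdownConnection() / CommsReceiver.stop()).
            synchronized (spaceAvailable) {
                spaceAvailable.notifyAll();
            }
            clientComms.shutdownConnection(null, new MqttException(ex));
        }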

So, turning on FINE logging for the Paho client should tell you where/what the exception is.
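
For example, something like this (a minimal sketch; it assumes the default java.util.logging backend that Paho 0.4.x logs through, and that Tomcat's JULI configuration doesn't filter the records out):

import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class PahoFineLogging {

    // Keep a strong reference so the level setting isn't lost if the
    // Logger is garbage collected.
    private static final Logger PAHO_LOGGER =
            Logger.getLogger("org.eclipse.paho.client.mqttv3");

    public static void enable() {
        PAHO_LOGGER.setLevel(Level.FINE);

        // A handler at FINE is also needed; the default console handler
        // only passes INFO and above.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        PAHO_LOGGER.addHandler(handler);
    }

    private PahoFineLogging() {
    }
}

The equivalent can also be done in logging.properties with org.eclipse.paho.client.mqttv3.level = FINE plus a FINE-capable handler.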

Answered 2014-09-02T19:39:35.763

First of all, please share the versions of Spring Integration and the Paho client you are using.

Regarding the "after doing a couple of redeploys" part, I see this code in CommsReceiver#stop():

if (!Thread.currentThread().equals(recThread)) {
    try {
        // Wait for the thread to finish.
        recThread.join();
    }
    catch (InterruptedException ex) {
    }
}

Where the Javadoc of Thread.join() says:

* Waits for this thread to die.

I'm really not sure what that means or how execution is supposed to get past that wait, but couldn't the redeploy be the reason those daemon threads keep living, because the thread being joined never dies?
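
Just to illustrate that wait in isolation (nothing Paho-specific; a thread that never terminates makes join() block indefinitely, which is what the Snd thread is doing in CommsReceiver.stop()):

public class JoinHangDemo {

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the Rec thread: it parks forever, like a receiver
        // blocked on a full inbound queue that is never drained.
        Thread rec = new Thread(new Runnable() {
            public void run() {
                synchronized (JoinHangDemo.class) {
                    try {
                        JoinHangDemo.class.wait();
                    }
                    catch (InterruptedException ignored) {
                    }
                }
            }
        }, "Rec");
        rec.setDaemon(true);
        rec.start();

        // With no timeout this join() would block forever, exactly like
        // recThread.join() above; the timeout is only here so the demo exits.
        rec.join(2000);
        System.out.println("Rec still alive after join timed out: " + rec.isAlive());
    }
}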

Answered 2014-09-02T10:52:55.787