我正在创建一个简单的 NodeJS 应用程序(作为消费者),它接收来自 MQ 代理(Amazon MQ)的消息。我能够接收消息,但是当发布者向代理发送消息时,消费者不会读取新消息(由于没有新消息而空闲几分钟后)。最终,它会断开连接并重新连接,但此时它只会接收新发布的消息。
在这里您可以看到消息编号 (8,9,12) 没有被读取。这是我的消费者应用程序的问题还是与发布者或经纪人有关?
const mqtt = require('mqtt');
const connect_mqtt = async () => {
const host = 'SOME_HOST';
const options = {
username: 'SOME_USER',
password: 'SOME_PASSWORD',
clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8),
keepalive: 1000
};
const client = mqtt.connect(host, options);
client.on('error', function (error) {
console.error('Connection Error:', error);
});
client.on('disconnect', function (error) {
console.error('Disconnected', error);
});
client.on('close', function (error) {
console.error('Connection Closed', error);
});
client.on('connect', function () {
console.log('Connecting...');
client.subscribe('pass-log', function (error) {
if (!error) {
console.log('Connected!');
} else {
console.error('Subscription Error', error);
}
});
});
client.on('message', function (topic, message) {
if (topic === 'pass-log') {
console.log('Consume', topic, message.toString());
} else {
console.log('NOT YOUR TOPIC');
}
});
};
connect_mqtt();
更新1:
我从发布者那里添加keepalive, clean, reconnectPeriod, qos
了options
匹配,现在它似乎可以工作了。我会对此做进一步的观察。
更新2:
当发布者向主题发送消息时,我故意通过退出我的节点应用程序来断开订阅者的连接。当我重新/连接订阅者时,它只读取最后一条消息。