0

我正在创建一个简单的 NodeJS 应用程序(作为消费者),它接收来自 MQ 代理(Amazon MQ)的消息。我能够接收消息,但是当发布者向代理发送消息时,消费者不会读取新消息(由于没有新消息而空闲几分钟后)。最终,它会断开连接并重新连接,但此时它只会接收新发布的消息。

在此处输入图像描述

在这里您可以看到消息编号 (8,9,12) 没有被读取。这是我的消费者应用程序的问题还是与发布者或经纪人有关?

const mqtt = require('mqtt');
const connect_mqtt = async () => {
  const host = 'SOME_HOST';
  const options = {
    username: 'SOME_USER',
    password: 'SOME_PASSWORD',
    clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8),
    keepalive: 1000
  };

  const client = mqtt.connect(host, options);

  client.on('error', function (error) {
    console.error('Connection Error:', error);
  });

  client.on('disconnect', function (error) {
    console.error('Disconnected', error);
  });

  client.on('close', function (error) {
    console.error('Connection Closed', error);
  });

  client.on('connect', function () {
    console.log('Connecting...');
    client.subscribe('pass-log', function (error) {
      if (!error) {
        console.log('Connected!');
      } else {
        console.error('Subscription Error', error);
      }
    });
  });

  client.on('message', function (topic, message) {
    if (topic === 'pass-log') {
      console.log('Consume', topic, message.toString());
    } else {
      console.log('NOT YOUR TOPIC');
    }
  });
};
connect_mqtt();

从 ActiveMQ 检查,连接处于活动状态 在此处输入图像描述

更新1:

我从发布者那里添加keepalive, clean, reconnectPeriod, qosoptions匹配,现在它似乎可以工作了。我会对此做进一步的观察。

更新2:

当发布者向主题发送消息时,我故意通过退出我的节点应用程序来断开订阅者的连接。当我重新/连接订阅者时,它只读取最后一条消息。

4

0 回答 0