0

我使用 RPC 端点,其中一个端点存在以下问题:我没有收到消息,因此回调函数没有在 channel.consume() 上执行。

在那个端点,我发送一条消息,一个需要时间的进程正在服务器端运行,它用一条关于进程是否正确执行的消息来响应我。在服务器立即发送消息的其他端点没有问题。

我认为超时存在问题。我试图将对象 {timeout: 3600000} 放在 amqpOptions 之后,但问题还是没有解决。具体来说,无论我添加什么对象,连接和通道对象都具有相同的参数。如何正确更改超时?

const amqp = require('amqplib/callback_api');
const amqpOptions = {
  protocol: 'amqp',
  hostname: process.env.RABBITMQ_HOST,
  port: process.env.RABBITMQ_PORT,
  username: process.env.RABBITMQ_USER,
  password: process.env.RABBITMQ_PASS,
  vhost: '/',
};
const message = Buffer.from(JSON.stringify({}));
      amqp.connect(amqpOptions, (error0, connection) => {
        if (error0) { throw error0; }
        connection.createChannel((error1, channel) => {
          if (error1) { throw error1; }
          const correlationId = generateUuid();
          channel.consume(replyQueue, (msg) => {
            if (JSON.parse(msg.content).error) {
              console.log(JSON.parse(msg.content));
              const error = JSON.parse(msg.content.toString());
              return next(error);
            } 
            console.log(JSON.parse(msg.content));
            console.log('msg:',msg);
            const {tunnel_info} = JSON.parse(msg.content.toString());
          }, {noAck: true});
          channel.sendToQueue(`${brokerUri}`,
            message, {correlationId, contentType: 'application/json', contentEncoding: 'utf8', replyTo: replyQueue});
        });
      });
4

1 回答 1

0

因为通道是单向的。您应该为发布和消费使用两个不同的渠道。

AMQP 规范说:

通道是unidirectional,因此在每个连接端点,传入和传出通道是完全不同的。

于 2020-11-19T06:12:53.713 回答