0

我正在尝试弄清楚如何在发布消息后关闭基于 Promise 的连接。

我想把发送者和接收者共用的代码抽取出来,所以我有一个这样的连接文件:

connector.js

const amqp = require('amqplib');
/**
 * Thin wrapper around amqplib that opens a connection to a RabbitMQ broker
 * and hands back a channel on it.
 */
class Connector {
  /**
   * @param {string} RabbitMQUrl - URL passed verbatim to amqp.connect().
   */
  constructor(RabbitMQUrl) {
    this.rabbitMQUrl = RabbitMQUrl;
  }

  /**
   * Opens a new connection, remembers it on `this.connection`, and creates a
   * channel on it.
   *
   * @returns {Promise} Resolves with the channel returned by
   *   connection.createChannel(). Rejects with the original error when
   *   connecting or creating the channel fails.
   */
  connect() {
    return amqp.connect(this.rabbitMQUrl)
      .then((connection) => {
        this.connection = connection;
        // Close the connection on Ctrl-C.
        process.once('SIGINT', () => {
          this.connection.close();
        });
        return this.connection.createChannel();
      })
      .catch((err) => {
        console.error('Errrored here');
        console.error(err);
        // Re-throw after logging: swallowing the error here would make the
        // promise resolve with `undefined`, and callers would then fail later
        // with an unrelated TypeError when they try to use the channel.
        throw err;
      });
  }
}

// Export one shared Connector whose broker host is read from the AMQP_HOST
// environment variable; the port is fixed at 5672.
const { AMQP_HOST } = process.env;

module.exports = new Connector(`amqp://${AMQP_HOST}:5672`);

然后我的发布者/发件人看起来像这样:

publisher.js

const connector = require('./connector');

/**
 * Publishes messages to one AMQP exchange, obtaining a fresh channel from the
 * shared connector for every message.
 */
class Publisher {
  /**
   * @param {string} exchange - Name of the exchange to assert and publish to.
   * @param {string} exchangeType - Exchange type handed to assertExchange().
   */
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }

  /**
   * Connects, asserts the exchange, publishes `msg` with an empty routing
   * key, then closes the channel. Only the channel is closed here; the
   * connection held by the connector is left open.
   *
   * @param {*} msg - Payload; converted with Buffer.from().
   * @returns {Promise} Settles once the channel has been closed. Failures at
   *   any stage are logged with console.error and the promise still resolves,
   *   so fire-and-forget callers never see an unhandled rejection.
   */
  publish(msg) {
    // Return the chain so callers can tell when publishing has finished.
    return connector.connect()
      .then((channel) => {
        const exchangeAsserted = channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        );
        return exchangeAsserted.then(() => {
          channel.publish(this.exchange, '', Buffer.from(msg));
          return channel.close();
        });
      })
      // A single trailing handler also covers connection failures, which
      // previously escaped as unhandled rejections.
      .catch((err) => {
        console.error(err);
      });
  }
}

// Export one shared Publisher configured from the environment: AMQP_EXCHANGE
// is the exchange name and AMQP_TOPIC is passed as the exchange type.
const { AMQP_EXCHANGE, AMQP_TOPIC } = process.env;

module.exports = new Publisher(AMQP_EXCHANGE, AMQP_TOPIC);

但如前所述,我无法弄清楚如何在调用 publish() 之后关闭连接。

4

1 回答 1

0

您可以向连接器添加 close() 函数:

  close() {
      if (this.connection) {
          console.log('Connector: Closing connection..');
          this.connection.close();
      }
  }

发布者(Publisher):

/**
 * Publishes messages to one AMQP exchange over a channel that stays open
 * between connect() and disconnect().
 */
class Publisher {
  /**
   * @param {string} exchange - Name of the exchange to assert and publish to.
   * @param {string} exchangeType - Exchange type handed to assertExchange().
   */
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }

  /**
   * Obtains a channel from the connector, asserts the exchange on it, and
   * stores the channel on `this.channel` for later publish() calls.
   *
   * @returns {Promise<undefined>} Resolves once the exchange is asserted.
   *   Rejects with the original error when connecting or asserting fails.
   */
  connect() {
    return connector.connect().then((channel) => {
      console.log('Connecting..');
      return channel.assertExchange(
        this.exchange,
        this.exchangeType,
        this.durabilityOptions
      ).then(() => {
        this.channel = channel;
      }).catch((err) => {
        console.error(err);
        // Re-throw so callers do not go on to publish on a channel that was
        // never stored.
        throw err;
      });
    });
  }

  /**
   * Closes the stored channel (if any) and then asks the connector to close
   * its connection.
   *
   * @returns {Promise} Settles after connector.close() has been called.
   */
  disconnect() {
    const channel = this.channel;
    // Forget the channel first so a repeated disconnect() does not try to
    // close the same channel twice.
    this.channel = undefined;
    const channelClosed = channel ? channel.close() : Promise.resolve();
    return channelClosed.then(() => connector.close());
  }

  /**
   * Publishes `msg` to the exchange with an empty routing key.
   * connect() must have resolved before this is called.
   *
   * @param {*} msg - Payload; converted with Buffer.from().
   * @returns {*} Whatever channel.publish() returns.
   */
  publish(msg) {
    return this.channel.publish(this.exchange, '', Buffer.from(msg));
  }
}

test.js

'use strict'

const connector = require('./connector');
const publisher = require('./publisher');


// Connect, publish two messages, then disconnect. The disconnect promise is
// returned so that a failure while closing reaches the catch handler below
// instead of becoming an unhandled rejection.
publisher.connect()
    .then(() => {
        publisher.publish('message');
        publisher.publish('message2');
        return publisher.disconnect();
    })
    .catch((err) => {
        console.error(err);
        // Signal failure to the caller of the script.
        process.exitCode = 1;
    });
于 2018-01-08T16:14:06.530 回答