-1

我的节点应用程序和RabbitMQ服务器正在docker上运行。
该nodejs应用程序具有重新连接逻辑,当我通过killRabbitMQ容器中的命令优雅地重新启动RabbitMQ服务或sudo docker-compose restart -t 10 rabbitmq等待10秒后再向killRabbitMQ服务发送强制信号时,该逻辑非常有效。

但是当我强行重启 RabbitMQ 服务时,sudo docker-compose restart -t 0 rabbitmq重新连接逻辑不起作用。lsofnodejs 应用程序容器上的命令不显示任何连接。

var amqp = require('amqplib')
var appRoot = require('app-root-path')
var rabbitMQConfig = require(`${appRoot}/config`).rabbitmq
var winston = require(`${appRoot}/logger`)
var utils = require(`${appRoot}/utils`)
var util = require('util')
var rabbitMQController = require(`${appRoot}/controllers/rabbitMQController`)
var request = require('request-promise')

function getRabbitMQConnectionString(rabbitMQConfig) {
    var connPrefix = rabbitMQConfig.prefix
    if (rabbitMQConfig.authEnabled === true) {
        var credentials = rabbitMQConfig.userName + ":" + rabbitMQConfig.password +"@"
        connPrefix += credentials
        connString = util.format(connPrefix +'%s:%s%s', rabbitMQConfig.host, rabbitMQConfig.port, rabbitMQConfig.vhost)
    } else {
        connString = util.format(connPrefix +'%s:%s%s', rabbitMQConfig.host, rabbitMQConfig.port, rabbitMQConfig.vhost)
    }
    return connString   
}

var connString = getRabbitMQConnectionString(rabbitMQConfig)

var initializeRabbitMQ = async function(channel) {
    await channel.assertExchange(rabbitMQConfig.exchange, 'direct', { durable: true })
    await channel.assertQueue(rabbitMQConfig.queue, { durable: true })
    var url = util.format('%s%s:%s/api/bindings/%s/e/%s/q/%s', rabbitMQConfig.httpPrefix, rabbitMQConfig.host, rabbitMQConfig.managementPort, encodeURIComponent(rabbitMQConfig.vhost), rabbitMQConfig.exchange, rabbitMQConfig.queue) 
    var options = {
        "method": "GET",
        "uri": url,
        "headers": {
            "Authorization": "Basic " + new Buffer(rabbitMQConfig.userName + ":" + rabbitMQConfig.password).toString("base64"),
        },
        'json': true,
        'timeout': 15000
    }
    var result = await request(options)
    // console.log('alok', result)
    result.forEach(async function(item) {
        await channel.unbindQueue(rabbitMQConfig.queue, rabbitMQConfig.exchange, item.routing_key)
    })
    rabbitMQConfig.source.forEach(async function(item) {
        await channel.bindQueue(rabbitMQConfig.queue, rabbitMQConfig.exchange, item)
    })
}

var connect = async function() {
    var conn = await amqp.connect(connString)
    var channel =  await conn.createChannel()
    await initializeRabbitMQ(channel)
    rabbitMQController.processMessages()
    return channel
}

var prepareExports = async function(exportsObj) {
    try {
        exportsObj["connection"] = connect()
        var channel = await exportsObj["connection"]
        channel.on('error', function(error) {
            winston.error(utils.getLogString("rabbitmq_channel_error", "", "", error.stack))
        })
        channel.on('close', function() {
            winston.error(utils.getLogString("rabbitmq_channel_close", "", "", ""))
            prepareExports(exportsObj)
        })
    } catch (error) {
        winston.error(utils.getLogString(arguments.callee.name, "", "", error.stack))
        await new Promise((resolve) => {
            setTimeout(resolve, 5000)
        })
        prepareExports(exportsObj)
    }
}

prepareExports(module.exports)

即使在 RabbitMQ 服务强制突然重启的情况下,应该有什么正确的方法让它工作?

4

3 回答 3

0

connection.close()可在http://www.squaremobius.net/amqp.node/channel_api.html#model_close的官方 api 中获得

除了收听频道外,我们还必须收听连接。

var connect = async function() {
    var conn = await amqp.connect(connString)
    var channel =  await conn.createChannel()
    channel.conn_ = conn
    await initializeRabbitMQ(channel)
    // rabbitMQController.processMessages()
    return channel
}

var prepareExports = async function(exportsObj) {
    try {
        exportsObj["connection"] = connect()
        var channel = await exportsObj["connection"]
        var conn = channel.conn_
        channel.on('error', function(error) {
            winston.error(utils.getLogString("rabbitmq_channel_error", "", "", error.stack))
        })
        channel.on('close', function() {
            winston.error(utils.getLogString("rabbitmq_channel_close", "", "", ""))
            prepareExports(exportsObj)
        })
        conn.on('error', function(error) {
            winston.error(utils.getLogString("rabbitmq_conn_error", "", "", error.stack))
        })
        conn.on('close', function() {
            winston.error(utils.getLogString("rabbitmq_conn_close", "", "", ""))
            prepareExports(exportsObj)
        })
    } catch (error) {
        winston.error(utils.getLogString(arguments.callee.name, "", "", error.stack))
        await new Promise((resolve) => {
            setTimeout(resolve, 5000)
        })
        prepareExports(exportsObj)
    }
}

prepareExports(module.exports)
rabbitMQController.processMessages()
于 2019-09-02T07:45:16.637 回答
0

在连接错误时, chennel 中也应该有错误,它没有得到提升,它是amqplib中的一个错误。所以修复是在连接错误时注册一个函数。

var connect = async function() {
    var conn = await amqp.connect(connString)
    var channel =  await conn.createChannel()
    await initializeRabbitMQ(channel)
    conn.on('error', () => {}) // this is solving problem
    return channel
}

var prepareExports = async function(exportsObj) {
    try {
        exportsObj["connection"] = connect()
        var channel = await exportsObj["connection"]
        channel.on('error', function(error) {
            winston.error(utils.getLogString("rabbitmq_channel_error", "", "", error.stack))
        })
        channel.on('close', function() {
            winston.error(utils.getLogString("rabbitmq_channel_close", "", "", ""))
            prepareExports(exportsObj)
        })
    } catch (error) {
        winston.error(utils.getLogString(arguments.callee.name, "", "", error.stack))
        await new Promise((resolve) => {
            setTimeout(resolve, 5000)
        })
        prepareExports(exportsObj)
    }
}

prepareExports(module.exports)
rabbitMQController.processMessages()
于 2019-09-04T07:03:57.733 回答
0

对 RabbitMQ 本地实例的一些测试表明,connection.on('close')当 RabbitMQ 重新启​​动时,总​​是会触发一个侦听器,因此我建议您尝试使用 Connection 侦听器而不是 Channel。

虽然库确实会close在关闭的 Connection 上为所有 Channel 发出事件,但没有理由监听 Channel 事件(与 Connection 事件相反)。此外,监听 Co​​nnection 事件的几个好处:

  1. 如果 Channel 发出error并且没有相应的侦听器,则错误将传播到底层 Connection。
  2. 如果 Connection 发出close,则在错误消息中使用原因调用侦听器。Connectionclose为它的所有 Channel 发出事件而没有指定原因。
于 2019-09-02T04:03:30.940 回答