我的 Node.js 应用程序和 RabbitMQ 服务器都运行在 Docker 上。
该 Node.js 应用程序具有重新连接逻辑。当我在 RabbitMQ 容器内通过 kill 命令优雅地重启 RabbitMQ 服务,
或者执行 sudo docker-compose restart -t 10 rabbitmq
(该命令会先等待 10 秒,然后才向 RabbitMQ 服务发送强制终止信号)时,
该逻辑运行良好。
但是,当我通过 sudo docker-compose restart -t 0 rabbitmq 强制重启 RabbitMQ 服务时,
重新连接逻辑不起作用。
在 Node.js 应用程序容器中运行 lsof 命令也看不到任何连接。
var amqp = require('amqplib')
var appRoot = require('app-root-path')
var rabbitMQConfig = require(`${appRoot}/config`).rabbitmq
var winston = require(`${appRoot}/logger`)
var utils = require(`${appRoot}/utils`)
var util = require('util')
var rabbitMQController = require(`${appRoot}/controllers/rabbitMQController`)
var request = require('request-promise')
/**
 * Builds the AMQP connection URL from the RabbitMQ configuration.
 *
 * The result has the shape `<prefix>[<userName>:<password>@]<host>:<port><vhost>`.
 * The credentials segment is included only when `authEnabled` is strictly `true`.
 *
 * @param {object} rabbitMQConfig - Reads `prefix`, `authEnabled`, `userName`,
 *   `password`, `host`, `port` and `vhost`.
 * @returns {string} The assembled connection string.
 */
function getRabbitMQConnectionString(rabbitMQConfig) {
    var connPrefix = rabbitMQConfig.prefix
    if (rabbitMQConfig.authEnabled === true) {
        connPrefix += rabbitMQConfig.userName + ":" + rabbitMQConfig.password + "@"
    }
    // The prefix is passed as a format argument instead of being spliced into
    // the format string, so a '%' inside the credentials is copied verbatim
    // rather than being interpreted as a placeholder. The result is returned
    // directly; no variable outside this function is written.
    return util.format('%s%s:%s%s', connPrefix, rabbitMQConfig.host, rabbitMQConfig.port, rabbitMQConfig.vhost)
}
// AMQP connection URL, computed once from the loaded config when this module is evaluated.
var connString = getRabbitMQConnectionString(rabbitMQConfig)
/**
 * Declares the configured exchange and queue on the given channel, then resets
 * the queue's bindings to that exchange so they match `rabbitMQConfig.source`.
 *
 * The current bindings are fetched from the HTTP endpoint
 * `/api/bindings/<vhost>/e/<exchange>/q/<queue>` using HTTP basic auth. Every
 * returned binding is removed, and one binding per entry in
 * `rabbitMQConfig.source` is then created.
 *
 * @param {object} channel - Channel object providing `assertExchange`,
 *   `assertQueue`, `unbindQueue` and `bindQueue`.
 * @returns {Promise<void>} Resolves once every unbind and bind call has
 *   completed. Rejects if any channel operation or the HTTP request fails.
 */
var initializeRabbitMQ = async function(channel) {
    await channel.assertExchange(rabbitMQConfig.exchange, 'direct', { durable: true })
    await channel.assertQueue(rabbitMQConfig.queue, { durable: true })

    // Every path segment is URL-encoded, since the vhost, exchange and queue
    // names may contain characters such as '/' that would change the path.
    var url = util.format(
        '%s%s:%s/api/bindings/%s/e/%s/q/%s',
        rabbitMQConfig.httpPrefix,
        rabbitMQConfig.host,
        rabbitMQConfig.managementPort,
        encodeURIComponent(rabbitMQConfig.vhost),
        encodeURIComponent(rabbitMQConfig.exchange),
        encodeURIComponent(rabbitMQConfig.queue)
    )
    var options = {
        "method": "GET",
        "uri": url,
        "headers": {
            // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
            "Authorization": "Basic " + Buffer.from(rabbitMQConfig.userName + ":" + rabbitMQConfig.password).toString("base64"),
        },
        'json': true,
        'timeout': 15000
    }
    var existingBindings = await request(options)

    // for...of is used instead of forEach(async ...) so that each call is
    // actually awaited: failures reject this function instead of becoming
    // unhandled rejections, and all unbinds finish before any bind starts.
    for (var binding of existingBindings) {
        await channel.unbindQueue(rabbitMQConfig.queue, rabbitMQConfig.exchange, binding.routing_key)
    }
    for (var routingKey of rabbitMQConfig.source) {
        await channel.bindQueue(rabbitMQConfig.queue, rabbitMQConfig.exchange, routingKey)
    }
}
/**
 * Opens a connection using `connString`, creates a channel on it, runs
 * `initializeRabbitMQ` on that channel and then calls
 * `rabbitMQController.processMessages()`.
 *
 * @returns {Promise<object>} Resolves with the initialized channel. Rejects if
 *   connecting, creating the channel or initializing it fails.
 */
var connect = async function() {
    var conn = await amqp.connect(connString)

    // An EventEmitter 'error' event with no listener is thrown as an uncaught
    // exception. Without this listener, a socket failure (for example the
    // broker being killed abruptly) would throw instead of letting the
    // connection finish closing and notify its channels.
    conn.on('error', function(error) {
        winston.error(utils.getLogString("rabbitmq_connection_error", "", "", error.stack))
    })

    var channel
    try {
        channel = await conn.createChannel()
        await initializeRabbitMQ(channel)
    } catch (error) {
        // Setup failed: release the connection so that repeated retries by the
        // caller do not accumulate open connections. Cleanup is best-effort;
        // the original setup error is the one that is rethrown.
        try {
            await conn.close()
        } catch (closeError) {
            winston.error(utils.getLogString("rabbitmq_connection_close_error", "", "", closeError.stack))
        }
        throw error
    }

    // Deliberately not awaited, matching the original call.
    // NOTE(review): if processMessages() awaits the exported `connection`
    // promise, awaiting it here would deadlock — confirm against the controller.
    rabbitMQController.processMessages()
    return channel
}
/**
 * Starts a connection attempt, stores its promise on `exportsObj.connection`,
 * and attaches channel listeners once the channel is available.
 *
 * - On channel 'error': the error is logged.
 * - On channel 'close': the event is logged and a new attempt starts
 *   immediately, replacing `exportsObj.connection`.
 * - If the attempt itself fails: the error is logged and a new attempt starts
 *   after a fixed delay of 5 seconds.
 *
 * @param {object} exportsObj - Object whose `connection` property is set to
 *   the promise returned by `connect()`.
 * @returns {Promise<void>} Resolves once listeners are attached or once a retry
 *   has been scheduled. Retries run in the background and are not awaited.
 */
var prepareExports = async function(exportsObj) {
    var retryDelayMs = 5000
    try {
        exportsObj["connection"] = connect()
        var channel = await exportsObj["connection"]
        channel.on('error', function(error) {
            winston.error(utils.getLogString("rabbitmq_channel_error", "", "", error.stack))
        })
        channel.on('close', function() {
            winston.error(utils.getLogString("rabbitmq_channel_close", "", "", ""))
            prepareExports(exportsObj)
        })
    } catch (error) {
        // The literal name replaces `arguments.callee.name`, which is deprecated
        // and throws in strict mode. It evaluated to this same string.
        winston.error(utils.getLogString("prepareExports", "", "", error.stack))
        await new Promise((resolve) => {
            setTimeout(resolve, retryDelayMs)
        })
        prepareExports(exportsObj)
    }
}
// Start the first connection attempt when this module is loaded; the pending
// connect() promise is stored on module.exports.connection.
prepareExports(module.exports)
即使 RabbitMQ 服务被强制突然重启,怎样才能让重新连接逻辑仍然正常工作?