我在 node.js 应用程序上使用 Rascal.Js(它使用 amqplib)作为我的消息传递逻辑和 rabbitMq。
我在我的项目启动中使用了与他们的示例类似的东西,它创建了一个永久实例并“注册”我的所有订阅者并在它们到达队列时重定向消息(在后台)。
我的问题与出版商有关。有来自外部的 http 请求应该会触发我的发布者。用户单击各种创建按钮,这会导致某些操作流。在某些时候,它达到了我需要使用发布者的地步。
在这里,我不确定正确的方法。每次需要发布消息时都需要打开新连接吗?并在结束后关闭它?或者也许我应该以一种为所有发布者保持相同连接打开的方式来实现它?(实际上我不太确定如何以可以从我的应用程序的其他部分访问它的方式创建它)。
目前我正在使用以下内容:
async publishMessage(publisherName, message) {
const dynamicSettings = setupDynamicVariablesFromConfigFiles(minimalPublishSettings);
const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(dynamicSettings.rascal));
broker.on('error', async function(err) {
loggerUtil.writeToLog('error', 'publishMessage() broker_error_event: ' + publisherName + err + err.stack);
await broker.shutdown();
})
const publication = await broker.publish(publisherName, message);
try {
publication.on('error', async function(err) {
loggerUtil.writeToLog('error', 'publishMessage() publish_error_event: ' + err + err.stack);
await broker.shutdown();
}).on("success", async (messageId) => {
await broker.shutdown();
}).on("return", async (message) => {
loggerUtil.writeToLog('error', 'publishMessage() publish_return_event: ' + err + err.stack);
await broker.shutdown();
})
}
catch(err) {
loggerUtil.writeToLog('error', 'Something went wrong ' + err + err.stack);
await broker.shutdown();
}
}
当我需要发布消息时,我会在应用程序的不同部分使用此功能。我想只为所有端点添加 broker.shutdown() 但在发生错误后的某个时候,我收到一个关于关闭已经关闭的连接的异常,这让我担心关闭方法(这可能不是一个好的)。我认为这与此有关-我尝试这样做(注释代码),但我认为它在某些情况下效果不佳。如果一切正常,它会“成功”,然后我可以关闭它。但是有一次我遇到了错误而不是成功,当我尝试使用 broker.shutdown() 时,它给了我另一个导致应用程序崩溃的异常。我认为这与此有关 - https://github.com/squaremo/amqp.node/issues/111
我不确定解决此问题的最安全方法是什么?
编辑:
实际上现在我想起来了,这个异常可能与我试图关闭 catch{} 区域中的代理有关。我会继续调查。