我们在 Scala 中使用 Java RMQ 客户端,我们在 DEV 环境中遇到了一些问题。我们设置了这个后备策略:
def addConnectionShutdownListener(connection: Connection): Unit ={
connection.addShutdownListener { cause: ShutdownSignalException =>
logger.error(s"Error on RMQ connection: ${cause.getMessage}", cause)
if (exitOnFail) {
logger.info("Terminating process with RMQ consumer is shut down")
System.exit(1)
}
else if (retryOnFail) {
logger.info(s"Retrying to connect")
retryCreatingConnection(1)
}
}
}
addConnectionShutdownListener(rmqConnection)
以类似的方式,我添加了通道连接关闭侦听器。
所以我们使用了 2 种策略(并通过配置进行修改)
- 失败退出
- 失败重试
我设置了退出失败策略,有时它可以正常工作。发生错误时,我在日志中看到这一行终止使用 RMQ 使用者的进程已关闭并正确重新启动服务(kubernetes pod 已关闭并再次自动启动)。我禁用了 RMQ 自动恢复,因为它根本不起作用。
问题是有时有些队列没有消费者,消息正在排队并挂起,但日志中没有此错误消息。真的很难测试它,因为我不知道我们的DEV环境发生了什么情况。
会发生什么?有没有更好的方法来处理连接丢失,或者更准确地说 - 处理消费者以某种方式从队列中分离出来的场景?
提前致谢,
阿米尔