我有一个生产者向 RabbitMQ 交换发送持久消息。如果 RabbitMQ 内存或磁盘超过 watermark 阈值,RabbitMQ 将阻塞我的生产者。文档说它停止从套接字读取,并暂停心跳。
我想要的是一种在我的生产者代码中知道我被阻止的方法。目前,即使启用了心跳,一切都会永远暂停。我想收到某种异常,以便我知道我已被阻止,我可以警告用户和/或采取其他措施,但我找不到任何方法来做到这一点。我同时使用 Java 和 C# 客户端,并且两者都需要此功能。有什么建议吗?谢谢。
我有一个生产者向 RabbitMQ 交换发送持久消息。如果 RabbitMQ 内存或磁盘超过 watermark 阈值,RabbitMQ 将阻塞我的生产者。文档说它停止从套接字读取,并暂停心跳。
我想要的是一种在我的生产者代码中知道我被阻止的方法。目前,即使启用了心跳,一切都会永远暂停。我想收到某种异常,以便我知道我已被阻止,我可以警告用户和/或采取其他措施,但我找不到任何方法来做到这一点。我同时使用 Java 和 C# 客户端,并且两者都需要此功能。有什么建议吗?谢谢。
很抱歉告诉你,但是使用 RabbitMQ(至少使用 2.8.6)这是不可能的 :-(
有一个类似的问题,其中心是在连接被阻止时尝试建立一个通道。结果和你所经历的一样。
我对 RabbitMQ C# .Net 库的实际核心进行了一些调查,发现问题的根本原因是它进入了无限阻塞状态。
您可以在此处查看有关 RabbitMQ 邮件列表的更多详细信息:
一个建议(我们没有实施)是在线程内部完成工作,并让其他一些组件管理超时并在超过时终止线程。我们只是接受了风险:-(
Rabbitmq 使用阻塞 rpc 调用,无限期地监听回复。
如果您查看 Java 客户端 api,它的作用是:
AMQChannel.BlockingRpcContinuation k = new AMQChannel.SimpleBlockingRpcContinuation();
k.getReply(-1);
现在 -1 在参数块中传递,直到收到回复。
好消息是您可以通过超时以使其返回。坏事是您将不得不更新客户端 jars。
如果您可以这样做,则可以在进行上述阻塞调用的任何地方传递超时。代码看起来像:
try {
return k.getReply(200);
} catch (TimeoutException e) {
throw new MyCustomRuntimeorTimeoutException("RabbitTimeout ex",e);
}
在您的代码中,您可以处理此异常并在此事件中执行您的逻辑。
可能需要此修复的一些相关类是:
com.rabbitmq.client.impl.AMQChannel
com.rabbitmq.client.impl.ChannelN
com.rabbitmq.client.impl.AMQConnection
仅供参考:我已经尝试过了,它有效。