1

我有一个生产者向 RabbitMQ 交换发送持久消息。如果 RabbitMQ 内存或磁盘超过 watermark 阈值,RabbitMQ 将阻塞我的生产者。文档说它停止从套接字读取,并暂停心跳。

我想要的是一种在我的生产者代码中知道我被阻止的方法。目前,即使启用了心跳,一切都会永远暂停。我想收到某种异常,以便我知道我已被阻止,我可以警告用户和/或采取其他措施,但我找不到任何方法来做到这一点。我同时使用 Java 和 C# 客户端,并且两者都需要此功能。有什么建议吗?谢谢。

4

2 回答 2

0

很抱歉告诉你,但是使用 RabbitMQ(至少使用 2.8.6)这是不可能的 :-(

有一个类似的问题,其中心是在连接被阻止时尝试建立一个通道。结果和你所经历的一样。

我对 RabbitMQ C# .Net 库的实际核心进行了一些调查,发现问题的根本原因是它进入了无限阻塞状态。

您可以在此处查看有关 RabbitMQ 邮件列表的更多详细信息:

http://rabbitmq.1065348.n5.nabble.com/Net-Client-locks-trying-to-create-a-channel-on-a-blocked-connection-td21588.html

一个建议(我们没有实施)是在线程内部完成工作,并让其他一些组件管理超时并在超过时终止线程。我们只是接受了风险:-(

于 2013-01-18T13:15:29.060 回答
0

Rabbitmq 使用阻塞 rpc 调用,无限期地监听回复。

如果您查看 Java 客户端 api,它的作用是:

    AMQChannel.BlockingRpcContinuation k = new AMQChannel.SimpleBlockingRpcContinuation();
    k.getReply(-1);

现在 -1 在参数块中传递,直到收到回复。

好消息是您可以通过超时以使其返回。坏事是您将不得不更新客户端 jars。

如果您可以这样做,则可以在进行上述阻塞调用的任何地方传递超时。代码看起来像:

try {
                    return k.getReply(200);
                } catch (TimeoutException e) {
                    throw new MyCustomRuntimeorTimeoutException("RabbitTimeout ex",e);
                }

在您的代码中,您可以处理此异常并在此事件中执行您的逻辑。

可能需要此修复的一些相关类是:

com.rabbitmq.client.impl.AMQChannel
com.rabbitmq.client.impl.ChannelN
com.rabbitmq.client.impl.AMQConnection

仅供参考:我已经尝试过了,它有效。

于 2013-06-14T08:16:05.380 回答