我有兴趣了解其他人如何使用官方 RabbitMQ java 客户端库处理从故障连接中恢复。我们正在使用它来将我们的应用程序服务器连接到我们的 RabbitMQ 集群,并且我们已经实现了几种不同的方法来从连接故障中恢复,但没有一种方法感觉完全正确。
想象一下这个伪应用程序:
public class OurClassThatStartsConsumers {
Connection conn;
public void start() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("someusername");
factory.setPassword("somepassword");
factory.setHost("somehost");
conn = factory.newConnection();
new Thread(new Consumer(conn.createChannel())).start();
}
}
class Consumer1 implements Runnable {
public Consumer1(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
... consume incoming messages on the channel...
// How do we handle that the connection dies?
}
}
}
在现实世界中,我们有数百个消费者。那么如果连接死了怎么办?在上面的例子中,Consumer1 无法恢复,当连接关闭时,Channel 也会关闭,这是我们无法恢复的状态。因此,让我们看一些解决此问题的方法:
解决方案 A)
让每个消费者都有自己的连接,并注册连接断开时触发的事件,然后处理重新连接。
优点:有效
缺点:
- 由于我们有很多消费者,我们可能不想要那么多连接。
- 我们可能有很多重复的代码来重新连接到rabbit并处理重新连接
解决方案 B)
让每个消费者使用相同的连接并订阅它的连接失败事件。
优点:比解决方案 A 中的连接更少
缺点:由于连接已关闭,我们需要重新打开/替换它。java客户端库似乎没有提供重新打开连接的方法,所以我们必须用新连接替换它,然后以某种方式通知所有消费者这个新连接,他们必须重新创建通道和消费者. 再一次,很多我不想在消费者身上看到的逻辑最终都出现了。
解决方案 C)
WrapConnection
和Channel
classes 是处理重新连接逻辑的类,消费者只需要知道WrappedChannel
该类。在连接失败时,WrappedConnection
将处理重新建立连接,一旦连接,WrappedConnection
将自动创建新通道并注册消费者。
优点:它有效——这实际上是我们今天使用的解决方案。
缺点:感觉就像是 hack,我认为底层库应该更优雅地处理它。
也许有更好的方法?API 文档没有太多关于从错误连接中恢复的内容。任何输入表示赞赏:)