我试图了解在 RabbitMQ 中合并或分块传入消息的最佳方法(直接使用 Spring AMQP 或 Java 客户端)。
换句话说,我想说 100 条传入消息并将它们组合为 1 并以可靠(正确ACK
编辑的方式)将其重新发送到另一个队列。我相信这被称为 EIP 中的聚合器模式。
我知道 Spring Integration提供了一个聚合器解决方案,但实现看起来不是故障安全的(也就是说,它看起来必须确认并使用消息来构建合并的消息,因此如果你在它这样做时关闭它,你会丢失消息吗? )。
我试图了解在 RabbitMQ 中合并或分块传入消息的最佳方法(直接使用 Spring AMQP 或 Java 客户端)。
换句话说,我想说 100 条传入消息并将它们组合为 1 并以可靠(正确ACK
编辑的方式)将其重新发送到另一个队列。我相信这被称为 EIP 中的聚合器模式。
我知道 Spring Integration提供了一个聚合器解决方案,但实现看起来不是故障安全的(也就是说,它看起来必须确认并使用消息来构建合并的消息,因此如果你在它这样做时关闭它,你会丢失消息吗? )。
我无法直接评论 Spring Integration 库,所以我将笼统地说 RabbitMQ。
如果您不是 100% 相信 Aggregator 的 Spring Integration 实现并且打算自己实现它,那么我建议避免tx
在 RabbitMQ 的底层使用 which uses transactions。
RabbitMQ 中的事务很慢,如果您正在构建高流量/吞吐量系统,您肯定会遇到性能问题。
相反,我建议您查看Publisher Confirms,它是在 RabbitMQ 中实现的 AMQP 的扩展。这是新的http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/的介绍。
您需要调整预取设置以获得正确的性能,请查看http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/了解一些细节。
以上所有内容为您提供了一些背景知识来帮助您解决问题。实现相当简单。
创建消费者时,您需要确保将其设置为需要 ACK。
需要注意的一件事是,如果您的消费者在 3 之后和 4 完成之前死亡,那么那些未 ACK 的消息将在恢复时重新处理
如果将<amqp-inbound-channel-adapter/>
tx-size
属性设置为 100,容器将每 100 条消息确认一次,因此这应该可以防止消息丢失。
但是,您可能希望使聚合消息的发送(在第 100 次接收时)具有事务性,以便您可以在入站消息的 ack 之前确认代理具有该消息。