2

我试图了解在 RabbitMQ 中合并或分块传入消息的最佳方法(直接使用 Spring AMQP 或 Java 客户端)。

换句话说,我想说 100 条传入消息并将它们组合为 1 并以可靠(正确ACK编辑的方式)将其重新发送到另一个队列。我相信这被称为 EIP 中的聚合器模式。

我知道 Spring Integration提供了一个聚合器解决方案,但实现看起来不是故障安全的(也就是说,它看起来必须确认并使用消息来构建合并的消息,因此如果你在它这样做时关闭它,你会丢失消息吗? )。

4

2 回答 2

3

我无法直接评论 Spring Integration 库,所以我将笼统地说 RabbitMQ。

如果您不是 100% 相信 Aggregator 的 Spring Integration 实现并且打算自己实现它,那么我建议避免tx在 RabbitMQ 的底层使用 which uses transactions。

RabbitMQ 中的事务很慢,如果您正在构建高流量/吞吐量系统,您肯定会遇到性能问题。

相反,我建议您查看Publisher Confirms,它是在 RabbitMQ 中实现的 AMQP 的扩展。这是新的http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/的介绍。

您需要调整预取设置以获得正确的性能,请查看http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/了解一些细节。

以上所有内容为您提供了一些背景知识来帮助您解决问题。实现相当简单。

创建消费者时,您需要确保将其设置为需要 ACK。

  1. 出列 n 条消息,当您出列时,您需要记下每条消息的 DeliveryTag(这用于确认消息)
  2. 将消息聚合成新消息
  3. 发布新消息
  4. 确认每个出队的消息

需要注意的一件事是,如果您的消费者在 3 之后和 4 完成之前死亡,那么那些未 ACK 的消息将在恢复时重新处理

于 2013-02-26T09:49:07.017 回答
2

如果将<amqp-inbound-channel-adapter/> tx-size属性设置为 100,容器将每 100 条消息确认一次,因此这应该可以防止消息丢失。

但是,您可能希望使聚合消息的发送(在第 100 次接收时)具有事务性,以便您可以在入站消息的 ack 之前确认代理具有该消息。

于 2013-02-25T16:45:19.150 回答