4

Spring Cloud AWS(1.0.0.RC2)中SimpleMessageListenerContainer类的当前实现似乎在消息处理程序完成处理消息并且方法调用返回后自动删除消息。

在我们的应用程序中,我们需要能够处理消息并等待来自下游队列的异步确认,然后再从 SQS 上游队列中删除消息。就像是

接收 SQS msg -> Process msg -> Publish msg to RabbitMQ(线程在这里完成)

删除 SQS 消息 <- 我们的应用程序 <- RabbitMQ 消息成功 Ack(异步)

由于 msg ack 通过不同的线程异步返回,一旦我们检查了成功 ack,我们需要从 SQS 中手动删除 msg 的选项。

理想情况下,SimpleMessageListener 应该可配置为它在哪种模式下运行(自动删除或手动删除)。

我们非常希望使用 spring aws cloud lib(而不是推出我们自己的)与 SQS 集成,因为它已经处理了侦听器容器 bean 生命周期管理。

请让我知道上述建议的功能是否可行,如果可行,何时可以实施和发布。

谢谢。

4

1 回答 1

2

我们可以再添加一个标志(除了已经存在的deleteMessageOnException标志之外)以完全禁用消息的自动删除,即使在成功处理时也是如此。我看到的问题是不再处理有毒消息并且可能会炸毁队列。我在这里创建了一个问题。

你的方法会有另一个问题。如果消息没有被足够快地删除(基于可见性超时),它将再次出现在您的处理程序方法中。

接收 SQS msg1 -> 处理 msg1 -> 将 msg1 发布到 RabbitMQ(线程在此处完成)

接收 SQS msg2 -> 处理 msg2 -> 将 msg2 发布到 RabbitMQ(线程在此处完成)

接收SQS msg1 -> Process msg1 -> Publish msg1 to RabbitMQ msg1 又来了,因为没有删除

删除 SQS msg1 <- 我们的应用程序 <- RabbitMQ Msg1 成功 Ack(异步)

目前一个非常难看的解决方法可能是在处理程序方法中抛出异常并将deleteMessageOnException标志设置为false。因此不会删除任何消息,您可以获取收据句柄(使用@Header@Headers)手动删除它们。

编辑

该问题现已修复,可以直接使用@SqsListener注释定义删除策略并使用注入的 Acknowledgement 对象。请参阅有关此问题的最后评论

于 2015-02-10T09:58:31.067 回答