6

我有以下代码从 SQS 队列中检索消息。我正在使用 aAmazonSQSBufferedAsyncClient从队列中检索消息。每 5 分钟调用一次固定延迟SingleThreadedExecutor唤醒receiveMessage。队列中启用长轮询

@Service
public class AmazonQueueService
    implements QueueService<String> {

    @Autowired
    private AmazonSQSBufferedAsyncClient sqsAsyncClient;

    @Value("${aws.sqs.queueUrl}")
    private String queueUrl;

    @Override
    public List<Message<String>> receiveMessage() {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
        ReceiveMessageResult result = sqsAsyncClient.receiveMessage(receiveMessageRequest);

        LOG.debug("Size=" + result.getMessages().size());

        return Lists.transform(result.getMessages(), ......);
    }

    .....
}

问题是当我检查 AWS 控制台时,消息总是在进行中,但应用程序中从未收到它(大小总是打印为 0)。看起来AmazonSQSBufferedAsyncClient正在从队列中读取消息,但没有在receiveMessage调用中返回。

有任何想法吗?

4

3 回答 3

5

终于想通了。问题体现在队列可见性超时(2 分钟)和 scheduleExecutor 延迟(5 分钟)的组合中。

将可见性超时增加到 15 分钟解决了这个问题。

我的理论是-> AmazonSQSBufferedAsyncClient 检索消息并将其保存在缓冲区中等待接收消息调用。由于执行器延迟为 5 分钟,因此在调用 receiveMessage 并将消息返回到队列之前,消息的可见性会超时。看起来消息几乎是立即从队列中挑选出来的。现在,无论出于何种原因,对 receiveMessage 的调用都没有收到消息。我猜,增加超时让 receiveMessage 调用有机会在超时事件之前发生,解决了这个问题。

还有其他可能的解释吗?

于 2013-08-16T15:45:16.713 回答
3

对我来说,问题是我的消息甚至在我在控制台上看到它们之前就被 lambda 读取了。所以结果是我无法从 sqs 控制台轮询消息。

于 2020-06-05T00:21:05.613 回答
2

完成后,您必须从队列中删除该消息。如果您不这样做,它将一直在飞行中,直到超时,然后再回到队列中。它是这样设计的,因此您永远不会丢失消息。如果您的程序在完成处理消息并删除它之前崩溃,则消息将立即返回队列。

从基本的Java 示例( SampleDriver.java):

QMessage message = messages.get(0);

System.out.println("\nMessage received");
System.out.println("  message id:     " + message.getId());
System.out.println("  receipt handle: " + message.getReceiptHandle());
System.out.println(" message content: " + message.getContent());

testQueue.deleteMessage(message.getReceiptHandle()); // <===== here
于 2013-08-16T02:02:23.047 回答