33

我在 SQS 中有多条消息。以下代码总是只返回一个,即使有几十个可见(不在飞行中)。 我认为setMaxNumberOfMessages将允许一次使用多个 .. 我误解了这个吗?

 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 String queueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
 ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
 receiveMessageRequest.setMaxNumberOfMessages(10);
 List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
 for (Message message : messages) {
      // i'm a message from SQS
 }

我也试过使用withMaxNumberOfMessages没有任何这样的运气:

 receiveMessageRequest.withMaxNumberOfMessages(10);

我怎么知道队列中有消息?超过 1 个?

 Set<String> attrs = new HashSet<String>();
 attrs.add("ApproximateNumberOfMessages");
 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
 Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
 int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));

以上总是先运行并给我一个int> 1

感谢您的输入

4

8 回答 8

40

AWS API 参考指南:Query/QueryReceiveMessage

由于队列的分布式特性,在 ReceiveMessage 调用中对一组加权随机机器进行采样。这意味着只返回采样机器上的消息。如果队列中的消息数量很少(少于 1000 条),那么您收到的消息可能会少于每次 ReceiveMessage 调用请求的消息。如果队列中的消息数量非常少,您可能不会在特定的 ReceiveMessage 响应中收到任何消息;在这种情况下,您应该重复请求。

MaxNumberOfMessages:要返回的最大消息数。SQS 永远不会返回比此值更多的消息,但可能会返回更少的消息。

于 2012-03-31T21:34:47.590 回答
10

SQS 参考文档中对这种(可以说是相当特殊的)行为进行了全面的解释。

SQS 将消息副本存储在多个服务器上,并使用两种可能的策略之一向这些服务器发出接收消息请求,

  • 短轮询:默认行为,仅查询服务器的子集基于加权随机分布)。
  • 长轮询:通过将WaitTimeSeconds属性设置为非零值启用,查询所有服务器

实际上,对于我有限的测试,我似乎总是像您一样通过简短的轮询收到一条消息。

于 2014-09-13T22:28:57.253 回答
5

我有同样的问题。您的队列设置的接收消息等待时间是多少?当我的为 0 时,即使队列中有 8 条消息,它也只返回 1 条消息。当我增加接收消息等待时间时,我得到了所有这些。对我来说似乎有点马车。

于 2013-08-31T00:21:18.390 回答
5

我只是在尝试相同的方法,在 setMaxNumberOfMessages 和 setWaitTimeSeconds 这两个属性的帮助下,我能够获得 10 条消息。

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
                      receiveMessageRequest.setMaxNumberOfMessages(10);
                      receiveMessageRequest.setWaitTimeSeconds(20);

o / p的快照:

Receiving messages from TestQueue.
Number of messages:10
Message
MessageId:     31a7c669-1f0c-4bf1-b18b-c7fa31f4e82d 
...
于 2017-07-26T09:40:55.760 回答
1

receiveMessageRequest.withMaxNumberOfMessages(10);

为了清楚起见,更实际的用途是像这样添加到您的构造函数中:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);

否则,您不妨这样做:

receiveMessageRequest.setMaxNumberOfMessages(10);

话虽如此,改变它不会帮助原来的问题。

于 2012-06-15T17:55:36.437 回答
1

感谢考伊特!

我也遇到了这个问题。最后通过使用长轮询解决了这里的配置: https ://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-long-polling-for-queue.html

不幸的是,要使用长轮询,您必须将队列创建为 FIFO 队列。我尝试了标准队列但没有运气。

并且在接收时,还需要设置 MaxNumberOfMessages。所以我的代码是这样的:

ReceiveMessageRequest receive_request = new ReceiveMessageRequest() .withQueueUrl(QUEUE_URL) .withWaitTimeSeconds(20) .withMaxNumberOfMessages(10);

虽然解决了,但还是觉得太有线了。AWS 绝对应该为这种基本的接收操作提供更简洁的 API。

在我看来,AWS 有很多很酷的特性,但没有好的 API。就像那些家伙一直在冲出去一样。

于 2019-03-13T18:54:22.913 回答
1

对于小任务列表,我使用像stackoverflow.com/a/55149351/13678017这样的FIFO 队列 ,例如修改后的AWS 教程

            // Create a queue.
        System.out.println("Creating a new Amazon SQS FIFO queue called " + "MyFifoQueue.fifo.\n");
        final Map<String, String> attributes = new HashMap<>();

        // A FIFO queue must have the FifoQueue attribute set to true.
        attributes.put("FifoQueue", "true");
        /*
         * If the user doesn't provide a MessageDeduplicationId, generate a
         * MessageDeduplicationId based on the content.
         */
        attributes.put("ContentBasedDeduplication", "true");
        // The FIFO queue name must end with the .fifo suffix.
        final CreateQueueRequest createQueueRequest = new CreateQueueRequest("MyFifoQueue4.fifo")
                        .withAttributes(attributes);
        final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

        // List all queues.
        System.out.println("Listing all queues in your account.\n");
        for (final String queueUrl : sqs.listQueues().getQueueUrls()) {
            System.out.println("  QueueUrl: " + queueUrl);
        }
        System.out.println();

        // Send a message.
        System.out.println("Sending a message to MyQueue.\n");

        for (int i = 0; i < 4; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId1");
                    ;

            sqs.sendMessage(request);
        }

        for (int i = 0; i < 6; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId2");
                    ;

            sqs.sendMessage(request);
        }

        // Receive messages.
        System.out.println("Receiving messages from MyQueue.\n");
        var receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

        receiveMessageRequest.setMaxNumberOfMessages(10);
        receiveMessageRequest.setWaitTimeSeconds(20);

        // what receive?
        receiveMessageRequest.withMessageAttributeNames("userId2");


        final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (final Message message : messages) {
            System.out.println("Message");
            System.out.println("  MessageId:     "
                    + message.getMessageId());
            System.out.println("  ReceiptHandle: "
                    + message.getReceiptHandle());
            System.out.println("  MD5OfBody:     "
                    + message.getMD5OfBody());
            System.out.println("  Body:          "
                    + message.getBody());
            for (final Entry<String, String> entry : message.getAttributes()
                    .entrySet()) {
                System.out.println("Attribute");
                System.out.println("  Name:  " + entry
                        .getKey());
                System.out.println("  Value: " + entry
                        .getValue());
            }
        }
于 2020-06-04T07:27:30.033 回答
0

这是一个解决方法,您可以异步调用 receiveMessageFromSQS 方法。

   bulkReceiveFromSQS (queueUrl, totalMessages, asyncLimit, batchSize, visibilityTimeout, waitTime, callback) {
    batchSize = Math.min(batchSize, 10);

    let self = this,
        noOfIterations = Math.ceil(totalMessages / batchSize);

    async.timesLimit(noOfIterations, asyncLimit, function(n, next) {
        self.receiveMessageFromSQS(queueUrl, batchSize, visibilityTimeout, waitTime,
            function(err, result) {
                if (err) {
                    return next(err);
                }

                return next(null, _.get(result, 'Messages'));
            });
    }, function (err, listOfMessages) {
        if (err) {
            return callback(err);
        }
        listOfMessages = _.flatten(listOfMessages).filter(Boolean);

        return callback(null, listOfMessages);
    });
}

它将返回一个包含给定消息数量的数组

于 2020-03-18T03:29:33.857 回答