8

我在 Python 中使用 boto 库来获取 Amazon SQS 消息。在特殊情况下,我不会从队列中删除消息,以便进行更多更改以恢复临时故障。但我不想不断收到失败的消息。我想做的是在收到超过 3 次后删除消息,或者如果接收计数超过 3 则不收到消息。

最优雅的做法是什么?

4

5 回答 5

6

There are at least a couple of ways of doing this.

When you read a message in boto, you receive a Message object or some subclass thereof. The Message object has an "attributes" field that is a dict containing all message attributes known by SQS. One of the things SQS tracks is the approximate # of times the message has been read. So, you could use this value to determine whether the message should be deleted or not but you would have to be comfortable with the "approximate" nature of the value.

Alternatively, you could record message ID's in some sort of database and increment a count field in the database each time you read the message. This could be done in a simple Python dict if the messages are always being read within a single process or it could be done in something like SimpleDB if you need to record readings across processes.

Hope that helps.

Here's some example code:

>>> import boto.sqs
>>> c = boto.sqs.connect_to_region()
>>> q = c.lookup('myqueue')
>>> messages = c.receive_message(q, num_messages=1, attributes='All')
>>> messages[0].attributes
{u'ApproximateFirstReceiveTimestamp': u'1365474374620',
 u'ApproximateReceiveCount': u'2',
 u'SenderId': u'419278470775',
 u'SentTimestamp': u'1365474360357'}
>>>
于 2012-08-11T09:07:11.740 回答
4

其他方式可能是您可以在 SQS 队列中的消息末尾放置一个额外的标识符。这个标识符可以保持消息被阅读的次数的计数。

此外,如果您希望您的服务不应该一次又一次地轮询这些消息,那么您可以再创建一个队列,例如“死消息队列”,然后可以将超过阈值的消息传输到该队列。

于 2012-10-19T10:14:01.227 回答
3

aws 对此具有内置支持,只需按照以下步骤操作:

  1. 创建死信队列
  2. 通过选中“使用重新驱动策略”为源队列启用重新驱动策略
  3. 选择您在步骤#1 中为“死信队列”创建的死信队列
  4. 将“最大接收数”设置为“3”或 1 到 1000 之间的任何值

它的工作原理是,每当工作人员收到消息时,接收计数就会增加。一旦达到“最大接收”计数,消息就会被推送到死信队列。请注意,即使您通过 aws 控制台访问消息,接收计数也会增加。

使用 Amazon SQS 死信队列的来源

于 2016-05-06T18:24:48.070 回答
2

从您阅读的消息中获取 ApproximateReceiveCount 属性。将其移动到另一个队列(而不是您可以管理错误消息)或将其删除。

foreach (var message in response.Messages){
       try{
           var notifyMessage = JsonConvert.DeserializeObject<NotificationMessage>(message.Body);
                    Global.Sqs.DeleteMessageFromQ(message.ReceiptHandle);
           }
       catch (Exception ex){
           var  receiveMessageCount = int.Parse(message.Attributes["ApproximateReceiveCount"]);
           if (receiveMessageCount >3 ) 
              Global.Sqs.DeleteMessageFromQ(message.ReceiptHandle);
            }
        }
于 2015-04-27T17:46:44.273 回答
1

应该分几步完成。

  1. 创建 SQS 连接:- sqsconnrec = SQSConnection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. 创建队列对象:- request_q = sqsconnrec.create_queue("queue_Name")
  3. 加载队列消息:-messages=request_q.get_messages()
  4. 现在你得到消息对象的数组并找到消息的总数:- 只需执行 len(messages)

应该像魅力一样工作。

于 2012-08-14T20:57:36.017 回答