9

我有一个使用 ec2 实例上的 boto 库的 python 脚本,它是自动缩放组的一部分。该脚本处理来自 SQS 队列的消息:

import boto
from boto.sqs.message import Message

conn = boto.connect_sqs()
q = conn.create_queue('queue-name')

while (qin.count() > 0):
    m = q.get_messages()
    #do something with the message

使用 while 语句有意义吗?count() 是否实时更新为:

  1. 其他实例将消息从队列中取出(或者我要加倍)
  2. 新消息被添加到队列中(或者我会错过它们吗?)

即使队列为空,如何使此脚本不断侦听队列中的新增内容?

在这个问题Processing items in SQS queue with a php script中提到'sqs ruby​​客户端库有一个方法“poll”,它不断地轮询队列,并在队列中接收到消息时将其传递给一个块'。Python中是否有等价物?

也有人建议可以使用 SNS 来通知脚本消息队列状态,但我看不出如何使用 SNS 配置响应式系统,因为指标警报不够精细。

4

3 回答 3

7

您不应该依赖队列的计数,因为它仅用于提供近似计数并且不能保证准确。

如果您想永远保持轮询,只需执行以下操作:

while 1:
    messages = q.get_messages()
    # do something with messages
    time.sleep(N)

我添加了对 time.sleep 的调用以在循环中引入延迟。N 的值应该至少为一秒,并且可能更多,这取决于您期望新消息出现在队列中的速度。如果您不在循环中放置某种延迟,您可能会开始受到服务的限制。

为避免消息被多次读取,您应该尝试将队列的可见性超时调整为大于处理消息所需的时间,然后确保在处理完成后删除消息。

于 2012-06-21T13:06:57.350 回答
5

例子:

# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
    logger.info("... waiting messages ...")
    messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
    for message in messages:
        logger.info('message: %s' % (message,))
        queue_in.delete_message(message)
于 2016-01-08T19:01:14.657 回答
3
  1. 当您从 SQS 拉出一条消息时,该消息将变得不可见且无法被其他队列查询访问(编辑 - 不可见性可以设置在 0 到 12 小时之间)。
  2. 每次添加新消息时,您都必须再次获取队列,但这应该不是问题——这就是队列服务首先存在的原因。

如果您想不断地轮询队列,请尝试所谓的长轮询- 您可以进行长达 20 秒的连续轮询,当队列被填充时返回。

希望这会有所帮助,否则请查看 boto sqs文档

于 2012-11-14T18:38:35.203 回答