1

我在 EC2 上运行了一个带有 2 个使用 2 个不同 SQS 队列的不同 python 脚本的盒子。我的脚本在上周运行得非常好,突然之间其中一个脚本需要很长时间才能从 SQS 队列中提取消息。即使它设法拉出消息,它也会拉出很多重复的消息。我的 2 个脚本使用相同的方法:

def getMessages(self, n):
    '''returns n messages erasing them, mutliple of 10'''
    msgs=[]
    rs=self.queue.get_messages(10)
    if len(rs) < 1:
        return []
    count = 0
    while count < n or len(rs) < 10:
        msgs.extend(rs)
        rs=self.queue.get_messages(10)
        count += 10
    # delete pulled messages
    for msg in msgs:
        self.queue.delete_message(msg)

    return msgs

在这种情况下 self.queue 是以下实例:

    self.conn = SQSConnection(SQS_ACCESS_KEY, SQS_SECRET_KEY)
    self.queue = self.conn.get_queue(queue_name)

我的脚本通过以下方式使用 getMessages() 方法:

while True:
    print 'Getting SQS messages'
    messages = sqs.getMessages(50)
    print 'Got %s messages' % len(messages)

    for msg in messages:
        do_something(msg)

    print 'Going to sleep'
    time.sleep(sleep_time)

现在,我的脚本需要很长时间才能提取消息,它会打印出来Getting SQS messages,有时会在那里停留数小时。更奇怪的是,如果我打开一个 ipython shell 并getMessages(50)运行得非常快,而我的其他脚本的工作方式完全相同,但在不同的队列上,我对此完全没有问题。

这里可能有什么问题,谢谢。

4

1 回答 1

3

您在 while 循环中的 getMessages 函数中有一些错误:
1. 使用count += len(rs)
2. 主要错误:您不需要

or len(rs) < 10

条件,因为 SQS 最多可以获取 10 条消息,如果您的队列已满,并且每次收到 10 条消息,您将有一个无限循环。

另外我认为您应该考虑使用yield语句来返回消息,处理它们并在处理它们之后,然后才将它们从队列中删除。

还有一件事,pythonic的检查方法if len(rs) < 1:if not rs:

于 2014-11-18T08:12:53.983 回答