2

SQS 期望您的应用程序是幂等的,并且我有多个消费者/生产者(即使 SQS 有一次交付机制)我将有竞争条件创建重复和竞争条件消耗,因为我的消费者通过 cron 作业运行。

我目前的计划是使用 Django 1.4 select_for_update,它应该阻止同一行的其他消费者,执行以下操作:

reminders = EmailReminder.objects.select_for_update().filter(id=some_id)
if not reminders[0].finished:
    reminder.send()
    reminder.update(finished=datetime.now())
# Delete job.

有没有更好的方法来处理这个问题?

4

1 回答 1

3

将 django-celery 连接到 SQS 并让它使用 celerybeat 指定一个定期作业。然后让 celeryd 工作人员在您想要的任何地方在同一个队列上运行。一次只有一个人会拿起一份工作并执行它。无需在任何级别引入数据库锁定。

只要保证您的工作人员在 celerybeat 触发新工作之前完成其当前任务,您就永远不需要锁。现在,如果您认为它们有可能重叠,您可以为您的通知引入状态:

  1. 任何提醒都以“未发送”状态开始。
  2. 您的 celerybeat 向队列发送处理未发送电子邮件的请求。
  3. 一些工人捡起它并抓住所有这些。
  4. 工作人员立即将它们全部转换为“发送”状态。
  5. 继续一次发送一个(或批量发送)。
  6. 如果任何发送失败,则将其状态恢复为未发送。
  7. 对于所有成功过渡到发送。

这样,如果 celerybeat 在初始批次未完成原始工作时触发另一个工作,您将不会发送重复的电子邮件。作为额外的奖励,您可以扩展解决方案并分配负载。

于 2012-08-04T22:16:00.310 回答