13

我打算使用 Celery 来处理由我的主服务器的事件触发的发送推送通知和电子邮件。

这些任务需要打开与外部服务器(GCM、APS、电子邮件服务器等)的连接。它们可以一次处理一个,也可以通过单个连接批量处理以获得更好的性能。

通常会在短时间内分别触发多个这些任务的实例。例如,在一分钟内,可能有几十个推送通知需要向不同的用户发送不同的消息。

在 Celery 中处理这个问题的最佳方法是什么?似乎天真的方法是对每条消息简单地执行不同的任务,但这需要为每个实例打开一个连接。

我希望会有某种任务聚合器允许我处理例如“所有未完成的推送通知任务”。

这样的事情存在吗?有没有更好的方法来解决它,例如附加到活动任务组?

我错过了什么吗?

罗伯特

4

2 回答 2

9

我最近在我的项目中发现并实现了该celery.contrib.batches模块。在我看来,这是一个比 Tommaso 的答案更好的解决方案,因为您不需要额外的存储层。

这是直接来自文档的示例:

每 100 条消息或每 10 秒刷新一次缓冲区的点击计数器。不对数据做任何事情,但可以轻松修改以将其存储在数据库中。

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    from collections import Counter
    count = Counter(request.kwargs['url'] for request in requests)
    for url, count in count.items():
        print('>>> Clicks: {0} -> {1}'.format(url, count))

不过要小心,它适合我的使用,但它在文档中提到这是一个“实验任务类”。这可能会阻止某些人使用具有这种易变描述的功能:)

于 2013-11-11T09:49:38.037 回答
4

实现此目的的一种简单方法是在持久存储(例如数据库)上编写任务应执行的所有操作,并让定期作业在一批中执行实际过程(使用单个连接)。注意:确保您有一些锁定以防止队列被处理两次!

有一个很好的例子说明如何在kombu级别做类似的事情(http://ask.github.com/celery/tutorials/clickcounter.html)

就我个人而言,我喜欢哨兵在数据库级别(sentry.buffers 模块)做这样的事情来批量增量的方式

于 2012-10-03T09:45:43.913 回答