2

我使用 Celery 运行抓取一些数据的网络蜘蛛,然后我需要将这些数据保存在数据库中的某个位置(例如 SQLite),但据我所知,我无法在 Celery 工作人员之间共享 SQLAlchemy 会话。你怎么解决这个问题?哪种方式常见?

目前我正在尝试使用 Redis 作为数据的中间存储。

@celery.task
def run_spider(spider, task):
    # setup worker
    logger = logging.getLogger('Spider: %s' % spider.url)
    spider.meta.update({'logger': logger, 'task_id': int(task.id)})

    # push task data inside worker
    spider.meta.update({'task_request': run_spider.request})

    spider.run()

    task.state = "inactive"
    task.resolved = datetime.datetime.now()
    db.session.add(task)
    db.session.commit()

编辑:其实我错了,我不需要共享会话,我需要为每个 celery 进程/任务创建新的数据库连接。

4

2 回答 2

4

我也曾在大型 celery 应用程序中使用 redis 进行持久化。

我的任务通常如下所示:

@task
def MyTask(sink, *args, **kwargs):
    data_store = sharded_redis.ShardedRedis(sink)
    key_helper = helpers.KeyHelper()
    my_dictionary = do_work()
    data_store.hmset(key_helper.key_for_my_hash(), my_dictionary)
  • sharded_redis只是通过客户端处理分片键的几个 redis 分片的抽象。
  • sink是一个(host, port)元组列表,用于在确定分片后建立适当的连接。

本质上,您是通过每个任务连接和断开与 redis 的连接(非常便宜),而不是创建连接池。

使用连接池是可行的,但如果你真的要使用 celery(运行大量并发任务),那么使用这种方法会更好(在我看来),因为你冒着耗尽连接池的风险,尤其是如果您在 redis 中执行的操作需要更长的时间(例如将大型数据集读入内存)。

与 redis 的连接非常便宜,因此应该可以很好地扩展。我们在几个实例上每分钟处理数十万个任务。

于 2012-08-09T04:41:11.450 回答
0

其实我错了,我不需要共享会话,我需要为每个 celery 进程/任务创建新的数据库连接

于 2012-11-17T04:43:44.810 回答