7

我正在寻找一个可靠的实现,以允许我使用Queue逐步处理项目列表。

这个想法是我想使用一定数量的工作人员,这些工作人员将完成 20 多个数据库密集型任务的列表并返回结果。我希望 Python 从前五个项目开始,一旦完成一个任务,就开始队列中的下一个任务。

这就是我目前在没有Threading的情况下这样做的方式。

for key, v in self.sources.iteritems():
    # Do Stuff

我希望有一个类似的方法,但可能不必将列表分成五个子组。这样它就会自动选择列表中的下一个项目。目标是确保如果一个数据库减慢了进程,它不会对整个应用程序产生负面影响。

4

3 回答 3

5

你可以自己实现它,但是 Python 3 已经带有一个Executor基于 的线程管理解决方案,你可以通过安装向后移植的版本在 Python 2.x 中使用它。

您的代码可能看起来像

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_key = {}
    for key, value in sources.items():
        future_to_idday[executor.submit(do_stuff, value)] = key
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        result = future.result()
        # process result
于 2013-01-22T13:45:09.623 回答
3

如果您使用的是 python3,我推荐并发期货模块。如果您没有使用 python3 并且没有附加到线程(相对于进程),那么您可以尝试 multiprocessing.Pool (尽管它带有一些警告,并且我在我的应用程序中无法正确关闭池)。如果您必须在 python2 中使用线程,您最终可能会自己编写代码 - 生成 5 个运行消费者函数的线程,并简单地将调用(函数 + args)迭代地推送到队列中,以便消费者找到并处理它们。

于 2013-01-22T13:49:18.513 回答
1

你可以只使用 stdlib 来做到这一点:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # use threads

def db_task(key_value):
    try:
        key, value = key_value
        # compute result..
        return result, None
    except Exception as e:
        return None, e

def main():
    pool = Pool(5)
    for result, error in pool.imap_unordered(db_task, sources.items()):
        if error is None:
            print(result)

if __name__=="__main__":
    main()
于 2013-01-22T14:52:29.583 回答