5

如何限制 Python 中的并发线程数?

例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个。

这是我到目前为止所拥有的:

def process_file(fname):
        # open file and do something                                                                                            

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files=glob.glob(d + '/*')
    q=Queue.Queue()
    for fname in files:
        t=threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

如何修改代码以便一次只运行 4 个线程?

请注意,我想等待所有文件都被处理,然后继续处理已处理的文件。

4

2 回答 2

9

例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个。

这正是线程池所做的:您创建作业,并且该池一次并行运行 4 个。您可以通过使用执行程序使事情变得更简单,您只需将函数(或其他可调用对象)交给它,它就会将结果交还给您。您可以自己构建所有这些,但您不必这样做。*

stdlib 的concurrent.futures模块是执行此操作的最简单方法。(对于 Python 3.1 及更早版本,请参阅backport。)事实上,其中一个主要示例与您想要做的非常接近。但是,让我们根据您的确切用例对其进行调整:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

如果您想process_file退货,这几乎同样简单:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

如果你也想处理异常……好吧,看看这个例子;它只是一个try/except围绕调用result().


* 如果您想自己构建它们,这并不难。源代码multiprocessing.pool写得很好,评论也很好,而且没有那么复杂,而且大多数困难的东西与线程无关;的来源concurrent.futures更简单。

于 2013-08-21T01:03:46.360 回答
0

我用了几次这个技术,我觉得有点丑陋的想法:

import threading

def process_something():
    something = list(get_something)

    def worker():
        while something:
            obj = something.pop()
            # do something with obj

   threads = [Thread(target=worker) for i in range(4)]
   [t.start() for t in threads]
   [t.join() for t in threads]
于 2016-10-19T17:59:04.980 回答