83

假设我有一个非常大的列表,我正在执行如下操作:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

我的问题有两个:

  • 有很多项目
  • api.my_operation 需要永远返回

我想使用多线程一次启动一堆 api.my_operations,这样我就可以一次处理 5 个或 10 个甚至 100 个项目。

如果 my_operation() 返回异常(因为也许我已经处理了该项目) - 没关系。它不会破坏任何东西。循环可以继续到下一项。

注意:这是针对 Python 2.7.3

4

4 回答 4

146

首先,在 Python 中,如果您的代码受 CPU 限制,多线程将无济于事,因为只有一个线程可以持有全局解释器锁,因此一次运行 Python 代码。因此,您需要使用进程,而不是线程。

如果您的操作“需要永远返回”,则情况并非如此,因为它是 IO 绑定的,即在网络或磁盘副本等上等待。我稍后再谈。


接下来,一次处理 5 个或 10 个或 100 个项目的方法是创建一个由 5 个或 10 个或 100 个工作人员组成的池,并将这些项目放入工作人员服务的队列中。幸运的是,stdlibmultiprocessingconcurrent.futures库都为您提供了大部分细节。

前者对于传统编程来说更加强大和灵活;如果您需要编写future-waiting,则后者更简单;对于琐碎的情况,您选择哪个并不重要。(在这种情况下,每个最明显的实现需要 3 行和futures4 行multiprocessing。)

如果您使用的是 2.6-2.7 或 3.0-3.1,futures则不是内置的,但您可以从PyPI ( pip install futures) 安装它。


最后,如果您可以将整个循环迭代转换为函数调用(您可以,例如传递给map),那么并行化事情通常要简单得多,所以让我们先这样做:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

把它们放在一起:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

如果你有很多相对较小的工作,多处理的开销可能会淹没收益。解决这个问题的方法是将工作分批成更大的工作。例如(使用grouper来自itertoolsrecipes,您可以将其复制并粘贴到您的代码中,或者从more-itertoolsPyPI 上的项目中获取):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

最后,如果你的代码是 IO 绑定的怎么办?然后线程与进程一样好,并且开销更少(并且限制更少,但在这种情况下,这些限制通常不会影响您)。有时,“较少的开销”足以意味着您不需要使用线程进行批处理,但您需要处理进程,这是一个不错的胜利。

那么,如何使用线程而不是进程呢?只需更改ProcessPoolExecutorThreadPoolExecutor.

如果您不确定您的代码是受 CPU 限制还是受 IO 限制,请尝试两种方式。


我可以为我的 python 脚本中的多个函数执行此操作吗?例如,如果我想要并行化的代码中的其他地方有另一个 for 循环。是否可以在同一个脚本中执行两个多线程函数?

是的。事实上,有两种不同的方法可以做到这一点。

首先,您可以共享同一个(线程或进程)执行程序并在多个地方使用它,没有问题。任务和未来的全部意义在于它们是独立的。您不在乎它们在哪里运行,只需将它们排队并最终得到答案。

或者,您可以在同一个程序中有两个执行程序,这没有问题。这有性能成本——如果你同时使用两个执行器,你最终会尝试在 8 个内核上运行(例如)16 个繁忙的线程,这意味着会有一些上下文切换。但有时它是值得做的,因为,比如说,两个执行器很少同时忙,它使你的代码更简单。或者,一个执行器正在运行可能需要一段时间才能完成的非常大的任务,而另一个执行器正在运行需要尽快完成的非常小的任务,因为对于部分程序而言,响应能力比吞吐量更重要。

如果您不知道哪个适合您的程序,通常是第一个。

于 2013-02-28T19:27:05.133 回答
45

有 multiprocesing.pool,下面的示例说明了如何使用其中之一:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

现在,如果您确实确定您的进程是@abarnert 提到的 CPU 绑定,请将 ThreadPool 更改为进程池实现(在 ThreadPool 导入下注释)。您可以在此处找到更多详细信息:http: //docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

于 2013-02-28T20:11:38.487 回答
19

您可以使用以下方法将处理拆分为指定数量的线程:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)
于 2013-02-28T19:33:22.653 回答
6
import numpy as np
import threading


def threaded_process(items_chunk):
    """ Your main process which runs in thread for each chunk"""
    for item in items_chunk:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')  

n_threads = 20
# Splitting the items into chunks equal to number of threads
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
    thread = threading.Thread(target=threaded_process, args=(array_chunk[thr]),)
    thread_list.append(thread)
    thread_list[thr].start()

for thread in thread_list:
    thread.join()
于 2019-09-06T06:23:59.737 回答