185

我正在学习如何使用 Python 中的threadingmultiprocessing模块来并行运行某些操作并加速我的代码。

我发现这很难(可能是因为我没有任何关于它的理论背景)来理解threading.Thread()对象和对象之间的区别multiprocessing.Process()

此外,我并不完全清楚如何实例化一个作业队列并让其中只有 4 个(例如)并行运行,而另一个则等待资源释放后再执行。

我发现文档中的示例很清楚,但不是很详尽;一旦我尝试使事情复杂一点,我就会收到很多奇怪的错误(比如不能腌制的方法等等)。

那么,我应该什么时候使用threadingandmultiprocessing模块呢?

您能否将我链接到一些解释这两个模块背后的概念以及如何正确使用它们来完成复杂任务的资源?

4

6 回答 6

316

Giulio Franco 所说的对于一般的多线程与多处理来说是正确的。

但是,Python *有一个额外的问题:有一个全局解释器锁可以防止同一进程中的两个线程同时运行 Python 代码。这意味着如果您有 8 个内核,并且将代码更改为使用 8 个线程,它将无法使用 800% 的 CPU 并且运行速度提高 8 倍;它将使用相同的 100% CPU 并以相同的速度运行。(实际上,它的运行速度会慢一些,因为线程会产生额外的开销,即使您没有任何共享数据,但现在请忽略它。)

这也有例外。如果您的代码的繁重计算实际上并没有发生在 Python 中,而是在一些具有自定义 C 代码的库中进行适当的 GIL 处理,例如一个 numpy 应用程序,您将从线程中获得预期的性能优势。如果繁重的计算是由您运行并等待的某个子进程完成的,情况也是如此。

更重要的是,在某些情况下这并不重要。例如,网络服务器将大部分时间用于从网络读取数据包,而 GUI 应用程序将大部分时间用于等待用户事件。在网络服务器或 GUI 应用程序中使用线程的一个原因是允许您执行长时间运行的“后台任务”,而无需停止主线程继续为网络数据包或 GUI 事件提供服务。这适用于 Python 线程。(从技术上讲,这意味着 Python 线程可以为您提供并发性,即使它们没有为您提供核心并行性。)

但是如果你用纯 Python 编写一个 CPU 密集型程序,使用更多线程通常没有帮助。

使用单独的进程对 GIL 没有这样的问题,因为每个进程都有自己单独的 GIL。当然,线程和进程之间的权衡仍然与任何其他语言相同——在进程之间共享数据比在线程之间共享数据更加困难和昂贵,运行大量进程或创建和销毁可能成本高昂但 GIL 非常重视进程的平衡,而这对于 C 或 Java 来说并非如此。因此,您会发现自己在 Python 中使用多处理的频率要高于在 C 或 Java 中的使用频率。


同时,Python 的“包含电池”理念带来了一些好消息:编写可以在线程和进程之间来回切换的代码非常容易,只需一行更改。

如果您根据独立的“作业”设计代码,除了输入和输出外,不与其他作业(或主程序)共享任何内容,您可以使用该concurrent.futures库围绕线程池编写代码,如下所示:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(job, argument)
    executor.map(some_function, collection_of_independent_things)
    # ...

您甚至可以获取这些作业的结果并将它们传递给其他作业,按执行顺序或完成顺序等待事物等;阅读有关Future对象的部分了解详细信息。

现在,如果你的程序一直在使用 100% 的 CPU,并且添加更多线程只会让它变慢,那么你就会遇到 GIL 问题,所以你需要切换到进程。您所要做的就是更改第一行:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

唯一真正需要注意的是,您的作业的参数和返回值必须是可腌制的(并且不会花费太多时间或内存来腌制)才能跨进程使用。通常这不是问题,但有时会出现问题。


但是,如果您的工作不能自给自足怎么办?如果您可以根据将消息从一个传递到另一个的作业来设计您的代码,那么它仍然非常容易。您可能必须使用threading.Threadmultiprocessing.Process不依赖于池。而且您必须明确地创建queue.Queuemultiprocessing.Queue对象。(还有很多其他的选择——管道、套接字、带有羊群的文件……但关键是,如果 Executor 的自动魔法不足,你必须手动做一些事情。)

但是,如果您甚至不能依赖消息传递怎么办?如果你需要两个工作来改变相同的结构,并看到彼此的变化怎么办?在这种情况下,您将需要进行手动同步(锁、信号量、条件等),并且如果您想使用进程,则需要显式共享内存对象来引导。这是多线程(或多处理)变得困难的时候。如果可以避免,那就太好了;如果你不能,你需要阅读的内容比有人能写进 SO 答案的内容还要多。


从评论中,您想知道 Python 中的线程和进程之间有什么不同。真的,如果您阅读 Giulio Franco 的答案和我的答案以及我们所有的链接,那应该涵盖所有内容……但是摘要肯定会很有用,所以这里是:

  1. 线程默认共享数据;进程没有。
  2. 作为 (1) 的结果,在进程之间发送数据通常需要酸洗和解酸。**
  3. 作为 (1) 的另一个结果,在进程之间直接共享数据通常需要将其放入低级格式,如值、数组和ctypes类型。
  4. 流程不受 GIL 约束。
  5. 在某些平台(主要是 Windows)上,创建和销毁进程的成本要高得多。
  6. 进程有一些额外的限制,其中一些在不同的平台上是不同的。有关详细信息,请参阅编程指南
  7. threading模块没有该模块的某些功能multiprocessing。(您可以使用multiprocessing.dummy在线程之上获取大部分缺少的 API,或者您可以使用更高级别的模块concurrent.futures,而不用担心它。)

* 实际上,存在此问题的语言不是 Python,而是该语言的“标准”实现 CPython。其他一些实现没有 GIL,例如 Jython。

** 如果您使用fork start 方法进行多处理(在大多数非 Windows 平台上都可以),每个子进程都会在子进程启动时获取父进程拥有的任何资源,这可能是向子进程传递数据的另一种方式。

于 2013-08-07T22:28:12.500 回答
43

多个线程可以存在于单个进程中。属于同一进程的线程共享相同的内存区域(可以读取和写入相同的变量,并且可以相互干扰)。相反,不同的进程生活在不同的内存区域,每个进程都有自己的变量。为了进行通信,进程必须使用其他通道(文件、管道或套接字)。

如果你想并行计算,你可能需要多线程,因为你可能希望线程在同一个内存上协作。

说到性能,线程比进程更快地创建和管理(因为操作系统不需要分配一个全新的虚拟内存区域),并且线程间通信通常比进程间通信更快。但是线程更难编程。线程可以相互干扰,并且可以写入彼此的内存,但这种情况发生的方式并不总是很明显(由于多种因素,主要是指令重新排序和内存缓存),因此您将需要同步原语来控制访问到你的变量。

于 2013-08-07T21:53:41.113 回答
8

Python 文档引用

我在以下位置突出显示了有关进程与线程和 GIL 的关键 Python 文档引用:什么是 CPython 中的全局解释器锁 (GIL)?

进程与线程实验

为了更具体地显示差异,我做了一些基准测试。

在基准测试中,我为8 个超线程CPU 上的不同数量的线程计时了 CPU 和 IO 绑定工作。每个线程提供的工作总是相同的,因此更多的线程意味着提供的总工作更多。

结果是:

在此处输入图像描述

绘制数据

结论:

  • 对于 CPU 密集型工作,多处理总是更快,大概是由于 GIL

  • 用于 IO 绑定的工作。两者的速度完全相同

  • 由于我在 8 个超线程机器上,线程只能扩展到大约 4 倍而不是预期的 8 倍。

    将其与 C POSIX CPU-bound 工作进行对比,该工作达到预期的 8 倍加速:“真实”、“用户”和“系统”在 time(1) 的输出中是什么意思?

    TODO:我不知道这是什么原因,一定还有其他 Python 的低效率在起作用。

测试代码:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import sys

def cpu_func(result, niters):
    '''
    A useless CPU bound function.
    '''
    for i in range(niters):
        result = (result * result * i + 2 * result * i * i + 3) % 10000000
    return result

class CpuThread(threading.Thread):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class CpuProcess(multiprocessing.Process):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class IoThread(threading.Thread):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

class IoProcess(multiprocessing.Process):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

if __name__ == '__main__':
    cpu_n_iters = int(sys.argv[1])
    sleep = 1
    cpu_count = multiprocessing.cpu_count()
    input_params = [
        (CpuThread, cpu_n_iters),
        (CpuProcess, cpu_n_iters),
        (IoThread, sleep),
        (IoProcess, sleep),
    ]
    header = ['nthreads']
    for thread_class, _ in input_params:
        header.append(thread_class.__name__)
    print(' '.join(header))
    for nthreads in range(1, 2 * cpu_count):
        results = [nthreads]
        for thread_class, work_size in input_params:
            start_time = time.time()
            threads = []
            for i in range(nthreads):
                thread = thread_class(work_size)
                threads.append(thread)
                thread.start()
            for i, thread in enumerate(threads):
                thread.join()
            results.append(time.time() - start_time)
        print(' '.join('{:.6e}'.format(result) for result in results))

GitHub 上游 + 在同一目录上绘制代码

在 Ubuntu 18.10、Python 3.6.7 上测试,在带有 CPU 的 Lenovo ThinkPad P51 笔记本电脑上:Intel Core i7-7820HQ CPU(4 核 / 8 线程),RAM:2x Samsung M471A2K43BB1-CRC (2x 16GiB),SSD:Samsung MZVLB512HAJQ- 000L7(3,000 MB/秒)。

可视化在给定时间哪些线程正在运行

这篇文章https://rohanvarma.me/GIL/告诉我,只要使用 和 的参数调度线程,就可以运行target=回调threading.Threadmultiprocessing.Process

这使我们能够准确地查看每次运行的线程。完成后,我们会看到类似的内容(我制作了这个特定的图表):

            +--------------------------------------+
            + Active threads / processes           +
+-----------+--------------------------------------+
|Thread   1 |********     ************             |
|         2 |        *****            *************|
+-----------+--------------------------------------+
|Process  1 |***  ************** ******  ****      |
|         2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
            + Time -->                             +
            +--------------------------------------+

这将表明:

  • 线程完全由 GIL 序列化
  • 进程可以并行运行
于 2020-02-28T12:04:43.223 回答
5

我相信此链接以优雅的方式回答了您的问题。

简而言之,如果您的一个子问题必须等待另一个子问题完成,那么多线程是好的(例如,在 I/O 繁重的操作中);相比之下,如果您的子问题确实可能同时发生,则建议使用多处理。但是,您创建的进程数不会超过内核数。

于 2015-05-13T04:01:57.737 回答
2

这是 python 2.6.x 的一些性能数据,它质疑线程在 IO 绑定场景中比多处理性能更高的概念。这些结果来自 40 个处理器的 IBM System x3650 M4 BD。

IO-Bound Processing : Process Pool 比 Thread Pool 执行得更好

>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms

>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms

CPU-Bound 处理:进程池比线程池执行得更好

>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms

>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms

这些不是严格的测试,但它们告诉我,与线程相比,多处理并非完全没有性能。

用于上述测试的交互式 python 控制台中的代码

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob

text_for_test = str(range(1,100000))

def fileio(i):
 try :
  os.remove(glob('./test/test-*'))
 except : 
  pass
 f=open('./test/test-'+str(i),'a')
 f.write(text_for_test)
 f.close()
 f=open('./test/test-'+str(i),'r')
 text = f.read()
 f.close()


def square(i):
 return i*i

def timing(f):
 def wrap(*args):
  time1 = time.time()
  ret = f(*args)
  time2 = time.time()
  print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
  return ret
 return wrap

result = None

@timing
def do_work(process_count, items, process_type, method) :
 pool = None
 if process_type == 'process' :
  pool = Pool(processes=process_count)
 else :
  pool = ThreadPool(processes=process_count)
 if method == 'square' : 
  multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]
 else :
  multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]


do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')

do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')
于 2016-11-06T05:22:10.333 回答
-5

好吧,大部分问题都由 Giulio Franco 回答。我将进一步详细说明消费者-生产者问题,我想这将使您在使用多线程应用程序的解决方案中走上正轨。

fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()

def producer(fill_count, empty_count, buffer):
    while True:
        item = produceItem()
        empty_count.down();
        buffer.push(item)
        fill_count.up()

def consumer(fill_count, empty_count, buffer):
    while True:
        fill_count.down()
        item = buffer.pop()
        empty_count.up()
        consume_item(item)

您可以从以下位置阅读有关同步原语的更多信息:

 http://linux.die.net/man/7/sem_overview
 http://docs.python.org/2/library/threading.html

伪代码如上。我想你应该搜索生产者消费者问题以获得更多参考。

于 2013-08-07T22:04:19.920 回答