我发现在 Python 3.4 中有几个不同的多处理/线程库:multiprocessing vs threading vs asyncio。
但我不知道使用哪一个或者是“推荐的”。他们做同样的事情,还是不同?如果是这样,哪一个用于什么?我想在我的计算机上编写一个使用多核的程序。但我不知道我应该学习哪个库。
我发现在 Python 3.4 中有几个不同的多处理/线程库:multiprocessing vs threading vs asyncio。
但我不知道使用哪一个或者是“推荐的”。他们做同样的事情,还是不同?如果是这样,哪一个用于什么?我想在我的计算机上编写一个使用多核的程序。但我不知道我应该学习哪个库。
我们已经了解了最流行的并发形式。但问题仍然存在——什么时候应该选择哪一个?这实际上取决于用例。根据我的经验(和阅读),我倾向于遵循这个伪代码:
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Threads")
else:
print("Multi Processing")
- CPU Bound => 多处理
- I/O Bound, Fast I/O, Limited Number of Connections => 多线程
- I/O Bound,慢 I/O,许多连接 => Asyncio
[注意]:
asyncio
事件循环(uvloop使asyncio
2-4x 快)。[更新(2019)]:
它们用于(略微)不同的目的和/或要求。CPython(一种典型的主流 Python 实现)仍然具有全局解释器锁,因此多线程应用程序(当今实现并行处理的标准方法)不是最理想的。这就是为什么multiprocessing
可能优先于threading
. 但并不是每个问题都可以有效地分解为[几乎独立的]部分,因此可能需要繁重的进程间通信。这就是为什么在一般multiprocessing
情况下可能不受欢迎的原因。threading
asyncio
(这种技术不仅在 Python 中可用,其他语言和/或框架也有,例如Boost.ASIO)是一种有效处理来自许多同时源的大量 I/O 操作而无需并行代码执行的方法. 因此,它只是针对特定任务的解决方案(确实是一个很好的解决方案!),而不是一般的并行处理。
在多处理中,您可以利用多个 CPU 来分配您的计算。由于每个 CPU 并行运行,因此您可以有效地同时运行多个任务。您可能希望对CPU 密集型任务使用多处理。一个例子是试图计算一个巨大列表的所有元素的总和。如果您的机器有 8 个内核,您可以将列表“切割”成 8 个较小的列表,并在单独的内核上分别计算每个列表的总和,然后将这些数字相加。通过这样做,您将获得约 8 倍的加速。
在(多)线程您不需要多个 CPU。想象一个向 Web 发送大量 HTTP 请求的程序。如果您使用单线程程序,它将在每个请求处停止执行(块),等待响应,然后在收到响应后继续。这里的问题是,在等待某个外部服务器完成工作时,您的 CPU 并没有真正在工作;同时它实际上可以做一些有用的工作!解决方法是使用线程 - 您可以创建许多线程,每个线程负责从 Web 请求一些内容。线程的好处在于,即使它们在一个 CPU 上运行,CPU 也会时不时地“冻结”一个线程的执行并跳转到执行另一个线程(这称为上下文切换,它会在不确定的情况下不断发生间隔)。- 使用线程。
asyncio本质上是线程,不是 CPU,而是您作为程序员(或实际上是您的应用程序)决定上下文切换发生的地点和时间。在 Python 中,您使用await
关键字来暂停协程的执行(使用async
关键字定义)。
这是基本思想:
是IO绑定的吗?------------> 使用
asyncio
它是CPU重吗?---------> 使用
multiprocessing
别的 ?----------------------> 使用
threading
所以基本上坚持线程,除非你有 IO/CPU 问题。
许多答案都建议如何仅选择 1 个选项,但为什么不能全部使用 3 个?在这个答案中,我解释了如何使用它asyncio
来管理所有 3 种并发形式的组合,并在需要时轻松地在它们之间进行交换。
许多第一次接触 Python 并发的开发人员最终会使用processing.Process
and threading.Thread
. concurrent.futures
但是,这些是由模块提供的高级 API 合并在一起的低级 API 。此外,生成进程和线程会产生开销,例如需要更多内存,这是困扰我在下面展示的示例之一的问题。在一定程度上,concurrent.futures
为您管理这一点,这样您就不能轻易地通过仅产生几个进程然后在每次完成时重新使用这些进程来生成一千个进程并让您的计算机崩溃。
这些高级 API 通过 提供concurrent.futures.Executor
,然后由concurrent.futures.ProcessPoolExecutor
和实现concurrent.futures.ThreadPoolExecutor
。在大多数情况下,您应该在multiprocessing.Process
and上使用它们threading.Thread
,因为将来使用时更容易从一个更改为另一个,concurrent.futures
并且您不必了解每个的详细差异。
由于这些共享一个统一的接口,您还会发现使用multiprocessing
或threading
经常使用concurrent.futures
. asyncio
对此也不例外,并提供了一种通过以下代码使用它的方法:
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar
T = TypeVar("T")
async def run_in_executor(
executor: Optional[Executor],
func: Callable[..., T],
/,
*args: Any,
**kwargs: Any,
) -> T:
"""
Run `func(*args, **kwargs)` asynchronously, using an executor.
If the executor is None, use the default ThreadPoolExecutor.
"""
return await asyncio.get_running_loop().run_in_executor(
executor,
partial(func, *args, **kwargs),
)
# Example usage for running `print` in a thread.
async def main():
await run_in_executor(None, print, "O" * 100_000)
asyncio.run(main())
事实上,使用threading
withasyncio
是如此普遍,以至于在 Python 3.9 中,他们添加asyncio.to_thread(func, *args, **kwargs)
了将其缩短为 default 的ThreadPoolExecutor
.
是的。使用asyncio
,最大的缺点是异步函数与同步函数不同。如果您从一开始就没有考虑到编程,这可能会绊倒很多新用户asyncio
并导致大量返工asyncio
。
另一个缺点是您的代码的用户也将被迫使用asyncio
. 所有这些必要的返工通常会让初次使用的asyncio
用户嘴里有一种非常酸的味道。
是的。类似于 using 如何concurrent.futures
优于threading.Thread
其multiprocessing.Process
统一接口,这种方法可以被认为是从Executor
异步函数到异步函数的进一步抽象。您可以开始使用asyncio
,如果以后您发现其中的一部分需要threading
或multiprocessing
,您可以使用asyncio.to_thread
或run_in_executor
。同样,您稍后可能会发现您尝试使用线程运行的异步版本已经存在,因此您可以轻松地退出使用threading
并切换到asyncio
。
是的……不。最终取决于任务。在某些情况下,它可能无济于事(尽管它可能不会造成伤害),而在其他情况下,它可能会有很大帮助。这个答案的其余部分提供了一些解释,说明为什么使用asyncio
运行 anExecutor
可能是有利的。
asyncio
本质上提供了对并发性的更多控制,但代价是您需要更多地控制并发性。如果您想同时使用 a 运行一些代码以及使用 aThreadPoolExecutor
的一些其他代码ProcessPoolExecutor
,使用同步代码管理它并不容易,但使用asyncio
.
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
async def with_processing():
with ProcessPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def with_threading():
with ThreadPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def main():
await asyncio.gather(with_processing(), with_threading())
asyncio.run(main())
这是如何运作的?本质上asyncio
要求执行者运行他们的功能。然后,在执行程序运行时,asyncio
将运行其他代码。例如,ProcessPoolExecutor
启动一堆进程,然后在等待这些进程完成时,ThreadPoolExecutor
启动一堆线程。asyncio
然后将检查这些执行者并在完成后收集他们的结果。此外,如果您有其他代码正在使用asyncio
,您可以在等待进程和线程完成时运行它们。
您的代码中有很多执行程序并不常见,但是当人们使用线程/进程时,我看到的一个常见问题是他们会将整个代码推入一个线程/进程中,期望它能够工作. 例如,我曾经看到过以下代码(大约):
from concurrent.futures import ThreadPoolExecutor
import requests
def get_data(url):
return requests.get(url).json()["data"]
urls = [...]
with ThreadPoolExecutor() as executor:
for data in executor.map(get_data, urls):
print(data)
这段代码的有趣之处在于它在并发时比没有并发时要慢。为什么?因为结果json
很大,并且有许多线程消耗大量内存是灾难性的。幸运的是,解决方案很简单:
from concurrent.futures import ThreadPoolExecutor
import requests
urls = [...]
with ThreadPoolExecutor() as executor:
for response in executor.map(requests.get, urls):
print(response.json()["data"])
现在一次只将一个json
卸载到内存中,一切都很好。
这里的教训?
您不应该尝试将所有代码都放入线程/进程中,而应该关注代码的哪些部分实际需要并发。
但是,如果get_data
不是像这种情况那样简单的函数呢?如果我们必须在函数中间的某个深处应用执行器怎么办?这就是asyncio
进来的地方:
import asyncio
import requests
async def get_data(url):
# A lot of code.
...
# The specific part that needs threading.
response = await asyncio.to_thread(requests.get, url, some_other_params)
# A lot of code.
...
return data
urls = [...]
async def main():
tasks = [get_data(url) for url in urls]
for task in asyncio.as_completed(tasks):
data = await task
print(data)
asyncio.run(main())
尝试相同的concurrent.futures
方法绝不是漂亮的。您可以使用诸如回调、队列等之类的东西,但它比基本asyncio
代码更难管理。
已经有很多很好的答案了。无法详细说明何时使用每一个。这是两个更有趣的组合。多处理 + 异步: https ://pypi.org/project/aiomultiprocess/ 。
它的设计用例是 highio,但仍使用尽可能多的可用内核。Facebook 使用这个库来编写某种基于 python 的文件服务器。Asyncio 允许 IO 绑定流量,但多处理允许多个事件循环和多个内核上的线程。
来自回购的前代码:
import asyncio
from aiohttp import request
from aiomultiprocess import Pool
async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")
async def main():
urls = ["https://jreese.sh", ...]
async with Pool() as pool:
async for result in pool.map(get, urls):
... # process result
if __name__ == '__main__':
# Python 3.7
asyncio.run(main())
# Python 3.6
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
只是和这里的补充,说 jupyter notebook 不能很好地工作,因为 notebook 已经运行了 asyncio 循环。只是一个小提示,让你不要把头发拉出来。