我正在寻找一种方法来有效地从磁盘中获取一大块值,然后对该块执行计算/计算。我的想法是一个 for 循环,它首先运行磁盘获取任务,然后对获取的数据运行计算。我想让我的程序在运行计算时获取下一批,这样我就不必在每次计算完成时等待另一个数据获取。我预计计算将比从磁盘获取数据花费更长的时间,并且由于单个计算任务已经将 cpu 使用率固定在接近 100%,因此可能无法真正并行完成。
我在下面使用 trio 的 python 中提供了一些代码(但也可以与 asyncio 一起使用以达到相同的效果)来说明我使用异步编程执行此操作的最佳尝试:
import trio
import numpy as np
from datetime import datetime as dt
import time
testiters=10
dim = 6000
def generateMat(arrlen):
for _ in range(30):
retval= np.random.rand(arrlen, arrlen)
# print("matrix generated")
return retval
def computeOpertion(matrix):
return np.linalg.inv(matrix)
def runSync():
for _ in range(testiters):
mat=generateMat(dim)
result=computeOpertion(mat)
return result
async def matGenerator_Async(count):
for _ in range(count):
yield generateMat(dim)
async def computeOpertion_Async(matrix):
return computeOpertion(matrix)
async def runAsync():
async with trio.open_nursery() as nursery:
async for value in matGenerator_Async(testiters):
nursery.start_soon(computeOpertion_Async,value)
#await computeOpertion_Async(value)
print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)
print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)
此代码将通过生成 30 个随机矩阵来模拟从磁盘获取数据,这会使用少量 cpu。然后它将对生成的矩阵执行矩阵求逆,该矩阵使用 100% cpu(在 numpy 中使用 openblas/mkl 配置)。我通过对同步和异步操作计时来比较运行任务所花费的时间。
据我所知,这两个作业完成的时间完全相同,这意味着异步操作并没有加快执行速度。观察每个计算的行为,顺序操作按顺序运行提取和计算,异步操作首先运行所有提取,然后是所有计算。
有没有办法使用异步获取和计算?也许有期货或类似收集()的东西?Asyncio 具有这些功能,而 trio 将它们放在单独的包trio_future中。我也对通过其他方法(线程和多处理)的解决方案持开放态度。
我相信可能存在一种多处理解决方案,可以使磁盘读取操作在单独的进程中运行。但是,进程间通信和阻塞就变得很麻烦,因为由于内存限制,我需要某种信号量来控制一次可以生成多少块,并且多处理往往非常繁重和缓慢。
编辑
谢谢VPfB的回答。我无法在操作中休眠(0),但我认为即使我这样做了,它也必然会阻止计算以支持执行磁盘操作。我认为这可能是 python 线程和 asyncio 的一个硬限制,它一次只能执行 1 个线程。如果除了等待一些外部资源从您的 CPU 响应之外,两个不同的进程都需要任何东西,那么同时运行两个不同的进程是不可能的。
也许有一种方法可以使用多处理池的执行器。我在下面添加了以下代码:
import asyncio
import concurrent.futures
async def asynciorunAsync():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
async for value in matGenerator_Async(testiters):
result = await loop.run_in_executor(pool, computeOpertion,value)
print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)
尽管计时,它仍然需要与同步示例相同的时间。我认为我将不得不采用一个更复杂的解决方案,因为似乎 async 和 await 对于正确执行此类任务切换的工具来说太粗糙了。