1

我正在寻找一种方法来有效地从磁盘中获取一大块值,然后对该块执行计算/计算。我的想法是一个 for 循环,它首先运行磁盘获取任务,然后对获取的数据运行计算。我想让我的程序在运行计算时获取下一批,这样我就不必在每次计算完成时等待另一个数据获取。我预计计算将比从磁盘获取数据花费更长的时间,并且由于单个计算任务已经将 cpu 使用率固定在接近 100%,因此可能无法真正并行完成。

我在下面使用 trio 的 python 中提供了一些代码(但也可以与 asyncio 一起使用以达到相同的效果)来说明我使用异步编程执行此操作的最佳尝试:

import trio
import numpy as np
from datetime import datetime as dt
import time

testiters=10
dim = 6000


def generateMat(arrlen):
    for _ in range(30):
        retval= np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval

def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat=generateMat(dim)
        result=computeOpertion(mat)
    return result

async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)

async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)

async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters): 
            nursery.start_soon(computeOpertion_Async,value)
            #await computeOpertion_Async(value)

            

print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)

print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)

此代码将通过生成 30 个随机矩阵来模拟从磁盘获取数据,这会使用少量 cpu。然后它将对生成的矩阵执行矩阵求逆,该矩阵使用 100% cpu(在 numpy 中使用 openblas/mkl 配置)。我通过对同步和异步操作计时来比较运行任务所花费的时间。

据我所知,这两个作业完成的时间完全相同,这意味着异步操作并没有加快执行速度。观察每个计算的行为,顺序操作按顺序运行提取和计算,异步操作首先运行所有提取,然后是所有计算。

有没有办法使用异步获取和计算?也许有期货或类似收集()的东西?Asyncio 具有这些功能,而 trio 将它们放在单独的包trio_future中。我也对通过其他方法(线程和多处理)的解决方案持开放态度。

我相信可能存在一种多处理解决方案,可以使磁盘读取操作在单独的进程中运行。但是,进程间通信和阻塞就变得很麻烦,因为由于内存限制,我需要某种信号量来控制一次可以生成多少块,并且多处理往往非常繁重和缓慢。

编辑

谢谢VPfB的回答。我无法在操作中休眠(0),但我认为即使我这样做了,它也必然会阻止计算以支持执行磁盘操作。我认为这可能是 python 线程和 asyncio 的一个硬限制,它一次只能执行 1 个线程。如果除了等待一些外部资源从您的 CPU 响应之外,两个不同的进程都需要任何东西,那么同时运行两个不同的进程是不可能的。

也许有一种方法可以使用多处理池的执行器。我在下面添加了以下代码:

import asyncio
import concurrent.futures

async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:    
         async for value in matGenerator_Async(testiters):              
            result = await loop.run_in_executor(pool, computeOpertion,value)


print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)

尽管计时,它仍然需要与同步示例相同的时间。我认为我将不得不采用一个更复杂的解决方案,因为似乎 async 和 await 对于正确执行此类任务切换的工具来说太粗糙了。

4

3 回答 3

1

我不使用三重奏,我的回答是基于异步的。

在这些情况下,我看到的提高异步性能的唯一方法是将计算分成更小的部分并插入await sleep(0)它们之间。这将允许数据获取任务运行。

Asyncio 使用协作调度。一个同步的 CPU 绑定例程不合作,它在运行时会阻塞其他所有内容。

sleep()总是挂起当前任务,允许其他任务运行。

将延迟设置为 0 可提供优化路径以允许其他任务运行。这可以由长时间运行的函数使用,以避免在函数调用的整个持续时间内阻塞事件循环。

(引自:asyncio.sleep


如果这不可能,请尝试在executor中运行计算。这为原本纯 asyncio 代码添加了一些多线程功能。

于 2021-11-10T07:22:59.003 回答
0

异步 I/O 的重点是在网络 I/O 很多但实际计算(或磁盘 I/O)很少的情况下,可以轻松编写程序。这适用于任何异步库(Trio 或 asyncio)甚至不同的语言(例如 C++ 中的 ASIO)。所以你的程序非常不适合异步 I/O!您将需要使用多个线程(或进程)。虽然,公平地说,包括 Trio 在内的异步 I/O 可用于协调线程上的工作,并且在您的情况下可能会很好用。

正如 VPfB 的回答所说,如果您使用的是 asyncio,那么您可以使用执行程序,特别是ThreadPoolExecutor传递给loop.run_in_executor(). 对于 Trio,等效项是trio.to_thread.run_sync()(另请参阅Trio 文档中的Threads(如果必须的话)),它更易于使用。在这两种情况下,您都可以await得到结果,因此该函数在单独的线程中运行,而主 Trio 线程可以继续运行您的异步代码。您的代码最终看起来像这样:

async def matGenerator_Async(count):
    for _ in range(count):
        yield await trio.to_thread.run_sync(generateMat, dim)

async def my_trio_main()
    async with trio.open_nursery() as nursery:
        async for matrix in matGenerator_Async(testiters):
             nursery.start_soon(trio.to_thread.run_sync, computeOperation, matrix)

trio.run(my_trio_main)

计算函数 (generateMatcomputeOperation) 不需要是异步的。事实上,如果它们是有问题的,因为您不能再在单独的线程中运行它们。通常,仅在async需要await某些东西或使用async withor时才创建功能async for

您可以从上面的示例中看到如何将数据传递给在另一个线程中运行的函数:只需将它们作为参数传递给trio.to_thread.run_sync(),它们将作为参数传递给函数。从中获取结果generateMat()也很简单——在另一个线程中调用的函数的返回值是从await trio.to_thread.run_sync(). 获取结果computeOperation()比较棘手,因为它是在 Nursery 中调用的,所以它的返回值被丢弃了。您需要向它传递一个可变参数(如 a dict)并将结果存储在其中。但是要注意线程安全;最简单的方法是将一个新对象传递给每个协程,并且仅在托儿所完成后检查它们。

一些你可能会忽略的最后脚注:

  • 需要明确的是,yield await在上面的代码中并不是某种特殊的语法。它只是await foo(),它在完成后返回一个值foo(),然后yield是该值。
  • to_thread.run_sync()您可以通过传递一个CapacityLimiter对象来更改 Trio 用于调用的线程数,或者通过查找默认值并设置其计数。看起来默认值当前是 40,所以你可能想把它调低一点,但这可能不是太重要。
  • 有一个普遍的说法是 Python 不支持线程,或者至少不能同时在多个线程中进行计算,因为它有一个全局锁(全局解释器锁,或 GIL)。这意味着您需要使用多个进程而不是线程,以便您的程序真正并行计算事物。确实在 Python 中有一个 GIL,但是只要您使用 numpy 之类的东西进行计算,那么它就不会阻止多线程有效地工作。
  • Trio 实际上对异步文件 I/O有很好的支持。但我认为这对你的情况没有帮助。
于 2021-12-29T13:36:58.867 回答
0

为了补充我的另一个答案(它像你问的那样使用 Trio),这里是如何使用它只使用没有任何异步库的线程。使用Future对象ThreadPoolExecutor.

futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for matrix in matGenerator(testiters):
        futures.append(executor.submit(computeOperation, matrix))
results = [f.result() for f in futures]

该代码实际上与异步代码非常相似,但如果有的话,它更简单。如果你不需要做网络 I/O,你最好用这种方法。

于 2021-12-29T21:21:13.117 回答