1

我从希望应用于以下问题的 asyncio 开始:

  • 数据被分割成块。
  • 块是第一次压缩。
  • 然后将压缩的块写入文件中。
  • 所有块都使用一个文件,所以我需要一个一个地处理它们。
with open('my_file', 'w+b') as f:
    for chunk in chunks:
        compress_chunk(ch)
        f.write(ch)

从这个上下文来看,为了更快地运行这个过程,write当前迭代的步骤一开始,compress下一次迭代的步骤是否也会被触发?

我可以这样做吗asyncio,保持类似的for循环结构?如果是的话,你能分享一些关于这个的建议吗?

我猜另一种并行运行的方法是使用相位并将相位与相位ProcessPoolExecutor完全分开。这意味着在不同的执行程序中压缩第一个所有块。compresswrite

只有当所有块都被压缩后,才开始写入步骤。asyncio但是,如果有意义的话,我想用第一种方法来研究第一种方法。

提前感谢您的帮助。最好的

4

1 回答 1

2

您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用该aioFiles库。标准文件 IO 将主要阻塞您的主压缩/生产者线程,您不会看到太多加速。尝试这样的事情:

async def produce(queue, chunks):
    for chunk in chunks:
        compress_chunk(ch)
        await queue.put(i)


async def consume(queue):
    with async with aiofiles.open('my_file', 'w') as f:
        while True:
            compressed_chunk = await Q.get()
            await f.write(b'Hello, World!')
            queue.task_done()


async def main():
    queue = asyncio.Queue()

    producer = asyncio.create_task(producer(queue, chunks))
    consumer = asyncio.create_task(consumer(queue))

    # wait for the producer to finish
    await producer

    # wait for the consumer to finish processing and cancel it
    await queue.join()
    consumer.cancel()
 
asyncio.run(main())

https://github.com/Tinche/aiofiles

使用 asyncio.Queue 进行生产者-消费者流

于 2021-10-06T01:47:17.630 回答