3

我有一个 Python 函数(用 C++ 实现),它从文件描述符(包装在FILE*C++ 端)读取,我需要从asyncio.StreamReader. 具体来说,阅读器是 HTTP 响应的内容:aiohttp.ClientResponse.content

我想我可能会打开一个管道,将读取端传递给 C++ 函数并将写入端连接asyncio的事件循环。但是,如何通过适当的流控制和尽可能少的复制将数据从流读取器移动到管道?

缺少部分的代码骨架如下:

# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()

# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)

# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
4

2 回答 2

2

subprocess_attach_write_pipe异步示例:

rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')

编辑 - 对于写入流控制,请参阅以下方法:

这是一个可能的FlowControl实现,受StreamWriter.drain启发:

class FlowControl(asyncio.streams.FlowControlMixin):
    async def drain(self):
        await self._drain_helper()

用法:

transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
于 2016-11-20T09:28:20.650 回答
0

我通过使用 aThreadPoolExecutor和阻止调用来解决这个问题os.write

(read_fd, write_fd) = os.pipe()
task_1 = loop.create_task(pump_bytes_into_fd(write_fd))
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd(read_fd))

async def pump_bytes_into_fd(write_fd):
    while True:
        chunk = await stream.read(CHUNK_SIZE)
        if chunk is None: break
        # process the chunk
        await loop.run_in_executor(executor_2, os.write, write_fd, chunk)

至关重要的是,使用两个不同的执行程序来阻止读取和写入以避免死锁。

于 2016-09-05T13:55:57.873 回答