10

asyncio 有StreamReader.readline(),允许类似:

while True:
    line = await reader.readline()
    ...

(我async for在 asyncio 中看不到可用,但这将是明显的演变)

如何用三重奏达到同等效果?

我在 trio 0.9 中没有直接看到对此的任何高级支持。我所看到的只是ReceiveStream.receive_some()返回任意大小的二进制块;对我来说,将其解码并将其转换为逐行的东西似乎并非易事。有我可以使用的标准库函数或代码片段吗?我发现 io stdlib 模块看起来很有希望,但我看不到任何提供“提要”方法的方法。

4

3 回答 3

11

你是对的,目前 Trio 中没有对此的高级支持。应该有一些东西,虽然我不是 100% 确定它应该是什么样子。我打开了一个问题来讨论它。

同时,您的实现看起来是合理的。

如果你想让它更加健壮,你可以 (1) 使用 abytearray而不是bytes你的缓冲区,以添加和删除分期 O(n) 而不是 O(n^2),(2) 限制最大行长,所以邪恶的同伴不能强迫你浪费无限的内存来缓冲无限长的行,(3)find在最后一个停止的地方恢复每次调用而不是每次从头开始重新开始,再次避免 O (n^2) 行为。如果您只与合理的行长和行为良好的同伴打交道,这一切都不是超级重要的,但这也没有什么坏处。

这是您的代码的调整版本,它试图合并这三个想法:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(随意在您喜欢的任何许可下使用。)

于 2018-12-02T02:34:02.203 回答
1

I ended up writing this. Not properly tested (bugfixes welcome), but it seems to work:

class LineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                buf = buf[newline_idx+1:]
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

Then I can wrap the ReceiveStream with a LineReader and use its readline method. Adding __aiter__() and __anext()__ would then be trivial, but I don't need it in my case (I'm porting something to trio that doesn't use async for anyway).

The other flaw with this is that it assumes UTF-8 or a similar encoding where b'\n' newlines exist in the encoded bytes object unmodified.

It'd be nice to rely on library functions to handle this though; other answers appreciated.

于 2018-12-02T00:39:56.267 回答
0

我正在使用的一种非常天真的方法:

async def readline(stdout: trio.abc.ReceiveStream):
    data = b""
    while True:
        _data = await stdout.receive_some()
        if _data == b"":
            break
        data += _data
        if data.endswith(b"\n"):
            break
    return data

# use it like this:
async def fn():
    async with await trio.open_process(..., stdout=subprocess.PIPE) as process:
        while True:
            # instead of:
            #   data = process.stdout.receive_some()
            # use this:
            line = await readline(process.stdout)
            if line == b"":
                break
于 2020-07-06T12:48:51.637 回答