I ended up writing this. Not properly tested (bugfixes welcome), but it seems to work:
class LineReader:
    """Read newline-terminated lines from a chunked asynchronous byte stream.

    The wrapped ``stream`` only needs an awaitable ``receive_some(max_bytes)``
    method that returns a bytes-like chunk, and an empty chunk at end of
    stream.
    """

    def __init__(self, stream, chunk_size=1024):
        """Wrap *stream*.

        Args:
            stream: object exposing ``async receive_some(max_bytes)``.
            chunk_size: maximum number of bytes requested per
                ``receive_some`` call.
        """
        self.stream = stream
        self._chunk_size = chunk_size
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        """Return a generator that splits the chunks sent to it into lines.

        Protocol:
          * ``next(gen)`` / ``gen.send(chunk)`` yields the next complete line
            (including its trailing ``b'\\n'``), or ``None`` when the buffer
            does not hold a complete line yet.
          * ``gen.send(b'')`` (any empty chunk) signals end of stream: once no
            complete line is left, the generator yields whatever is still
            buffered (possibly ``b''``) and empties the buffer.
        """
        buf = bytearray()
        # Offset up to which ``buf`` is already known to contain no newline,
        # so a long line arriving in many chunks is not rescanned from zero.
        scanned = 0
        at_eof = False
        while True:
            newline_idx = buf.find(b'\n', scanned)
            if newline_idx >= 0:
                # Complete line available: hand it out and drop it in place.
                line = bytes(buf[:newline_idx + 1])
                del buf[:newline_idx + 1]
                scanned = 0
                more_data = yield line
            elif at_eof:
                # End of stream: flush the unterminated tail instead of
                # discarding it. EOF is not sticky, so later chunks are
                # still accepted.
                tail = bytes(buf)
                buf.clear()
                scanned = 0
                at_eof = False
                more_data = yield tail
            else:
                # No newline yet; remember how far we looked and ask for more.
                scanned = len(buf)
                more_data = yield
            if more_data is not None:
                if more_data:
                    buf += more_data
                else:
                    at_eof = True

    async def readline(self):
        """Return the next line from the stream as ``bytes``.

        The line includes its trailing ``b'\\n'``. At end of stream any
        buffered data without a trailing newline is returned as a final
        line; once nothing is left, ``b''`` is returned as the EOF
        indication.
        """
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(self._chunk_size)
            if not more_data:
                # EOF: returns the unterminated tail, or b'' if none remains.
                return self._line_generator.send(b'')
            line = self._line_generator.send(more_data)
        return line
Then I can wrap the `ReceiveStream` with a `LineReader` and use its `readline` method. Adding `__aiter__()` and `__anext__()` would then be trivial, but I don't need them in my case (I'm porting something to trio that doesn't use `async for` anyway).
The other flaw with this is that it assumes UTF-8 or a similar encoding in which newlines appear in the encoded bytes object as an unmodified `b'\n'`.
It'd be nice to rely on library functions to handle this though; other answers appreciated.