假设我有一些StringIO
(来自cStringIO
)。我想从中读取缓冲区,直到遇到一些字符/字节,比如'Z',所以:
stringio = StringIO('ABCZ123')
buf = read_until(stringio, 'Z') # buf is now 'ABCZ'
# strinio.tell() is now 4, pointing after 'Z'
在 Python 中最快的方法是什么?谢谢
假设我有一些StringIO
(来自cStringIO
)。我想从中读取缓冲区,直到遇到一些字符/字节,比如'Z',所以:
stringio = StringIO('ABCZ123')
buf = read_until(stringio, 'Z') # buf is now 'ABCZ'
# strinio.tell() is now 4, pointing after 'Z'
在 Python 中最快的方法是什么?谢谢
我很失望这个问题在堆栈溢出时只有一个答案,因为这是一个有趣且相关的问题。无论如何,由于只有 ovgolovin 提供解决方案,而且我认为它可能很慢,所以我想了一个更快的解决方案:
def foo(stringio):
datalist = []
while True:
chunk = stringio.read(256)
i = chunk.find('Z')
if i == -1:
datalist.append(chunk)
else:
datalist.append(chunk[:i+1])
break
if len(chunk) < 256:
break
return ''.join(datalist)
这以块的形式读取 io(可能未在第一个块中找到结束字符)。它非常快,因为没有为每个字符调用 Python 函数,相反,它最大限度地使用了 C 编写的 Python 函数。
这比 ovgolovin 的解决方案快大约 60 倍。我跑过去timeit
检查。
i = iter(lambda: stringio.read(1),'Z')
buf = ''.join(i) + 'Z'
iter
在此模式下使用这里: iter(callable, sentinel) -> iterator
。
''.join(...)
是相当有效的。添加'Z'的最后一个操作''.join(i) + 'Z'
不是那么好。但它可以通过添加'Z'
到迭代器来解决:
from itertools import chain, repeat
stringio = StringIO.StringIO('ABCZ123')
i = iter(lambda: stringio.read(1),'Z')
i = chain(i,repeat('Z',1))
buf = ''.join(i)
另一种方法是使用生成器:
def take_until_included(stringio):
while True:
s = stringio.read(1)
yield s
if s=='Z':
return
i = take_until_included(stringio)
buf = ''.join(i)
我做了一些效率测试。所描述技术的性能几乎相同:
#!/usr/bin/env python3
import io
def iterate_stream(stream, delimiter, max_read_size=1024):
""" Reads `delimiter` separated strings or bytes from `stream`. """
empty = '' if isinstance(delimiter, str) else b''
chunks = []
while 1:
d = stream.read(max_read_size)
if not d:
break
while d:
i = d.find(delimiter)
if i < 0:
chunks.append(d)
break
chunks.append(d[:i+1])
d = d[i+1:]
yield empty.join(chunks)
chunks = []
s = empty.join(chunks)
if s:
yield s
if __name__ == '__main__':
print(next(iterate_stream(io.StringIO('ABCZ123'), 'Z')))
print(next(iterate_stream(io.BytesIO(b'ABCZ123'), b'Z')))