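I have a very large 4GB file, and my computer hangs when I try to read it. So I want to read it piece by piece: process each piece, store the processed piece in another file, and then read the next piece.
Is there any method to yield these pieces?
I would love to have a lazy method.
To write a lazy function, just use yield: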
def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data
with open('really_big_file.dat') as f:
    for piece in read_in_chunks(f):
        process_data(piece)
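Since the question also asks to store each processed piece in another file, here is a minimal sketch of that full loop. The names copy_processed and the upper-casing process_data are hypothetical placeholders, and binary mode is an assumption so that chunk sizes are counted in bytes.

```python
def read_in_chunks(file_object, chunk_size=1024):
    """Yield successive chunks from an open file until it is exhausted."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


def process_data(piece):
    # Hypothetical processing step: it only upper-cases the bytes.
    return piece.upper()


def copy_processed(src_path, dst_path, chunk_size=1024 * 1024):
    """Stream src_path through process_data into dst_path, one chunk at a time."""
    with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
        for piece in read_in_chunks(src, chunk_size):
            dst.write(process_data(piece))
```

Only one chunk is held in memory at a time, so memory use is bounded by chunk_size rather than by the size of the file.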
Another option is to use iter and a helper function:
f = open('really_big_file.dat')
def read1k():
    return f.read(1024)
for piece in iter(read1k, ''):
    process_data(piece)
If the file is line-based, the file object is already a lazy generator of lines:
for line in open('really_big_file.dat'):
    process_data(line)
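file.readlines() takes an optional size hint. It keeps reading whole lines until their total size (in bytes or characters) reaches roughly that hint, so each call returns a bounded batch of lines.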
BUF_SIZE = 65536  # size hint; an assumed value, tune as needed
bigfile = open('bigfilename', 'r')
tmp_lines = bigfile.readlines(BUF_SIZE)
while tmp_lines:
    process([line for line in tmp_lines])
    tmp_lines = bigfile.readlines(BUF_SIZE)
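The same readlines loop can be wrapped in a generator, which keeps the batching lazy and the calling code short. This is only a sketch; line_batches and its size_hint default are names and values I made up for illustration.

```python
def line_batches(file_object, size_hint=64 * 1024):
    """Yield lists of whole lines; each list totals roughly size_hint characters."""
    while True:
        lines = file_object.readlines(size_hint)
        if not lines:  # an empty list means end of file
            break
        yield lines
```

Each batch always contains complete lines, so a record is never split between two batches.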
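If your computer, OS and Python are 64-bit, then you can use the mmap module to map the contents of the file into memory and access it with indices and slices. Here is an example from the documentation: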
import mmap
# hello.txt is assumed to contain b"Hello Python!\n"
with open("hello.txt", "r+b") as f:
    # memory-map the file, size 0 means whole file
    mm = mmap.mmap(f.fileno(), 0)
    # read content via standard file methods
    print(mm.readline())  # prints b"Hello Python!\n"
    # read content via slice notation
    print(mm[:5])  # prints b"Hello"
    # update content using slice notation;
    # note that new content must have same size
    mm[6:] = b" world!\n"
    # ... and read again using standard file methods
    mm.seek(0)
    print(mm.readline())  # prints b"Hello  world!\n"
    # close the map
    mm.close()
If your computer, OS or Python is 32-bit, then mmap-ing large files can reserve large parts of your address space and starve your program of memory.
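To tie mmap back to the original question, a mapped file can also be walked in fixed-size slices. The sketch below assumes a read-only mapping is enough; mmap_chunks is a hypothetical helper name, not something from the mmap documentation.

```python
import mmap


def mmap_chunks(path, chunk_size=1024 * 1024):
    """Yield bytes slices of a memory-mapped file, chunk_size bytes at a time."""
    with open(path, 'rb') as f:
        # Length 0 maps the whole file; ACCESS_READ keeps the mapping read-only.
        # Note: mapping an empty file raises ValueError.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for start in range(0, len(mm), chunk_size):
                # Slicing copies just this window into a bytes object.
                yield mm[start:start + chunk_size]
```

The operating system pages data in on demand, so only the slices you actually touch are read from disk.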
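There are already many good answers, but if your entire file is on a single line and you still want to process "rows" (as opposed to fixed-size blocks), these answers will not help you.
99% of the time, it is possible to process files line by line. Then, as suggested in this answer, you can use the file object itself as a lazy generator: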
with open('big.csv') as f:
    for line in f:
        process(line)
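However, you may run into very large files where the row separator is not '\n' (a common case is '|'). Converting '|' to '\n' before processing may not be an option, because it can mess up fields that may legitimately contain '\n' (e.g. free-text user input). For these situations, I created the following snippet [updated in May 2021 for Python 3.8+]: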
def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.
    Usage:
    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    row = ''
    while (chunk := f.read(chunksize)) != '':   # '' means end of file
        while (i := chunk.find(sep)) != -1:     # -1 means no separator left in chunk
            yield row + chunk[:i]
            chunk = chunk[i+1:]
            row = ''
        row += chunk
    yield row
[For older versions of Python]:
def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.
    Usage:
    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    curr_row = ''
    while True:
        chunk = f.read(chunksize)
        if chunk == '': # End of file
            yield curr_row
            break
        while True:
            i = chunk.find(sep)
            if i == -1:
                break
            yield curr_row + chunk[:i]
            curr_row = ''
            chunk = chunk[i+1:]
        curr_row += chunk
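I was able to use it successfully to solve various problems. It has been extensively tested with various chunk sizes. Here is the test suite I am using, for those who need to convince themselves: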
import os
test_file = 'test_file'
def cleanup(func):
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)
        os.unlink(test_file)
    return wrapper
@cleanup
def test_empty(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1
@cleanup
def test_1_char_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2
@cleanup
def test_1_char(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1
@cleanup
def test_1025_chars_1_row(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1
@cleanup
def test_1024_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1023):
            f.write('a')
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2
@cleanup
def test_1025_chars_1026_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1026
@cleanup
def test_2048_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2
@cleanup
def test_2049_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2
if __name__ == '__main__':
    for chunksize in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        test_empty(chunksize)
        test_1_char_2_rows(chunksize)
        test_1_char(chunksize)
        test_1025_chars_1_row(chunksize)
        test_1024_chars_2_rows(chunksize)
        test_1025_chars_1026_rows(chunksize)
        test_2048_chars_2_rows(chunksize)
        test_2049_chars_2_rows(chunksize)
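One limitation worth noting: rows() advances past a separator with chunk[i+1:], so it assumes a single-character separator. If you need a longer one, a buffer-and-split variant is one possible sketch. rows_multi is a hypothetical name and this is not the code from the answer above.

```python
def rows_multi(f, sep='||', chunksize=1024):
    """Lazily yield rows from f, where sep may be longer than one character."""
    buf = ''
    while True:
        chunk = f.read(chunksize)
        if not chunk:
            # End of file: whatever is left in the buffer is the last row.
            yield buf
            return
        buf += chunk
        # Everything except the last piece is a complete row. The last piece
        # stays in the buffer because it may continue (or may end with part
        # of a separator that is completed by the next chunk).
        *complete, buf = buf.split(sep)
        yield from complete
```

The trade-off is that the unfinished row is re-scanned on every chunk, so a single extremely long row costs more here than in the original rows().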
f = ... # file-like object, i.e. supporting read(size) function and
        # returning empty string '' when there is nothing to read
def chunked(file, chunk_size):
    return iter(lambda: file.read(chunk_size), '')
for data in chunked(f, 65536):
    # process the data
    ...
See the official Python documentation: https://docs.python.org/3/library/functions.html#iter
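The sentinel has to match what read() returns at end of file: '' in text mode, b'' in binary mode. As a hedged illustration of the binary case, here is a sketch that hashes a large file block by block. The sha256_of_file name and the choice of hashing as the "processing" step are my own assumptions.

```python
import hashlib
from functools import partial


def sha256_of_file(path, chunk_size=65536):
    """Return the SHA-256 hex digest of a file, reading it chunk_size bytes at a time."""
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        # In binary mode read() returns b'' at end of file, so b'' is the sentinel.
        for block in iter(partial(f.read, chunk_size), b''):
            digest.update(block)
    return digest.hexdigest()
```

With the wrong sentinel type the comparison never succeeds and the loop never terminates, which is an easy mistake to make when switching between 'r' and 'rb'.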
Maybe this approach is more pythonic:
from functools import partial
# A file object returned by open() has a read() method that
# accepts the size of the block to read on each call.
with open('mydata.db', 'r') as f_in:
    part_read = partial(f_in.read, 1024*1024)
    # In text mode read() returns '' at end of file, so the sentinel must be '' (not b'')
    iterator = iter(part_read, '')
    for index, block in enumerate(iterator, start=1):
        block = process_block(block)  # process your block data
        with open(f'{index}.txt', 'w') as f_out:
            f_out.write(block)
I think we can write it like this:
def read_file(path, block_size=1024):
    with open(path, 'rb') as f:
        while True:
            piece = f.read(block_size)
            if piece:
                yield piece
            else:
                return
for piece in read_file(path):
    process_piece(piece)
Because of my low reputation I am not allowed to comment, but SilentGhost's solution should be easier with file.readlines([sizehint]).
Edit: SilentGhost is right, but this should be better than:
s = ""
for i in range(100):
    s += next(file)
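My situation is somewhat similar. It is not clear whether you know the chunk size in bytes; I usually don't, but the number of records (lines) required is known: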
def get_line():
    with open('4gb_file') as file:
        for i in file:
            yield i
lines_required = 100
gen = get_line()
chunk = [i for i, j in zip(gen, range(lines_required))]
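Update: Thanks nosklo. Here's what I meant. It almost works, except that it loses a line "between" chunks.
chunk = [next(gen) for i in range(lines_required)]
This does the trick without losing any lines, but it doesn't look very nice.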
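For what it's worth, the zip version loses a line because zip pulls the next line from gen before it notices that range is exhausted. The next(gen) version raises StopIteration when fewer than lines_required lines remain. A sketch with itertools.islice avoids both problems; line_chunks is a hypothetical helper name.

```python
from itertools import islice


def line_chunks(file_object, lines_required=100):
    """Yield lists of up to lines_required lines; the last list may be shorter."""
    while True:
        # islice stops after lines_required items without consuming an extra one.
        chunk = list(islice(file_object, lines_required))
        if not chunk:
            break
        yield chunk
```

It works on any iterator of lines, including an open file or the get_line() generator above.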
To process line by line, this is an elegant solution:
def stream_lines(file_name):
    file = open(file_name)
    while True:
        line = file.readline()
        if not line:
            file.close()
            break
        yield line
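This works even when the file contains blank lines: readline() returns '\n' for a blank line and '' only at end of file.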
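You can use the following code.
import os
file_obj = open('big_file', 'rb')
open() returns a file object.
Then use os.stat to get the size:
file_size = os.stat('big_file').st_size
# Round up so that a final partial chunk is not skipped
for i in range((file_size + 1023) // 1024):
    print(file_obj.read(1024))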