52

我正在编写一个代码来一次获取一个巨大的文本文件(几 GB)N 行,处理该批处理,然后移动到接下来的 N 行,直到我完成整个文件。(我不在乎最后一批是否不是完美的尺寸)。

我一直在阅读有关使用 itertools islice 进行此操作的信息。我想我已经完成了一半:

from itertools import islice
N = 16
infile = open("my_very_large_text_file", "r")
lines_gen = islice(infile, N)

for lines in lines_gen:
     ...process my lines...

麻烦的是我想处理下一批 16 行,但我错过了一些东西

4

6 回答 6

67

islice()可用于获取n迭代器的下一项。因此,list(islice(f, n))将返回文件下一n行的列表f。在循环中使用它将为您提供成块的文件n。在文件的末尾,列表可能会更短,最后调用将返回一个空列表。

from itertools import islice
with open(...) as f:
    while True:
        next_n_lines = list(islice(f, n))
        if not next_n_lines:
            break
        # process next_n_lines

另一种方法是使用grouper 模式

with open(...) as f:
    for next_n_lines in izip_longest(*[f] * n):
        # process next_n_lines
于 2011-06-13T20:24:41.110 回答
7

这个问题似乎假设通过一次读取 N 行块中的“巨大的文本文件”可以获得效率。这在已经高度优化的库上增加了一个缓冲应用层stdio,增加了复杂性,并且可能绝对不会给您带来任何好处。

因此:

with open('my_very_large_text_file') as f:
    for line in f:
        process(line)

在时间、空间、复杂性和可读性方面可能优于任何替代方案。

另请参阅Rob Pike 的前两条规则Jackson 的两条规则PEP-20 The Zen of Python。如果你真的只是想玩,islice你应该忽略大文件的东西。

于 2011-06-13T22:22:38.810 回答
3

这是使用groupby的另一种方式:

from itertools import count, groupby

N = 16
with open('test') as f:
    for g, group in groupby(f, key=lambda _, c=count(): c.next()/N):
        print list(group)

这个怎么运作:

基本上 groupby() 将通过 key 参数的返回值对行进行分组,而 key 参数是lambda函数,并且使用 c 参数将在定义函数时绑定到count()lambda _, c=count(): c.next()/N的事实,因此每次都会调用lambda 函数并评估返回值以确定将对行进行分组的分组器:groupby()

# 1 iteration.
c.next() => 0
0 / 16 => 0
# 2 iteration.
c.next() => 1
1 / 16 => 0
...
# Start of the second grouper.
c.next() => 16
16/16 => 1   
...
于 2011-06-13T20:37:34.533 回答
2

由于添加了从文件中选择的行在统计上均匀分布的要求,因此我提供了这种简单的方法。

"""randsamp - extract a random subset of n lines from a large file"""

import random

def scan_linepos(path):
    """return a list of seek offsets of the beginning of each line"""
    linepos = []
    offset = 0
    with open(path) as inf:     
        # WARNING: CPython 2.7 file.tell() is not accurate on file.next()
        for line in inf:
            linepos.append(offset)
            offset += len(line)
    return linepos

def sample_lines(path, linepos, nsamp):
    """return nsamp lines from path where line offsets are in linepos"""
    offsets = random.sample(linepos, nsamp)
    offsets.sort()  # this may make file reads more efficient

    lines = []
    with open(path) as inf:
        for offset in offsets:
            inf.seek(offset)
            lines.append(inf.readline())
    return lines

dataset = 'big_data.txt'
nsamp = 5
linepos = scan_linepos(dataset) # the scan only need be done once

lines = sample_lines(dataset, linepos, nsamp)
print 'selecting %d lines from a file of %d' % (nsamp, len(linepos))
print ''.join(lines)

我在一个包含 1.7GB 磁盘的 300 万行的模拟数据文件上对其进行了测试。scan_linepos在我不太热的桌面上,运行时占主导地位大约 20 秒。

只是为了检查sample_lines我使用timeit模块的性能

import timeit
t = timeit.Timer('sample_lines(dataset, linepos, nsamp)', 
        'from __main__ import sample_lines, dataset, linepos, nsamp')
trials = 10 ** 4
elapsed = t.timeit(number=trials)
print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000,
        elapsed, (elapsed/trials) * (10 ** 6))

对于 的各种值nsamp;当nsamp为 100 时,单个sample_lines在 460µs 内完成,并在每次调用 47ms 时线性扩展到 10k 个样本。

自然的下一个问题是Random 几乎不是随机的?,答案是“亚密码学,但对生物信息学来说肯定没问题”。

于 2011-06-14T16:52:10.233 回答
1

使用过什么是最“pythonic”的方式来迭代块中的列表?

from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return izip_longest(*args, fillvalue=fillvalue)


with open(filename) as f:
    for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk
        """process lines like 
        lines[0], lines[1] , ... , lines[chunk_size-1]"""
于 2011-06-13T20:32:29.723 回答
0

假设“批处理”意味着要一次处理所有 16 个记录而不是单独处理,一次读取文件一条记录并更新计数器;当计数器达到 16 时,处理该组。

interim_list = []
infile = open("my_very_large_text_file", "r")
ctr = 0
for rec in infile:
    interim_list.append(rec)
    ctr += 1
    if ctr > 15:
        process_list(interim_list)
        interim_list = []
        ctr = 0

the final group

process_list(interim_list)

于 2011-06-13T22:46:17.190 回答