3

我需要使用 python 从大 txt 文件中获取 N 行。这些文件基本上是制表符分隔的表。我的任务有以下限制:

  • 这些文件可能包含标题(有些具有多行标题)。
  • 标题需要以相同的顺序出现在输出中。
  • 每条线只能取一次。
  • 目前最大的文件约为 150GB(约 60 000 000 行)。
  • 文件中的行长度大致相同,但在不同文件之间可能会有所不同。
  • 我通常会随机抽取 5000 行(我可能需要多达 1 000 000 行)

目前我已经编写了以下代码:

inputSize=os.path.getsize(options.input)
usedPositions=[] #Start positions of the lines already in output

with open(options.input) as input:
    with open(options.output, 'w') as output:

        #Handling of header lines
        for i in range(int(options.header)):
            output.write(input.readline())
            usedPositions.append(input.tell())

        # Find and write all random lines, except last
        for j in range(int(args[0])):
            input.seek(random.randrange(inputSize)) # Seek to random position in file (probably middle of line)
            input.readline() # Read the line (probably incomplete). Next input.readline() results in a complete line.
            while input.tell() in usedPositions: # Take a new line if current one is taken
                input.seek(random.randrange(inputSize))
                input.readline() 
            usedPositions.append(input.tell()) # Add line start position to usedPositions
            randomLine=input.readline() # Complete line
            if len(randomLine) == 0: # Take first line if end of the file is reached
                input.seek(0)
                for i in range(int(options.header)): # Exclude headers
                    input.readline()
                randomLine=input.readline()
            output.write(randomLine)            

这段代码似乎工作正常。

我知道这段代码更喜欢输入中最长行之后的行,因为 seek() 最有可能返回最长行上的位置,并且下一行被写入输出。这无关紧要,因为输入文件中的行长度大致相同。我也知道如果 N 大于输入文件中的行数,此代码会导致无限循环。我不会对此进行检查,因为获取行数需要很多时间。

RAM 和 HDD 限制无关紧要。我只关心程序的速度。有没有办法进一步优化这段代码?或者也许有更好的方法?

编辑:澄清一下,一个文件中的行长度大致相同。但是,我有多个文件需要运行此脚本,并且这些文件的平均行长度会有所不同。例如,文件 A 每行可能有 ~100 个字符,文件 B 每行可能有 ~50000 个字符。我事先不知道任何文件的平均行长。

4

5 回答 5

8

只有一种方法可以避免顺序读取所有文件直到您采样的最后一行 - 我很惊讶到目前为止没有一个答案提到它:

您必须寻找文件内的任意位置,读取一些字节,如果您有一个典型的行长度,如您所说,应该是该值的 3 或 4 倍。然后拆分您在换行符(“\n”)上读取的块,并选择第二个字段 - 即随机位置的一行。

此外,为了能够始终如一地查找文件,应以“二进制读取”模式打开文件,因此应手动处理行尾标记的转换。

此技术无法为您提供已读取的行号,因此您将选定的行偏移量保留在文件中以避免重复:

#! /usr/bin/python
# coding: utf-8

import random, os


CHUNK_SIZE = 1000
PATH = "/var/log/cron"

def pick_next_random_line(file, offset):
    file.seek(offset)
    chunk = file.read(CHUNK_SIZE)
    lines = chunk.split(os.linesep)
    # Make some provision in case yiou had not read at least one full line here
    line_offset = offset + len(os.linesep) + chunk.find(os.linesep) 
    return line_offset, lines[1]

def get_n_random_lines(path, n=5):
    lenght = os.stat(path).st_size
    results = []
    result_offsets = set()
    with open(path) as input:
        for x in range(n):
            while True:
                offset, line = pick_next_random_line(input, random.randint(0, lenght - CHUNK_SIZE))
                if not offset in result_offsets:
                    result_offsets.add(offset)
                    results.append(line)
                    break
    return results

if __name__ == "__main__":
    print get_n_random_lines(PATH)
于 2012-09-05T16:11:39.787 回答
4

如果您需要文件中 N 行的统一样本,您需要知道要从中选择的确切数;随机搜索不会这样做,较长的行会使结果偏向于直接跟随最长行的行。

幸运的是,您只需要读取一次文件即可选择这 N 行。您基本上选择您的 N 前行(以随机顺序),然​​后根据读取的行数随机用新行替换选择的行,概率递减。

对于 N == 1,第 n 行读取替换前一个随机选择的机会是randint(0, n) < 1,因此,第二行有 50% 的机会被选择,第三行有 33.33% 的机会,等等。对于较大的 N,替换一个随着更多行的读取,您的集合中已经选择的行的随机分布相同。

Python random lines from subfolders中,Blkknght 编写了一个非常有用的函数,用于从可迭代对象中选择大小为 N 的随机样本:

import random

def random_sample(n, items):
    results = []

    for i, v in enumerate(items):
        r = random.randint(0, i)
        if r < n:
            if i < n:
                results.insert(r, v) # add first n items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < n:
        raise ValueError("Sample larger than population.")

    return results

与您的要求相结合以保留一组标头是微不足道的:

from itertools import islice

with open(options.input) as input:
    with open(options.output, 'w') as output:

        # Handling of header lines
        # Use islice to avoid buffer issues with .readline()
        for line in islice(input, int(options.header)):
            output.write(line)

        # Pick a random sample
        for line in random_sample(int(args[0]), input):
            output.write(line)

这将一次性读取您的整个文件,选择一个统一的随机样本,并将其写入输出文件。因此,这具有 Θ(L) 复杂度,其中 L 是文件中的行数。

于 2012-09-06T08:23:45.933 回答
3

我相信随机选择 N 个行号会更快,然后逐行检查文件一次,然后取出列表中的行号。目前,您必须为每个随机数寻找随机位置,所以它是 O(N*M),其中 M 是文件的大小。我建议的是O(M)。

于 2012-09-05T10:42:35.300 回答
1
  • 明显的改进是set()用于您的usedPositions变量 - 查找会更快,并且由于您需要处理多达 10^6 个已使用的位置,因此查找时间并非无关紧要。
  • 在 for 循环中使用xrange而不是。range似乎没有必要分配完整的整数列表。
于 2012-09-05T10:43:13.240 回答
0

未经测试(并且需要两次读取文件):

import random

N = 5000
with open('file.in') as fin:
    line_count = sum(1 for i in fin)
    fin.seek(0)
    to_take = set(random.sample(xrange(line_count), N))
    for lineno, line in enumerate(fin):
        if lineno in to_take:
            pass # use it

但是,由于您提到行的大小“大致”相同,因此您可以使用os.path.getsize它并将其除以平均行长度(无论是已知的,还是从文件中的 N 多行中嗅出),然后使用它来生成line_count- 它对于随机样本来说已经足够接近了。

您也可以mmap使用文件大小、平均行长、行数的最佳猜测和随机行号的组合来“搜索”,然后向后或向前搜索到下一行的开头。(因为mmap这将使您能够将其视为字符串,因此您可以使用.index偏移量或re如果您真的想使用)。

于 2012-09-05T12:29:47.643 回答