1

我在 bash 中解决了以下问题,但考虑到我需要减少的文件大小,我觉得它效率很低而且很慢。希望有人知道如何在 Python 中做同样的事情并希望加快速度。

最初的问题是减少非常大的文本文件(50-60 百万行,制表符分隔的列)。其中一列被视为键,即我们确定文件中有多少具有唯一键的行,然后随机选择其中的一个百分比(例如,如果减少 75%,则为总数的四分之一)附加到一个新文件,将保留我们的结果。我们继续检查其余的键,随机化然后将包含每个唯一键的所有行减少相同的百分比。如果无法减少 - 我们只需将所有行转移到结果文件中。

正如我所说,我的 bash 脚本运行良好,但速度很慢,并且将各种 awk 和 grep 结构串在一起。无论如何,Python 应该以一种更优雅的方式来处理这个问题,并且不会过多地牺牲内存(同样,在这种情况下,我们正在处理 50+ 百万行文件)。任何建议/技巧都会有所帮助!谢谢!

4

3 回答 3

2

简单的解决方案是按键列对文件进行排序,例如,按第二列对制表符分隔的输入进行排序:

#!/bin/bash
printf "a\tz\nb\ty\nc\tx" | sort -k 2 -t $'\t'

然后解决一个更简单的问题,即为每个唯一键检索 25% 的随机行,其中具有相同键的所有行都相邻,并且每个唯一键至少应保留一行:

#!/usr/bin/env python
import random
import sys
from itertools import chain, groupby

def choose_random(iterator, fraction, random=random.random):
    """Lazy analog of:

        L = list(iterator)
        k = int(len(L) * fraction + .5) or 1 # get at least one
        result = random.sample(L, k)

    Note: this function doesn't randomize the order of elements
          that would require to keep selected elements in memory
          and number of output elements is not exactly k
    """
    # always yield at least one item if input is not empty
    item = next(iterator)
    it = (x for x in chain([item], iterator) if random() < fraction)
    for x in chain([next(it, item)], it):
        yield x

def getkey(line):
    return line.split("\t")[1] # 2nd column

for key, group in groupby(sys.stdin, key=getkey):
    sys.stdout.writelines(choose_random(group, fraction=0.25))

注意:输入文件的最后一行应该包含一个换行符,否则如果选择了最后一行,则输出会损坏。

该脚本接受标准输入上的排序(按键列)输入,并将减少的输出打印到标准输出。它一次只需要在内存中存储一​​行。这是一个单遍算法(O(n))。

于 2013-02-27T14:54:18.983 回答
1

因为你的问题很模糊,我会给出一个高水平的解决方案

  1. 不要读取内存中的整个文件fileObj.read()fileObj.readlines()而是遍历文件for line in fileObj。为什么?这将是美好的回忆
  2. 基于 List 创建队列的实现

    class Queue(object):
        def __init__(self, max_size):
            self.queue = []
            self.max_size = max_size
        def __getitem__(self, index):
            if 0 <= index < max_size:
                return self.queue[index]
            else:
                raise IndexError
        def __iter__(self):
            return iter(self.queue)
        def push(seq):
            if isinstance(seq, Iterable):
                if len(self.queue) + len(seq) > self.max_size:
                    raise Full
                self.queue = seq
            else:
                if len(self.queue) + 1 > self.max_size:
                    raise Full
                self.queue.append(seq)
        def pop():
            if self.queue:
                return self.queue.pop(0)
    
  3. 创建一个队列字典,其中 maxsize = 2 * 所选项目的百分比

就像是

    PCT_SELECTED = 100
    MAXSIZE = 2 * PCT_SELECTED
    KEY_START = 10
    KEY_STOP = 15 
    from collection import defaultdict
    queue_dict = defaultdict(Queue(MAXSIZE))
  1. 以非阻塞方式将元素放入队列
  2. 如果 Queue 为 FULL,它将引发 Exception Full,在这种情况下,您从 Queue 中随机选择 50% 的元素并丢弃其余元素。

就像是

    with open("your-file") as fin:
        for line in fin:
            key = line[KEY_START: KEY_STOP]
            try:
                queue_dict[key].push(line)
            except Full:
                queue_dict[key] = random.sample(queue_dict[key], PCT_SELECTED)
  1. 最后遍历字典并随机修剪掉队列

    queue_dict = {key: random.sample(value, PCT_SELECTED) for key, value in queue_dict.items()}
    
  2. 现在您可以通读字典并写入文件。

于 2013-02-27T10:36:36.880 回答
0

对于大量项目,只需检查每个项目的随机数即可选择 75%。

import random

with open('input') as f:
for line in f:
    if random.random() < 0.75:
        print line

如果您需要保证每个键至少有一项(即使它只有两行):

import random
keys = set()

with open('input') as f:
    for line in f:
        columns = line.split('\t')
        key = columns[0]

        if not key in keys:
           print line
           keys.add(key)
           continue

        if random.random() < 0.75:
            print line
于 2013-02-27T12:38:54.740 回答