7

我在多个子文件夹中的 .txt 文件中有很多任务。我正在尝试从这些文件夹、它们包含的文件以及最后一个文件中的文本行中随机选择总共 10 个任务。应删除或标记选定的行,以便在下次执行时不会选择它。这可能是一个太宽泛的问题,但我会很感激任何意见或方向。

这是我到目前为止的代码:

#!/usr/bin/python  
import random   
with open('C:\\Tasks\\file.txt') as f:  
    lines = random.sample(f.readlines(),10)    
print(lines)
4

3 回答 3

15

这是一个简单的解决方案,每个样本只通过一次文件。如果您确切知道将从文件中采样的项目数量,那可能是最佳选择。

首先是示例功能。这使用了@NedBatchelder 在对早期答案的评论中链接到的相同算法(尽管那里显示的 Perl 代码只选择了一行,而不是几行)。它从可迭代的行中选择值,并且只需要在任何给定时间将当前选择的行保存在内存中(加上下一个候选行)。ValueError如果可迭代的值少于请求的样本大小,它会引发 a 。

import random

def random_sample(n, items):
    results = []

    for i, v in enumerate(items):
        r = random.randint(0, i)
        if r < n:
            if i < n:
                results.insert(r, v) # add first n items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < n:
        raise ValueError("Sample larger than population.")

    return results

编辑:在另一个问题中,用户@DzinX 注意到,如果您对大量值进行采样,则insert在此代码中使用 会使性能变差( )。O(N^2)他避免这个问题的改进版本在这里/编辑

现在我们只需要为我们的函数创建一个合适的可迭代项来采样。这是我使用生成器的方法。此代码一次只会打开一个文件,并且一次不需要超过一行内存。可选exclude参数(如果存在)应该是set包含在先前运行中选择的行(因此不应再次产生)。

import os

def lines_generator(base_folder, exclude = None):
    for dirpath, dirs, files in os.walk(base_folder):
        for filename in files:
            if filename.endswith(".txt"):
                fullPath = os.path.join(dirpath, filename)
                with open(fullPath) as f:
                     for line in f:
                         cleanLine = line.strip()
                         if exclude is None or cleanLine not in exclude:
                             yield cleanLine

现在,我们只需要一个包装函数将这两个部分联系在一起(并管理一组可见的行)。它可以返回单个大小n的样本或样本列表count,利用来自随机样本的切片也是随机样本的事实。

_seen = set()

def get_sample(n, count = None):
    base_folder = r"C:\Tasks"
    if count is None:
        sample = random_sample(n, lines_generator(base_folder, _seen))
        _seen.update(sample)
        return sample
    else:
        sample = random_sample(count * n, lines_generator(base_folder, _seen))
        _seen.update(sample)
        return [sample[i * n:(i + 1) * n] for i in range(count)]

以下是它的使用方法:

def main():
    s1 = get_sample(10)
    print("Sample1:", *s1, sep="\n")

    s2, s3 = get_sample(10,2) # get two samples with only one read of the files
    print("\nSample2:", *s2, sep="\n")
    print("\nSample3:", *s3, sep="\n")

    s4 = get_sample(5000) # this will probably raise a ValueError!
于 2012-08-26T23:28:25.963 回答
4

要在所有这些文件中获得适当的随机分布,您需要将它们视为一大组行并随机选择 10 个。换句话说,您必须至少阅读所有这些文件一次才能至少弄清楚您有多少行

但是,您不需要将所有行都保存在内存中。您必须分两个阶段执行此操作:索引您的文件以计算每个文件中的行数,然后从这些文件中选择 10 个随机行进行读取。

第一次索引:

import os

root_path = r'C:\Tasks\\'
total_lines = 0
file_indices = dict()

# Based on https://stackoverflow.com/q/845058, bufcount function
def linecount(filename, buf_size=1024*1024):
    with open(filename) as f:
        return sum(buf.count('\n') for buf in iter(lambda: f.read(buf_size), ''))

for dirpath, dirnames, filenames in os.walk(root_path):
    for filename in filenames:
         if not filename.endswith('.txt'):
             continue
         path = os.path.join(dirpath, filename)
         file_indices[total_lines] = path
         total_lines += linecount(path)

offsets = list(file_indices.keys())
offsets.sort()

现在我们有了一个偏移映射、指向文件名和总行数。现在我们选择十个随机索引,并从您的文件中读取这些:

import random
import bisect

tasks = list(range(total_lines))
task_indices = random.sample(tasks, 10)

for index in task_indices:
     # find the closest file index
     file_index = offsets[bisect.bisect(offsets, index) - 1]
     path = file_indices[file_index]
     curr_line = file_index
     with open(path) as f:
         while curr_line <= index:
             task = f.readline()
             curr_line += 1
     print(task)
     tasks.remove(index)

请注意,您只需要索引一次;您可以将结果存储在某处,并且仅在文件更新时更新它。

另请注意,您的任务现在“存储”在tasks列表中;这些是文件中行的索引,我在打印所选任务时从该变量中删除索引。下次您运行random.sample()选择时,之前选择的任务将不再可供下次选择。如果您的文件确实发生更改,则此结构将需要更新,因为必须重新计算索引。这file_indices将帮助您完成该任务,但这超出了此答案的范围。:-)

如果您只需要一个10 项样本,请改用Blckknght 的解决方案,因为它只会遍历文件一次,而我的需要 10 个额外的文件打开。如果您需要多个样本,此解决方案每次需要样本时只需要额外打开 10 个文件,它不会再次扫描所有文件。如果您的文件少于 10 个,仍然使用 Blckknght 的答案。:-)

于 2012-08-26T10:23:19.677 回答
0

编辑:经过仔细审查,这个答案不符合要求。重做它让我想到了水库采样算法,@Blckknght 在他的回答中使用了该算法。所以忽略这个答案。

很少有办法做到这一点。这是一个...

  1. 获取所有任务文件的列表
  2. 随机选择一个
  3. 从该文件中随机选择一行
  4. 重复直到我们有所需的行数

编码...

import os
import random

def file_iterator(top_dir):
    """Gather all task files"""
    files = []
    for dirpath, dirnames, filenames in os.walk(top_dir):
        for filename in filenames:
            if not filename.endswith('.txt'):
                continue
            path = os.path.join(dirpath, filename)
            files.append(path)
    return files


def random_lines(files, number=10):
    """Select a random file, select a random line until we have enough
    """
    selected_tasks = []

    while len(selected_tasks) < number:
        f = random.choice(files)
        with open(f) as tasks:
            lines = tasks.readlines()
            l = random.choice(lines)
            selected_tasks.append(l)
    return selected_tasks


## Usage
files = file_iterator(r'C:\\Tasks')
random_tasks = random_lines(files)
于 2012-08-26T19:25:40.380 回答