2

我正在寻找一种更好(更快)的方法来从巨大的文本文件中识别特定条目,而不是提取与该条目对应的行。该文件格式为:

>Entry1.1
#size=1688
704 1   1   1   4
979 2   2   2   0
1220    1   1   1   4
1309    1   1   1   4
1316    1   1   1   4
1372    1   1   1   4
1374    1   1   1   4
1576    1   1   1   4
>Entry2.1
#size=6251
6110    3   1.5 0   2
6129    2   2   2   2
6136    1   1   1   4
6142    3   3   3   2
6143    4   4   4   1
6150    1   1   1   4
6152    1   1   1   4
>Entry3.2
#size=1777
AND SO ON-----------

我减少了每个条目对应的行数(上图),因为它们从几百到几千不等。包含所有这些条目的文件大小范围从 100 MB 到 600 MB。我通常需要识别和提取相应行的条目数从几百到 15,000 不等。目前我正在使用 REGEX 来识别条目名称,然后提取所有相应的行直到下一个“>”符号。为了加快进程,我在 python 3.0 中使用了“多处理”包。这是简化的代码:

def OldMethod(entry):##Finds entry and extracts corresponding lines till next '>'
    patbase = '(>*%s(?![^\n]+?\d).+?)(?=>|(?:\s*\Z))'###pattern for extraction of gene entry
    found = re.findall(patbase % entry, denfile, re.DOTALL)
    if found: 
        print ('Entry found in density file\n')
        ''Do processing of corresponding line''
    return processed_result

def NewMethod(entry):##As suggested in this thread
    name = entry[1]
    block = den_dict[name]
    if found:
        ''Do processing of correponding lines in Block''

def PPResults(module,alist):##Parallel processing
    npool = Pool(int(nproc))    
    res = npool.map_async(module, alist)
    results = (res.get())###results returned in form of a list 
    return results

main():
    ##Read Density file, split by '>' and make dictionary for key and corresponding lines
    fh_in = open(density_file, 'r') ###HUGE TEXT FILE
    denfile = fh_in2.read().split('>')[1:] ###read once use repeatedly
    global den_dict
    den_dict = {}
    for ent in denfile:
        ent_splt = ent.split('\n')
        den_dict[ent_splt[0]] = ent_splt[2:-1]
    ##Use new approach with multiprocess
    results = PPResults(NewMethod, a_list)###'a_list' holds entries for that proceesing needs to be done
    for i in results:##Write Results from list to file
        fh_out.write(i)

我在具有超过 500GB 和 42 个内核的服务器上运行它,但脚本仍然需要大量时间(几小时甚至一天),具体取决于要处理的大文件的大小和数量条目。在整个过程中,大部分时间都花在了定位特定条目上,因为条目的处理非常基础。

我想要实现的是尽可能减少运行时间。请建议我执行此分析的最快策略是什么。

结果:

在遵循“Janne Karila”的建议(下)并使用“NewMethod”(上)后,300 个条目的运行时间为 120 秒,其中包括 85 秒来读取巨大的密度文件并由 '>' == 35 秒分割以使用处理 300 个条目32个核心。

与 REGEX 一起使用“OldMethod”(上图)时,300 个条目的运行时间为 577 秒,其中包括 ~102 秒来读取巨大的密度文件 == 475 秒以使用 32 个内核处理 300 个条目。

读取大文件的时间从 12 秒到 102 秒波动,原因我不确定。最终,新方法至少快了 10~12 倍。目前看来是不错的改进。

谢谢

AK

4

3 回答 3

1

您的主要问题是您尝试对整个文件进行正则表达式,这需要大量时间。

  1. 你不应该阅读整个文件,.readlines()而是使用
  2. 将文件拆分为行后,您可以通过仅检查行中的第一个符号来轻松找到新条目,并且仅当此符号为“>”时,应用正则表达式来提取条目号等等。
  3. 然后再一次,您只需遍历行列表,直到第一个符号变为“>”然后停止。

一个 600MB 的文件所用的时间不应超过 20 秒。

于 2013-03-22T05:55:15.707 回答
1

可以通过提前将文件拆分为文件系统中的单个文件或将数据集导入数据库来实现“尽可能快”。这在实践中是否有意义取决于数据的寿命和您的使用模式。如果您平均处理一个数据集超过 2 到 3 次,那么在第一次需要它时预先支付拆分整个文件的费用是有意义的,然后或多或少地免费获得对相同数据的后续查询。

考虑到实现开销,并考虑到您可以通过更简单的方法将执行时间减少几个数量级,我不会尝试进行重大改革,除非您查询相同的数据集数千次。

对于简单的拆分,man csplit. 该程序使用起来有点棘手,因此您可能需要的不仅仅是手册页。

无论如何,分裂的时间听起来严重错误。您应该通过一次读取一行而不是尝试将整个文件读入核心来将其缩短到几分钟。

awk -v id="Entry1.1" '/^>/{p=($0 == ">" id)}p' file

这应该转化为大约十几行 Python。这是一个概念证明(例如,如果您不熟悉 Awk 并想了解上面的内容);它可能不是最优雅或最惯用的 Python。

import sys, fileinput, os
entry = sys.argv[1]
p = False
for line in fileinput.input(sys.argv[2:]):
    line = line.rstrip(os.linesep)
    if line[0] == '>':
        if line[1:] == entry:
            p = True
        else:
            p = False
    if p:
        print line
于 2013-03-22T06:12:28.040 回答
1

您可以将文件拆分为块,>并将它们存储在按条目名称索引的字典中。

d = dict(chunk.split(None,1) for chunk in denfile.split('>') if chunk)

查找条目就是d["Entry1.1"]


编辑:由于您花费大量时间阅读文件,因此您应该尝试在那段时间内并行完成处理。您不需要将整个文件存储在任何地方;只需在文件中遇到每个想要的条目后立即将其发送到处理。

def NewMethod(entry):
    '''Do processing of correponding lines in Block'''

def retrieve_chunks(filename, alist):
    '''Generator that yields entries from file when entry name is in alist'''
    aset = set(alist)   #use a set for fast lookups
    chunk = None
    with open(filename) as f:
        for line in f:
            if line.startswith('>'):
                if chunk:
                    yield chunk
                name = line[1:].strip() 
                if name in aset:
                    chunk = [name] #enables capture of subsequent lines
                else:
                    chunk = None   #disables capture
            elif chunk:
                chunk.append(line)
    if chunk:
        yield chunk

main():
    npool = Pool(int(nproc))
    results = []
    for entry in retrieve_chunks(density_file, a_list): 
        results.append(npool.apply_async(NewMethod, (entry,)))

    for proxy in results:
        fh_out.write(proxy.get())

请注意,顺便说一句,如果您将生成器传递给Pool.map_async,它会在开始任何工作之前读取所有内容。这就是我apply_async在循环中使用的原因。

于 2013-03-22T09:52:34.213 回答