9

我遇到的问题的描述有点复杂,我会在提供更完整的信息方面犯错。对于不耐烦的人,这是我可以总结的最简短的方式:

在抛出换行符的同时,将文本文件拆分为大小为 N(绑定 N,例如 36)的所有(重叠)子字符串的最快(最少执行时间)方法是什么。

我正在编写一个模块来解析基于 FASTA ascii 的基因组格式的文件。这些文件包含所谓的“hg18”人类参考基因组,如果您愿意,您可以从UCSC 基因组浏览器下载(加油!)。

您会注意到,基因组文件由 chr[1..22].fa 和 chr[XY].fa 以及一组在本模块中未使用的其他小文件组成。

已经存在几个用于解析 FASTA 文件的模块,例如 BioPython 的 SeqIO。(抱歉,我会发布一个链接,但我还没有这样做的要点。)不幸的是,我能够找到的每个模块都没有执行我想要执行的特定操作。

我的模块需要将基因组数据(例如,'CAGTACGTCAGACTATACGGAGCTA' 可能是一条线)拆分为每个重叠的 N 长度子字符串。让我举一个例子,使用一个非常小的文件(实际的染色体文件长度在 355 到 2000 万个字符之间)并且 N=8

>>>导入 cStringIO
>>>example_file = cStringIO.StringIO("""\
>标题
CAGTcag
TFgcACF
""")
>>>在解析中读取(example_file):
...打印阅读
...
CAGTCAGTF
AGTCAGTFG
GTCAGTFGC
TCAGTFGCA
CAGTFGCAC
AGTFGCACF

从我能想到的方法中,我发现的功能绝对是最好的:


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

这可行,但不幸的是,以这种方式解析人类基因组仍然需要大约 1.5 小时(见下面的注释)。也许这是我将使用这种方法看到的最好的方法(可能需要进行完整的代码重构,但我想避免它,因为这种方法在代码的其他领域有一些非常具体的优势),但我我想我会把它交给社区。

谢谢!

  • 请注意,这一次包括很多额外的计算,例如计算反向链读取和对大约 5G 大小的哈希进行哈希表查找。

回答后结论:事实证明,与程序的其余部分相比,使用 fileobj.read() 然后操作生成的字符串(string.replace() 等)花费的时间和内存相对较少,所以我使用了方法。感谢大家!

4

4 回答 4

4

Could you mmap the file and start pecking through it with a sliding window? I wrote a stupid little program that runs pretty small:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py

Working through a 636229 byte fasta file (found via http://biostar.stackexchange.com/questions/1759) took .383 seconds.

#!/usr/bin/python

import mmap
import os

  def parse(string, size):
    stride = 8
    start = string.find("\n")
    while start < size - stride:
        print string[start:start+stride]
        start += 1

fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)
于 2011-01-26T06:28:49.250 回答
3

我怀疑问题在于你有太多以字符串格式存储的数据,这对你的用例来说真的很浪费,以至于你的实际内存用完了并且颠簸交换。 128 GB应该足以避免这种情况...... :)

由于您在评论中指出无论如何都需要存储其他信息,因此我选择引用父字符串的单独类。我使用 hg18 的 chromFa.zip 中的 chr21.fa 进行了简短的测试;该文件大约 48MB,不到 1M 行。我这里只有 1GB 的内存,所以我只是在之后丢弃了这些对象。因此,此测试不会显示碎片、缓存或相关问题,但我认为它应该是测量解析吞吐量的一个很好的起点:

import mmap
import os
import time
import sys

class Subseq(object):
  __slots__ = ("parent", "offset", "length")

  def __init__(self, parent, offset, length):
    self.parent = parent
    self.offset = offset
    self.length = length

  # these are discussed in comments:
  def __str__(self):
    return self.parent[self.offset:self.offset + self.length]

  def __hash__(self):
    return hash(str(self))

  def __getitem__(self, index):
    # doesn't currently handle slicing
    assert 0 <= index < self.length
    return self.parent[self.offset + index]

  # other methods

def parse(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Subseq(whole, offset, size)

class Seq(object):
  __slots__ = ("value", "offset")
  def __init__(self, value, offset):
    self.value = value
    self.offset = offset

def parse_sep_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Seq(whole[offset:offset + size], offset)

def parse_plain_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_tuple(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield (whole, offset, size)

def parse_orig(file, size=8):
  file.readline() # skip header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

def parse_os_read(file, size=8):
  file.readline()  # skip header
  file_size = os.fstat(file.fileno()).st_size
  whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_mmap(file, size=8):
  file.readline()  # skip past the header
  buffer = ""
  for line in file:
    buffer += line
    if len(buffer) >= size:
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size].upper()
      buffer = buffer[-(len(buffer) - size + 1):]
  for start in xrange(0, len(buffer) - size + 1):
    yield buffer[start:start + size]

def length(x):
  return sum(1 for _ in x)

def duration(secs):
  return "%dm %ds" % divmod(secs, 60)


def main(argv):
  tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
  n = 0
  for fn in tests:
    n += 1
    with open(argv[1]) as f:
      start = time.time()
      length(fn(f))
      end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))

  fn = parse_mmap
  n += 1
  with open(argv[1]) as f:
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    start = time.time()
    length(fn(f))
    end = time.time()
  print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))


if __name__ == "__main__":
  sys.exit(main(sys.argv))

1  parse                 1m 42s
2  parse_sep_str         1m 42s
3  parse_tuple           0m 29s
4  parse_plain_str       0m 36s
5  parse_orig            0m 45s
6  parse_os_read         0m 34s
7  parse_mmap            0m 37s

前四个是我的代码,而 orig 是你的,最后两个来自这里的其他答案。

与元组或纯字符串相比,用户定义的对象的创建和收集成本要高得多!这应该不足为奇,但我没有意识到它会产生如此大的差异(比较 #1 和 #3,它们实际上只在用户定义的类与元组中有所不同)。你说你想用字符串存储额外的信息,比如偏移量(如在 parse 和 parse_sep_str 情况下),所以你可以考虑在 C 扩展模块中实现该类型。如果您不想直接编写 C,请查看 Cython 及其相关内容。

案例#1 和#2 相同是意料之中的:通过指向一个父字符串,我试图节省内存而不是处理时间,但这个测试并没有衡量这一点。

于 2011-01-26T04:06:49.410 回答
3

一些经典的 IO 绑定更改。

  • 使用较低级别的读取操作,例如os.read并读入大型固定缓冲区。
  • 使用线程/多处理,其中一个读取和缓冲以及其他进程。
  • 如果您有多个处理器/机器,请使用 multiprocessing/mq 在 CPU 之间分配处理,例如 map-reduce。

使用较低级别的读取操作不会有太多的重写。其他的将是相当大的重写。

于 2011-01-26T07:56:41.443 回答
1

我有一个处理文本文件的函数,并在读写和并行计算中使用缓冲区,以及进程工作集的异步池。我有一个 2 核、8GB RAM、gnu/linux 的 AMD,可以在不到 1 秒的时间内处理 300000 行,在大约 4 秒内处理 1000000 行,在大约 20 秒内处理大约 4500000 行(超过 220MB):

# -*- coding: utf-8 -*-
import sys
from multiprocessing import Pool

def process_file(f, fo="result.txt", fi=sys.argv[1]):
    fi = open(fi, "r", 4096)
    fo = open(fo, "w", 4096)
    b = []
    x = 0
    result = None
    pool = None
    for line in fi:
        b.append(line)
        x += 1
        if (x % 200000) == 0:
            if pool == None:
                pool = Pool(processes=20)
            if result == None:
                result = pool.map_async(f, b)
            else:
                presult = result.get()
                result = pool.map_async(f, b)
                for l in presult:
                    fo.write(l)
            b = []
    if not result == None:
        for l in result.get():
            fo.write(l)
    if not b == []:
        for l in b:
            fo.write(f(l))
    fo.close()
    fi.close()

第一个参数是接收一行的函数,处理并返回结果将写入文件,下一个是输出文件,最后一个是输入文件(如果您在输入脚本文件中接收作为第一个参数,则不能使用最后一个参数) .

于 2013-01-24T08:04:13.313 回答