2

背景:
Linux 上的 Python 2.6.6。DNA 序列分析流程的第一部分。
我想从已安装的远程存储 (LAN) 中读取可能压缩过的文件,如果它是压缩过的;gunzip 将其压缩到一个流中(即使用gunzip FILENAME -c),如果流(文件)的第一个字符是“@”,则将该整个流路由到一个过滤程序中,该程序接受标准输入的输入,否则只需将其直接通过管道传输到本地文件磁盘。我想最大限度地减少从远程存储读取/查找文件的次数(只需一次通过文件不应该是不可能的吗?)。

示例输入文件的内容,前四行对应于 FASTQ 格式的一条记录:

@I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG
+I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhahhhhhhfShhhYhhQhh]hhhhffhU\UhYWc

不应通过管道传输到过滤程序的文件包含如下所示的记录(前两行对应于 FASTA 格式的一条记录):

>I328_1_FC30MD2AAXX:8:1:1719:1113/1
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG

有些人编写了半伪代码来可视化我想要做什么(我知道这不可能像我写的那样)。我希望它有一些意义:

if gzipped:
    gunzip = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
    if gunzip.stdout.peek(1) == "@": # This isn't possible
        fastq = True
    else:
        fastq = False
if fastq:
    filter = Popen(["filter", "localstorage/outputfile.fastq"], stdin=gunzip.stdout).communicate()
else:
    # Send the gunzipped stream to another file

忽略这样一个事实,即代码不会像我在这里写的那样运行,而且我没有错误处理等,所有这些都已经在我的其他代码中了。我只是想要帮助窥视溪流或找到解决方法。如果可以的话,我会很棒,gunzip.stdout.peek(1)但我意识到这是不可能的。

到目前为止我所尝试的:
我认为 subprocess.Popen 可能会帮助我实现这一点,并且我尝试了很多不同的想法,其中包括尝试使用某种 io.BufferedRandom() 对象将流写入但我不知道那将如何工作。我知道流是不可搜索的,但也许一种解决方法可能是读取 gunzip-stream 的第一个字符,然后创建一个新流,您首先根据文件内容输入“@”或“>”,然后填充其余部分的 gunzip.stdout-stream 到新的流。然后将这个新流输入过滤器的 Popen 标准输入。

请注意,文件大小可能比可用内存大几倍。我不想从远程存储中对源文件执行一次以上的读取,也不想进行不必要的文件访问。

欢迎任何想法!请问我问题,以便我澄清我是否说得不够清楚。

4

2 回答 2

1

这是您第一次根据文件内容输入“@”或“>”的实现,然后将 gunzip.stdout-stream 的其余部分填充到新的流提案中。我只测试了测试的本地文件分支,但应该足以演示这个概念。

if gzipped:
    source = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
else:
    source = Popen(["cat", "remotestorage/file"], stdout=PIPE)
firstchar = source.stdout.read(1)
# "unread" the char we've just read
source = Popen([r"(printf '\x%02x' && cat)" % ord(firstchar)],
               shell=True, stdin=source.stdout, stdout=PIPE)

# Now feed the output to a filter or to a local file.
flocal = None
try:
    if firstchar == "@":
        filter = Popen(["filter", "localstorage/outputfile.fastq"],
                       stdin=source.stdout)
    else:
        flocal = open('localstorage/outputfile.stream', 'w')
        filter = Popen(["cat"], stdin=source.stdout, stdout=flocal)
    filter.communicate()
finally:
    if flocal is not None:
        flocal.close()

这个想法是从 source 命令的输出中读取单个字符,然后使用 重新创建原始输出(printf '\xhh' && cat),从而有效地实现 peek。替换流指定shell=Trueto Popen,将其留给 shell 并cat完成繁重的工作。数据始终保留在管道中,永远不会完全读入内存。请注意,shell 的服务仅被请求用于Popen实现未读取窥视字节的单个调用,而不是涉及用户提供的文件名的调用。即使在那个时候,字节也会被转义为十六进制,以确保 shell 在调用时不会破坏它printf

可以进一步清理代码以实现名为peek返回偷看内容和替换的实际函数new_source

于 2012-10-07T21:20:30.390 回答
0

在 Python 中包装 shell 命令是没有意义的。您可以在 Python 中实现所需的一切,但无需花费大量精力:

  1. 打开输入文件并读取前 3 个字节。如果它们相等1F 8B 08,那么它应该是 gzip 文件。
  2. 重置文件标记
  3. 如果是 gzip 文件或读取文件,则将文件内容传递给 zlib.decompress()
  4. 如果需要,传递给过滤函数
  5. 将结果写入文件

编辑

这不起作用,因为在传递给 zlib 之前需要剥离 gzip 标头。fh.seek(0)但是,如果您想确定文件是 gzip(使用 DEFLATE 压缩),则可以检查前 3 个字节,执行 a并将文件传递给 gzip.open()。

将文件传递给 gzip 并捕获文件未压缩时抛出的异常可能更容易:

import gzip

try:
    in_file = gzip.open("infile")
    f_contents = in_file.read()
except IOError, e:
    # Re-raise exception if exception message is not "Not a gzipped file"
    # Perhaps it would be safer to check the header!
    if e.__str__() != "Not a gzipped file":
        raise
    in_file = open("infile")
    f_contents = in_file.read()

if f_contents[0] == "@":
    result = filter_function(f_contents)
else:
    result = f_contents

new_file = open("new_file", "w")
new_file.write(result)  
于 2012-10-07T21:28:17.023 回答