6

假设我从 shell 运行了以下命令

{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3

对于那些不熟悉 samtools 视图的人(因为这是 stackoverflow)。这本质上是在创建一个具有新标头的新 bam 文件。bam 文件通常是大型压缩文件,因此在某些情况下,即使通过文件也可能很耗时。另一种方法是执行 command2,然后使用 samtools reheader 切换标题。这会通过大文件两次。上述命令通过 bam 一次,这对于较大的 bam 文件很有用(即使压缩后它们也会大于 20GB - WGS)。

我的问题是如何使用子进程在 python 中实现这种类型的命令。

我有以下内容:

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### SOMEHOW APPEND sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam)

任何帮助是极大的赞赏。谢谢。

4

4 回答 4

4

如果字符串中已经有 shell 命令,那么您可以按原样运行它:

#!/usr/bin/env python
from subprocess import check_call

check_call(r"""
{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3
""", shell=True)

要在 Python 中模拟管道:

#!/usr/bin/env python
from subprocess import Popen, PIPE

# start command3 to get stdin pipe, redirect output to the file
with open('output.bam', 'wb', 0) as output_file:
    command3 = Popen("samtools view -bS -".split(), 
                     stdin=PIPE, stdout=output_file)
# start command1 with its stdout redirected to command3 stdin
command1 = Popen('samtools view -HS header.sam'.split(),
                 stdout=command3.stdin)
rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies
# start command2 after command1 finishes
command2 = Popen('samtools view input.bam 1:1-50000000'.split(),
                 stdout=command3.stdin)
command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error)
rc_command2 = command2.wait()
rc_command3 = command3.wait()
于 2015-02-28T09:05:18.503 回答
1

(我不能遗憾地发表评论,但这个“答案”是对 cmidi 答案的评论,如果有人可以移动它,将不胜感激!-- PS:该答案现已被删除......)

Marco 明确表示这些命令会产生大量输出,大约 20GB。如果您使用communicate(),它将等待进程终止,这意味着“fd”描述符将需要保存大量数据。实际上,操作系统会同时将数据刷新到磁盘,除非您的计算机有超过 20GB 的可用 RAM。所以你最终将中间数据写入磁盘,这是原作者想要避免的。为sirlark的回答+1!

于 2015-02-27T22:51:59.860 回答
0

我假设由于所涉及文件的大小,连接内存中前两个子进程的输出是不可行的。我建议将前两个子进程的输出包装在一个类似的文件中。看起来您只需要 read 方法,因为 popen 只会从其标准输入文件中读取,而不是查找或写入。下面的代码假定从 read 返回一个空字符串足以表明流处于 EOF

class concat(object):
    def __init__(self, f1, f2):
        self.f1 = f1
        self.f2 = f2

    def read(self, *args):
        ret = self.f1.read(*args)
        if ret == '':
            ret = self.f2.read(*args)
        return ret

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### Somehow Append sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=concat(sub_0.stdout, sub_1.stdout), stdout=fh_bam)

为了澄清,f1.read将阻塞,并且仅''在管道关闭/EOF'd 时返回。concat.read仅在发生这种情况后才尝试读取,f2因此输出不会交织在一起。当然,重复读取结尾会产生轻微的开销,这可以通过设置一个标志变量来指示要从哪个文件读取来避免。我怀疑它会显着提高性能。f1f2f1

于 2015-02-27T22:21:54.173 回答
-1

虽然 Popen 接受类似文件的对象,但它实际上使用底层文件句柄/描述符,而不是文件对象的读写方法进行通信,正如@JF Sebastian 正确指出的那样。更好的方法是使用os.pipe()不使用磁盘的管道 ( )。这允许您将输出流直接连接到另一个进程的输入流,这正是您想要的。那么问题只是序列化的问题,以确保两个源流不会交错。

import os
import subprocess

r, w = os.pipe()

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096)
sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src1.communicate()
sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src2.communicate()

我们首先打开接收器(管道的读取器),然后communicate打开源进程,只是为了避免@Ariel 提到的潜在阻塞。这也迫使第一个源进程在第二个源进程有机会写入管道之前完成并通过管道刷新其输出,从而防止交错/破坏的输出。您可以使用 bufsize 值来调整性能。

这几乎正​​是 shell 命令正在做的事情。

于 2015-02-28T12:05:26.147 回答