0

我创建了一个脚本,默认情况下会创建一个多处理进程;然后它工作正常。启动多个进程时,它开始挂起,而且并不总是在同一个地方。该程序大约有 700 行代码,所以我将尝试总结发生了什么。我想通过并行化最慢的任务(即对齐 DNA 序列)来充分利用我的多核。为此,我使用 subprocess 模块调用命令行程序:'hmmsearch',我可以通过 /dev/stdin 输入序列,然后通过 /dev/stdout 读出对齐的序列。我想挂起是因为这些多个子进程实例从标准输出/标准输入读取/写入,我真的不知道解决这个问题的最佳方法......我正在研究 os.fdopen(...) & os .tmpfile(), 创建临时文件句柄或管道,我可以在其中刷新数据。但是,我以前从未使用过,我无法想象如何使用 subprocess 模块来做到这一点。理想情况下,我想完全绕过硬盘驱动器,因为管道在高吞吐量数据处理方面要好得多!对此的任何帮助都会非常棒!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

在调试了一段时间后,我发现了一个一直存在但尚未完全解决的问题,但已经修复了(调试)过程中的其他一些低效率问题。有两个初始馈线函数,这个 align_seq 类和一个文件解析器parseHMM(),它将位置特定评分矩阵 (PSM) 加载到字典中。然后,主父进程将对齐与 PSM 进行比较,使用字典(字典)作为指向每个残基的相关分数的指针。为了计算我想要的分数,我有两个单独的 multiprocessing.Process 类,一个类logScore()计算对数优势比(使用 math.exp() );我把这个并行化;并将计算的分数排队到最后一个进程sumScore()它只是对这些分数求和(使用 math.fsum),将总和和所有特定位置的分数作为字典返回给父进程。即 Queue.put( [sum, { residual position : position specific score , ... } ] ) 我发现这特别令人困惑(队列太多!),所以我希望读者能够遵循.. . 完成上述所有计算后,我再提供将累积分数保存为制表符分隔输出的选项。这就是它现在(从昨晚开始)有时会中断的地方,因为我确保它会为每个应该有分数的位置打印出一个分数。我认为由于延迟(计算机计时不同步),有时首先放入队列的logScore没有达到sumScore第一的。为了让 sumScore 知道何时返回计数并重新开始,我将“endSEQ”放入执行计算的最后一个 logScore 进程的队列中。我认为它也应该最后达到 sumScore,但情况并非总是如此;只有有时它会破裂。所以现在我不再遇到死锁,而是在打印或保存结果时出现 KeyError 。我相信有时会出现 KeyError 的原因是因为我为每个 logScore 进程创建了一个队列,但是他们应该都使用相同的队列。现在,我有类似的东西: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

而我应该只创建一个队列以在所有 logScore 实例之间共享。IE

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
4

3 回答 3

2

这不是流水线的工作原理......但为了让您放心,这里是子流程文档的摘录:

stdin、stdout 和 stderr 分别指定执行程序的标准输入、标准输出和标准错误文件句柄。有效值为 PIPE、现有文件描述符(正整数)、现有文件对象和无。PIPE 表示应该创建一个通往子级的新管道。使用 None,不会发生重定向;子文件句柄将从父文件继承。

最有可能出现故障的区域是与主进程的通信或您对信号量的管理。也许由于错误,状态转换/同步没有按预期进行?我建议通过在每次阻塞调用之前和之后添加日志记录/打印语句来进行调试——您与主进程通信的位置以及获取/释放信号量的位置以缩小出错的位置。

我也很好奇 - 信号量是绝对必要的吗?

于 2011-01-06T21:45:19.367 回答
1

我还想并行化简单的任务,为此我创建了一个小 Python 脚本。你可以看看: http ://bioinf.comav.upv.es/psubprocess/index.html

比你想要的更通用一点,但对于简单的任务来说很容易使用。这对你来说可能至少有些不合适。

何塞布兰卡

于 2011-01-07T11:32:50.073 回答
0

这可能是子进程中的死锁,您是否尝试过使用通信方法而不是等待? http://docs.python.org/library/subprocess.html

于 2011-01-07T14:28:36.673 回答