我创建了一个脚本,默认情况下会创建一个多处理进程;然后它工作正常。启动多个进程时,它开始挂起,而且并不总是在同一个地方。该程序大约有 700 行代码,所以我将尝试总结发生了什么。我想通过并行化最慢的任务(即对齐 DNA 序列)来充分利用我的多核。为此,我使用 subprocess 模块调用命令行程序:'hmmsearch',我可以通过 /dev/stdin 输入序列,然后通过 /dev/stdout 读出对齐的序列。我想挂起是因为这些多个子进程实例从标准输出/标准输入读取/写入,我真的不知道解决这个问题的最佳方法......我正在研究 os.fdopen(...) & os .tmpfile(), 创建临时文件句柄或管道,我可以在其中刷新数据。但是,我以前从未使用过,我无法想象如何使用 subprocess 模块来做到这一点。理想情况下,我想完全绕过硬盘驱动器,因为管道在高吞吐量数据处理方面要好得多!对此的任何帮助都会非常棒!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
在调试了一段时间后,我发现了一个一直存在但尚未完全解决的问题,但已经修复了(调试)过程中的其他一些低效率问题。有两个初始馈线函数,这个 align_seq 类和一个文件解析器parseHMM(),它将位置特定评分矩阵 (PSM) 加载到字典中。然后,主父进程将对齐与 PSM 进行比较,使用字典(字典)作为指向每个残基的相关分数的指针。为了计算我想要的分数,我有两个单独的 multiprocessing.Process 类,一个类logScore()计算对数优势比(使用 math.exp() );我把这个并行化;并将计算的分数排队到最后一个进程sumScore()它只是对这些分数求和(使用 math.fsum),将总和和所有特定位置的分数作为字典返回给父进程。即 Queue.put( [sum, { residual position : position specific score , ... } ] ) 我发现这特别令人困惑(队列太多!),所以我希望读者能够遵循.. . 完成上述所有计算后,我再提供将累积分数保存为制表符分隔输出的选项。这就是它现在(从昨晚开始)有时会中断的地方,因为我确保它会为每个应该有分数的位置打印出一个分数。我认为由于延迟(计算机计时不同步),有时首先放入队列的logScore没有达到sumScore第一的。为了让 sumScore 知道何时返回计数并重新开始,我将“endSEQ”放入执行计算的最后一个 logScore 进程的队列中。我认为它也应该最后达到 sumScore,但情况并非总是如此;只有有时它会破裂。所以现在我不再遇到死锁,而是在打印或保存结果时出现 KeyError 。我相信有时会出现 KeyError 的原因是因为我为每个 logScore 进程创建了一个队列,但是他们应该都使用相同的队列。现在,我有类似的东西: -
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
而我应该只创建一个队列以在所有 logScore 实例之间共享。IE
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )