我有一个复杂的 python 管道(我无法更改的代码),调用多个其他脚本和其他可执行文件。关键是运行 8000 多个目录需要很长时间,并进行一些科学分析。因此,我使用多处理模块编写了一个简单的包装器(可能不是最有效,但似乎有效)。
from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit
# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"
# the Queue
Q = JoinableQueue()
i = 0
# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")
# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
out = osjoin(outbase, id_)
if not exists(out): mkdir(out)
# logfile
log = osjoin(outbase, "log_process_%s" %(pid))
try:
# call the script
system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
except:
print "ABFGP FAILED"
return
# parse the output
def extractGff(id_):
# code not relevant
# function called by multiple processes, using the Queue
def run(Q, pid):
while not Q.empty():
try:
d = Q.get()
print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)
abfgp(d, pid)
Q.task_done()
except KeyboardInterrupt:
exit("Interrupted Child")
# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
i += 1
indir = osjoin(inbase, d)
outdir = osjoin(outbase, d)
Q.put(d)
# this loop creates the multiple processes
procs = []
for pid in range(num_p):
try:
p = Process(target=run, args=(Q, pid+1))
p.daemon = True
procs.append(p)
p.start()
except KeyboardInterrupt:
print "Aborting start of child processes"
for x in procs:
x.terminate()
exit("Interrupted")
try:
for p in procs:
p.join()
except:
print "Terminating child processes"
for x in procs:
x.terminate()
exit("Interrupted")
print "Parsing output..."
for d in genedirs: extractGff(d)
现在的问题是,abfgp.py 使用了 os.chdir 函数,这似乎破坏了并行处理。我收到很多错误,指出无法找到某些(输入/输出)文件/目录进行读/写。尽管我通过 os.system() 调用脚本,但我认为生成单独的进程会阻止这种情况。
如何解决这些 chdir 干扰?
编辑:我可能会使用正确的目录将 os.system() 更改为 subprocess.Popen(cwd="...") 。我希望这会有所作为。
谢谢。