1

我有一个复杂的 python 管道(我无法更改的代码),调用多个其他脚本和其他可执行文件。关键是运行 8000 多个目录需要很长时间,并进行一些科学分析。因此,我使用多处理模块编写了一个简单的包装器(可能不是最有效,但似乎有效)。

from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit

# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"

# the Queue
Q = JoinableQueue()
i = 0

# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")

# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
    out = osjoin(outbase, id_)
    if not exists(out): mkdir(out)

    # logfile
    log = osjoin(outbase, "log_process_%s" %(pid))
    try:
        # call the script
        system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
    except:
        print "ABFGP FAILED"
        return

# parse the output
def extractGff(id_):
   # code not relevant


# function called by multiple processes, using the Queue
def run(Q, pid):
    while not Q.empty():
        try:
            d = Q.get()             
            print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)          
            abfgp(d, pid)
            Q.task_done()
        except KeyboardInterrupt:
            exit("Interrupted Child")

# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
    i += 1
    indir = osjoin(inbase, d)
    outdir = osjoin(outbase, d)
    Q.put(d)

# this loop creates the multiple processes
procs = []
for pid in range(num_p):
    try:
        p = Process(target=run, args=(Q, pid+1))
        p.daemon = True
        procs.append(p) 
        p.start()
    except KeyboardInterrupt:
        print "Aborting start of child processes"
        for x in procs:
            x.terminate()
        exit("Interrupted")     

try:
    for p in procs:
        p.join()
except:
    print "Terminating child processes"
    for x in procs:
        x.terminate()
    exit("Interrupted")

print "Parsing output..."
for d in genedirs: extractGff(d)

现在的问题是,abfgp.py 使用了 os.chdir 函数,这似乎破坏了并行处理。我收到很多错误,指出无法找到某些(输入/输出)文件/目录进行读/写。尽管我通过 os.system() 调用脚本,但我认为生成单独的进程会阻止这种情况。

如何解决这些 chdir 干扰?

编辑:我可能会使用正确的目录将 os.system() 更改为 subprocess.Popen(cwd="...") 。我希望这会有所作为。

谢谢。

4

2 回答 2

1

编辑 2

不要使用os.system()使用subprocess.call()

system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))

将转化为

subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log.

编辑 1 我认为问题在于多处理使用模块名称来序列化函数、类。

这意味着,如果您import module在模块所在的位置执行操作,./module.py并且您os.chdir('./dir')现在需要执行类似的操作from .. import module

子进程继承父进程的文件夹。这可能是个问题。

解决方案

  1. 确保导入所有模块(在子进程中),然后更改目录
  2. 插入原始文件os.getcwd()sys.path启用从原始目录导入。这必须在从本地目录调用任何函数之前完成。
  3. 将您使用的所有函数放在一个始终可以导入的目录中。site-packages可能是这样一个目录。然后你可以做一些事情,比如import module module.main()开始你所做的事情。
  4. 这是我做的一个黑客,因为我知道泡菜是如何工作的。只有在其他尝试失败时才使用它。脚本打印:

    serialized # the function runD is serialized
    string executed # before the function is loaded the code is executed
    loaded # now the function run is deserialized
    run # run is called
    

    在你的情况下,你会做这样的事情:

    runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run)
    p = Process(target=runD, args=(Q, pid+1))
    

    这是脚本:

    # functions that you need
    
    class R(object):
        def __init__(self, call, *args):
    
            self.ret = (call, args)
        def __reduce__(self):
            return self.ret
        def __call__(self, *args, **kw):
            raise NotImplementedError('this should never be called')
    
    class evalBeforeDeserialize(object):
        def __init__(self, string, function):
            self.function = function
            self.string = string
        def __reduce__(self):
            return R(getattr, tuple, '__getitem__'), \
                     ((R(eval, self.string), self.function), -1)
    
    # code to show how it works        
    
    def printing():
        print('string executed')
    
    def run():
        print('run')
    
    runD = evalBeforeDeserialize('__import__("__main__").printing()', run)
    
    import pickle
    
    s = pickle.dumps(runD)
    print('serialized')
    run2 = pickle.loads(s)
    print('loaded')
    run2()
    

如果这些不起作用,请报告。

于 2014-02-26T10:33:17.833 回答
0

您可以确定os不可更改的程序正在使用哪个库实例;然后在该库中创建一个定制的版本来chdir满足您的需要——防止目录更改、记录它等等。如果定制的行为需要仅针对单个程序,您可以使用该inspect模块来识别调用者并以特定方式为该调用者定制行为。

如果您确实无法更改现有程序,则您的选择是有限的;但是,如果您可以选择更改它导入的库,那么这样的事情可能是一种避免不良行为的侵入性最小的方法。

更改标准库时通常需要注意事项。

于 2014-02-26T15:08:43.260 回答