2

我有一组命令行工具,我想在一系列文件上并行运行。我编写了一个 python 函数来包装它们,看起来像这样:

def process_file(fn):
    print os.getpid()
    cmd1 = "echo "+fn
    p = subprocess.Popen(shlex.split(cmd1))

    # after cmd1 finishes
    other_python_function_to_do_something_to_file(fn)

    cmd2 = "echo "+fn
    p = subprocess.Popen(shlex.split(cmd2))
    print "finish"

if __name__=="__main__":
    import multiprocessing
    p = multiprocessing.Pool()
    for fn in files:
        RETURN = p.apply_async(process_file,args=(fn,),kwds={some_kwds})

虽然这可行,但它似乎并没有运行多个进程;似乎它只是串行运行(我尝试过使用Pool(5)相同的结果)。我错过了什么?是否呼吁Popen“阻止”?

编辑:澄清一点。我需要 cmd1,然后是一些 python 命令,然后是 cmd2,以便在每个文件上按顺序执行。

EDIT2:上面的输出具有以下模式:

pid
finish
pid
finish
pid
finish

而类似的调用,使用map代替apply(但没有任何传递的规定kwds)看起来更像

pid
pid
pid
finish
finish
finish

但是,地图调用有时(总是?)在显然成功后挂起

4

1 回答 1

3

是否呼吁Popen“阻止”?

不。只需立即创建一个subprocess.Popen退货,给您一个可以等待或以其他方式使用的对象。如果你想阻止,这很简单:

subprocess.check_call(shlex.split(cmd1))

同时,我不确定你为什么将你的 args 放在一个字符串中,然后尝试将shlex它们返回到一个列表中。为什么不直接写清单?

cmd1 = ["echo", fn]
subprocess.check_call(cmd1)

虽然这可行,但它似乎并没有运行多个进程;似乎它只是串行运行

是什么让你这么想?鉴于每个进程只是尽可能快地将两个进程启动到后台,因此很难判断它们是否并行运行。

如果您想验证您是否从多个处理中获得工作,您可能需要添加一些打印或日志记录(并os.getpid()在消息中抛出类似的内容)。

同时,您似乎正在尝试完全复制multiprocessing.Pool.map_asyncout of a loop around的效果multiprocessing.Pool.apply_async,除了不是累积结果,而是将每个结果存储在一个名为的变量中RESULT,然后在使用它之前将其丢弃。为什么不直接使用map_async

最后,您询问是否multiprocessing是适合这项工作的工具。好吧,你显然需要一些异步的东西:check_call(args(file1))必须 block other_python_function_to_do_something_to_file(file1),但同时不能 block check_call(args(file2))

我可能会使用threading,但实际上,它并没有太大区别。即使您在一个进程启动成本高昂的平台上,您也已经支付了这笔费用,因为整个过程都在运行 N * M 组子进程,因此另一个 8 个池不会有任何损害。并且通过在线程之间共享数据而意外创建竞赛的风险很小,或者意外创建看起来像是在进程之间共享数据但实际上没有共享数据的代码,因为没有什么可共享的。所以,不管你更喜欢哪一个,去吧。

另一种选择是编写一个事件循环。我实际上可能会开始为这个问题自己做,但我会后悔的,你不应该这样做......</p>

于 2012-11-07T23:15:14.163 回答