我正在一起做几个过程。每个过程都会返回一些结果。我将如何从这些过程中收集这些结果。
task_1 = Process(target=do_this_task,args=(para_1,para_2))
task_2 = Process(target=do_this_task,args=(para_1,para_2))
do_this_task
返回一些结果。我想收集这些结果并将它们保存在一些变量中。
我正在一起做几个过程。每个过程都会返回一些结果。我将如何从这些过程中收集这些结果。
task_1 = Process(target=do_this_task,args=(para_1,para_2))
task_2 = Process(target=do_this_task,args=(para_1,para_2))
do_this_task
返回一些结果。我想收集这些结果并将它们保存在一些变量中。
所以现在我建议你应该使用 pythonmultiprocessing
模块的 Pool 因为它为你处理了很多。您能否详细说明您在做什么以及为什么要直接使用我假设的内容multiprocessing.Process
?
如果您仍想multiprocessing.Process
直接使用,则应使用 Queue 来获取返回值。
文档中给出的示例:
"
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
"-多处理文档
因此,进程是通常在后台运行以执行一般操作的事物,如果您对它们进行多处理,则需要“丢弃”数据,因为进程没有像线程那样的共享内存——这就是你使用队列的原因——它为你做。您可以做的另一件事是管道,并且方便地它们也为此提供了一个示例:)。"
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
" -多处理文档
在这种情况下,这样做是手动使用管道将完成的结果传递给“父进程”。
此外,有时我会发现multiprocessing
不能很好地腌制的情况,所以我使用这个很好的答案(或我修改过的专业变体)mrule
,他在这里发布:
"
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p,c) in pipe]
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
"
但是应该警告您,这会手动控制进程,因此某些事情可能会留下“死”进程 - 这不是一件好事,一个例子是意外信号 - 这是一个使用管道进行多处理的例子尽管 :)。
如果这些命令不在 python 中,例如你想运行,ls
那么使用 可能会更好subprocess
,因为os.system
它不再是一件好事,因为它现在被认为subprocess
是一个更容易使用和更灵活的工具,这里提出一个小讨论。
你可以用多处理做这样的事情
from multiprocessing import Pool
mydict = {}
with Pool(processes=5) as pool:
task_1 = pool.apply_async(do_this_task,args=(para_1,para_2))
task_2 = pool.apply_async(do_this_task,args=(para_1,para_2))
mydict.update({"task_1": task_1.get(), "task_2":task_2.get()})
print(mydict)
或者,如果您想尝试使用concurrent.futures进行多线程处理,请查看此答案。
如果进程是外部脚本,则尝试使用subprocess模块。但是,您的代码建议您要并行运行函数。为此,请尝试使用多处理模块。此答案中的一些代码用于使用多处理的具体细节:
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the other processes
return_val = async_result.get() # get the return value from your function.