我将就如何解决这个问题提出我的看法。在multiprocessing
模块中Pipe
,Queue
IPC 机制确实是最好的方法;尽管您提到了增加的复杂性,但值得学习它们的工作原理。这Pipe
是相当简单的,所以我将用它来说明。
这是代码,然后是一些解释:
import sys
import os
import random
import time
import multiprocessing
class computing_task(multiprocessing.Process):
def __init__(self, name, pipe):
# call this before anything else
multiprocessing.Process.__init__(self)
# then any other initialization
self.name = name
self.ipcPipe = pipe
self.number1 = 0.0
self.number2 = 0.0
sys.stdout.write('[%s] created: %f\n' % (self.name, self.number1))
# Do some kind of computation
def someComputation(self):
try:
count = 0
while True:
count += 1
self.number1 = (random.uniform(0.0, 10.0)) * self.number2
sys.stdout.write('[%s]\t%d \t%g \t%g\n' % (self.name, count, self.number1, self.number2))
# Send result via pipe to parent process.
# Can send lists, whatever - anything picklable.
self.ipcPipe.send([self.name, self.number1])
# Get new data from parent process
newData = self.ipcPipe.recv()
self.number2 = newData[0]
time.sleep(0.5)
except KeyboardInterrupt:
return
def run(self):
sys.stdout.write('[%s] started ... process id: %s\n'
% (self.name, os.getpid()))
self.someComputation()
# When done, send final update to parent process and close pipe.
self.ipcPipe.send([self.name, self.number1])
self.ipcPipe.close()
sys.stdout.write('[%s] task completed: %f\n' % (self.name, self.number1))
def main():
# Create pipe
parent_conn, child_conn = multiprocessing.Pipe()
# Instantiate an object which contains the computation
# (give "child process pipe" to the object so it can phone home :) )
computeTask = computing_task('foo', child_conn)
# Start process
computeTask.start()
# Continually send and receive updates to/from the child process
try:
while True:
# receive data from child process
result = parent_conn.recv()
print "recv: ", result
# send new data to child process
parent_conn.send([random.uniform(0.0, 1.0)])
except KeyboardInterrupt:
computeTask.join()
parent_conn.close()
print "joined, exiting"
if (__name__ == "__main__"):
main()
我已经将要完成的计算封装在一个派生自Process
. 在大多数情况下,这并不是绝对必要的,但可以使代码更易于理解和扩展。在主进程中,您可以使用start()
此类实例上的方法启动计算任务(这将启动一个单独的进程来运行对象的内容)。
如您所见,我们Pipe
在父进程中使用创建两个连接器(管道的“末端”)并将一个给子进程,而父进程持有另一个。这些连接器中的每一个都是持有端的进程之间的双向通信机制,send()
以及recv()
用于执行其名称所暗示的方法。在本例中,我使用管道来传输数字和文本列表,但通常您可以发送列表、元组、对象或任何可腌制的东西(即可以使用 Python 的腌制工具进行序列化)。因此,您对在进程之间来回发送的内容有一定的自由度。
所以你设置你的连接器,调用start()
你的新进程,然后你就可以开始计算了。在这里,我们只是将两个数字相乘,但您可以看到它是在子流程中“交互地”完成的,并从父流程发送了更新。同样,父进程也会定期收到来自计算进程的新结果的通知。
请注意,连接器的recv()
方法是阻塞的,即如果另一端还没有发送任何东西,recv()
将等待直到有东西可以读取,并在此期间阻止其他任何事情发生。所以请注意这一点。
希望这可以帮助。同样,这是一个准系统示例,在现实生活中,您将希望进行更多错误处理,可能用于poll()
连接对象等等,但希望这能传达主要想法并帮助您入门。