5

这不是很重要,只是一个愚蠢的实验。我想创建自己的消息传递。我想要一个队列字典,其中每个键都是进程的 PID。因为我想让进程(由 Process() 创建)交换消息,将它们插入他们想要发送到的进程的队列中(知道它的 pid)。这是一个愚蠢的代码:

from multiprocessing import Process, Manager, Queue
from os import getpid
from time import sleep

def begin(dic, manager, parentQ):
    parentQ.put(getpid())
    dic[getpid()] = manager.Queue()
    dic[getpid()].put("Something...")

if __name__== '__main__':
    manager = Manager()
    dic = manager.dict()
    parentQ = Queue()

    p = Process(target = begin, args=(dic, manager, parentQ))
    p.start()
    son = parentQ.get()
    print son
    sleep(2)
    print dic[son].get()

dic[getpid()] = manager.Queue(),这工作正常。但是当我执行时, dic[son].put()/get()我收到以下消息:

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "mps.py", line 8, in begin
    dic[getpid()].put("Something...")
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod
    raise convert_to_error(kind, result)
RemoteError: 
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>)
---------------------------------------------------------------------------

你知道什么是正确的方法吗?

4

1 回答 1

1

我相信您的代码失败了,因为队列不可序列化,就像回溯说的那样。multiprocessing.Manager() 对象可以毫无问题地为您创建共享字典,就像您在此处所做的那样,但是存储在字典中的值仍然需要可序列化(或在 Pythonese 中可提取)。如果您对无法访问彼此队列的子流程感到满意,那么这应该适合您:

from multiprocessing import Process, Manager, Queue
from os import getpid

number_of_subprocesses_i_want = 5

def begin(myQ):
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid()))
    return

if __name__== '__main__':
    queue_dic = {}
    queue_manager = Manager()

    process_list = []

    for i in xrange(number_of_subprocesses_i_want):
        child_queue = queue_manager.Queue()

        p = Process(target = begin, args=(child_queue,))
        p.start()
        queue_dic[p.pid] = child_queue
        process_list.append(p)

    for p in process_list:
        print(queue_dic[p.pid].get())
        p.join()

这为您留下了一个字典,其键是子进程,值是它们各自的队列,可以从主进程中使用。

我认为您的原始目标无法通过队列实现,因为您希望子进程使用的队列必须在创建时传递给进程,因此当您启动更多进程时,您无法授予现有进程访问权限一个新的队列。

进行进程间通信的一种可能方法是让每个人共享一个队列,以将消息传递回与某种标头捆绑的主进程,例如在元组中:

(destination_pid, sender_pid, message)

..并主要读取destination_pid并将(sender_pid,message)定向到该子进程的队列。当然,这意味着您需要一种在有新进程可与之通信时通知现有进程的方法。

于 2014-07-14T20:27:56.210 回答