2

我正在编写一个程序,其中数量可变的 Agent(代理)对象并发运行多个串行方法,并将这些方法的返回值存储在一个队列属性中。每个 Agent 都有一个 Worker(Process 的子类)作为属性,并通过 cmd_queue 向它提交作业,由它串行执行。Agent 再通过 res_queue 从 Worker 获取结果。目前这两个队列都是 Manager().Queue() 实例,这会导致:TypeError: Pickling an AuthenticationString object is disallowed for security reasons。但是,如果我改用普通的 Queue.Queue,Worker 得到的是 Agent 的 cmd_queue 的一个副本,看不到 Agent 往其中添加的内容(它始终是空的)。

借助下面这个问题中给出的方案,我可以对实例方法进行 pickle(序列化):Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

import copy_reg
import types
from multiprocessing import Manager, Process
from time import sleep

def _pickle_method(method):
    """Reduce a bound method to a (callable, args) pair that pickle can store.

    Registered with copy_reg as the reducer for types.MethodType.  On load,
    pickle calls _unpickle_method with the three saved values.
    """
    state = (method.im_func.__name__,  # name of the underlying function
             method.im_self,           # instance the method is bound to
             method.im_class)          # class whose MRO is searched on load
    return _unpickle_method, state

def _unpickle_method(func_name, obj, cls):
    """Rebuild a bound method from the state saved by _pickle_method.

    Walks cls.mro() to find the class that defines func_name, then binds
    that function to obj.

    Args:
        func_name: name of the method's underlying function.
        obj: instance the method was bound to.
        cls: class the method was accessed through.

    Returns:
        The re-bound method.

    Raises:
        AttributeError: if no class in cls.mro() defines func_name.
    """
    # Use a separate loop variable: the binding below must use the class the
    # method was accessed through, not whichever base class defines it.
    for klass in cls.mro():
        if func_name in klass.__dict__:
            func = klass.__dict__[func_name]
            break
    else:
        # Fail with a clear message instead of an UnboundLocalError on func.
        raise AttributeError('%r has no method %r' % (cls, func_name))
    return func.__get__(obj, cls)

# Register the reducer pair so bound methods (types.MethodType) such as
# self.foo can be pickled, e.g. when Agent.produce puts them on cmd_queue.
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    """Child process that runs queued jobs one at a time.

    Each job taken from cmd_queue is a (callable, args, kwargs) tuple; the
    callable's return value is put on res_queue, in job order.
    """
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue  # jobs in: (callable, args, kwargs) tuples
        self.res_queue = res_queue  # return values out
        Process.__init__(self)

    def run(self):
        # NOTE(review): there is no exit condition, so this loop never ends
        # and the worker process never terminates on its own.  If f raises,
        # the exception propagates out of run(), the process ends, and no
        # result is put on res_queue for that job.
        while True:
            f, args, kwargs = self.cmd_queue.get()
            self.res_queue.put( f(*args, **kwargs) )  

class Agent:
    """Owns a Worker process and queues method calls for it to run serially.

    NOTE(review): under Python 2 this is an old-style class, but
    _unpickle_method calls cls.mro(), which old-style classes do not have;
    this presumably needs to be ``class Agent(object)``.
    """
    def __init__(self):
        # NOTE(review): every Manager() call starts its own manager server
        # process, so each Agent starts two of them in addition to its Worker.
        self.cmd_queue = Manager().Queue()
        self.res_queue = Manager().Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        """Queue f(*args, **kwargs) to be run by the worker.

        When f is a bound method such as self.foo, _pickle_method pickles the
        Agent instance too, along with its attributes (both queues and
        self.worker, a Process object). That is presumably what raises the
        AuthenticationString TypeError.
        """
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        """Queue a call to foo."""
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        """Queue a call to bar."""
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        """Example job: sleep 5 s, then return a fixed string."""
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        """Example job: sleep 10 s, then return a fixed string."""
        sleep(10)
        return('this is a bar')

    def get_results(self):  #blocking call
        """Collect the results the worker has produced.

        NOTE(review): racy.  cmd_queue is empty as soon as the worker has
        *taken* the last job, not when that job has finished, so results can
        still be missing when res_queue is drained below.
        """
        res = []
        while not self.cmd_queue.empty():#wait for Worker to finish
            sleep(.5)
        while not self.res_queue.empty():
            res.append(self.res_queue.get())
        return res  

#This is the interface I'm looking for.
if __name__=='__main__':
    # NOTE(review): no stop signal is ever queued, so the Worker loops never
    # end.  The interpreter waits for non-daemon child processes at exit, so
    # the script presumably hangs after printing.
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
    for agent in agents:
        print(agent.get_results())

我的问题是:怎样才能用 multiprocessing 让这段代码正常工作?或者,有没有更好、更通行的方法来实现这种模式?这只是一个更大框架中的一小部分,所以我希望它尽可能对面向对象(OO)友好。

编辑:代码运行在 Python 2.7 下。

4

2 个回答

2

您可以使用普通的 multiprocessing.Queue。您只需调整 Agent 类,使 Agent 实例本身被 pickle 时不去 pickle 其中的 Queue 实例(以及 worker 属性)。之所以需要这样做,是因为当您 pickle 要发送给 Worker 的实例方法时,必须同时 pickle Agent 实例本身。不过做起来很容易:

class Agent(object): # Agent is now a new-style class
    def __init__(self):
        # Plain multiprocessing.Queue instances; no Manager is needed.
        self.cmd_queue = Queue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """ This is called to pickle the instance.

        Pickling a bound method such as self.foo pickles the Agent as well,
        so the queues and the worker process are left out of the state.
        """
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """ This is called to unpickle the instance.

        The restored copy has no cmd_queue, res_queue or worker attribute.
        """
        self.__dict__ = self_dict

    ... # The rest of the class is unchanged from the question.

请注意,这段代码中还有其他一些逻辑问题,导致它无法正常运行:get_results 并不会真正按您的预期工作,因为它容易受到竞态条件的影响:

    # Race: cmd_queue becomes empty once the Worker has *taken* the last job,
    # which can happen before that job has finished and put its result.
    while not self.cmd_queue.empty():#wait for Worker to finish
        sleep(.5)
    # ...so this loop can stop before every result has arrived.
    while not self.res_queue.empty():
        res.append(self.res_queue.get())

cmd_queue 可能在 Worker 中实际运行的函数完成之前就已变空(用您的示例代码确实会发生)。这意味着当您从 res_queue 中取出所有内容时,会缺少部分结果。您可以改用 JoinableQueue 来解决这个问题,它允许 worker 在任务真正完成时发出信号。

您还应该向工作进程发送一个哨兵值,让它们正确关闭,从而使它们放入 res_queue 的所有结果都能被正确刷新并发送回父进程。我还发现需要在 res_queue 中也放入一个哨兵值。否则有时子进程写入的最后一个结果还没真正通过管道传过来,res_queue 在父进程中就已显示为空,导致最后一个结果丢失。

这是一个完整的工作示例:

from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg  

def _pickle_method(method):
    """Reduce a bound method to a (callable, args) pair that pickle can store.

    Registered with copy_reg as the reducer for types.MethodType.  On load,
    pickle calls _unpickle_method with the three saved values.
    """
    state = (method.im_func.__name__,  # name of the underlying function
             method.im_self,           # instance the method is bound to
             method.im_class)          # class whose MRO is searched on load
    return _unpickle_method, state

def _unpickle_method(func_name, obj, cls):
    """Rebuild a bound method from the state saved by _pickle_method.

    Walks cls.mro() to find the class that defines func_name, then binds
    that function to obj.

    Args:
        func_name: name of the method's underlying function.
        obj: instance the method was bound to.
        cls: class the method was accessed through.

    Returns:
        The re-bound method.

    Raises:
        AttributeError: if no class in cls.mro() defines func_name.
    """
    # Use a separate loop variable: the binding below must use the class the
    # method was accessed through, not whichever base class defines it.
    for klass in cls.mro():
        if func_name in klass.__dict__:
            func = klass.__dict__[func_name]
            break
    else:
        # Fail with a clear message instead of an UnboundLocalError on func.
        raise AttributeError('%r has no method %r' % (cls, func_name))
    return func.__get__(obj, cls)

# Teach pickle (through copy_reg) how to reduce and rebuild bound methods,
# so a method such as self.foo can travel over a multiprocessing queue.
copy_reg.pickle(types.MethodType, _pickle_method,
                constructor_ob=_unpickle_method)

class Worker(Process):
    """Child process that runs queued calls one after another.

    Each item on cmd_queue is a (callable, args, kwargs) tuple, and the
    call's return value is put on res_queue.  The tuple (None, (), {}) is
    the stop sentinel.  On receiving it the worker puts None on res_queue,
    so readers know no more results follow, and returns.

    If a call raises, the exception object is put on res_queue in place of
    that call's result, and processing continues.  The exception must be
    picklable to reach the parent.
    """

    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue  # needs get() and task_done(), e.g. JoinableQueue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        for f, args, kwargs in iter(self.cmd_queue.get,
                                    (None, (), {})):  # None is our sentinel
            try:
                result = f(*args, **kwargs)
            except Exception as exc:
                # Report the failure instead of letting the process die.  A
                # dead worker never calls task_done() or sends the None
                # sentinel, so the parent would block forever in
                # cmd_queue.join() / res_queue.get().
                result = exc
            self.res_queue.put(result)
            self.cmd_queue.task_done()  # mark the job as done
        self.res_queue.put(None)  # no more results are coming
        self.cmd_queue.task_done()  # account for the sentinel itself

class Agent(object):
    """Owns a Worker process and feeds it method calls to run serially.

    Typical use: queue jobs with the do_* methods (or produce), call
    send_sentinel once no more jobs will follow, then call get_results.
    """

    def __init__(self):
        self.cmd_queue = JoinableQueue()  # jobs in; the worker calls task_done()
        self.res_queue = Queue()  # results out, terminated by None
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """Return the state to pickle: everything except the queues and worker.

        Queueing a bound method such as self.foo pickles self as well, and
        multiprocessing queues and Process objects refuse to be pickled that
        way.  The unpickled copy (inside the worker) therefore has no
        cmd_queue, res_queue or worker attribute.
        """
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """Restore the attributes saved by __getstate__."""
        self.__dict__ = self_dict

    def produce(self, f, *args, **kwargs):
        """Queue f(*args, **kwargs) to run in the worker process."""
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        """Queue a call to foo."""
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        """Queue a call to bar."""
        self.produce(self.bar, humana='humana')

    def send_sentinel(self):
        """Queue the stop job (None, (), {}).

        get_results only returns once the worker has processed this
        sentinel, so call this before get_results.
        """
        self.produce(None)

    def foo(self, **kwargs):
        """Example job: sleep 2 s, then return a fixed string."""
        sleep(2)
        return 'this is a foo'

    def bar(self, **kwargs):
        """Example job: sleep 4 s, then return a fixed string."""
        sleep(4)
        return 'this is a bar'

    def get_results(self):  #blocking call
        """Block until the worker has finished and return its results in order."""
        res = []
        # Blocks until task_done() has been called for every put.
        self.cmd_queue.join()
        # The worker puts None after its last result, so read up to it.
        for out in iter(self.res_queue.get, None):
            res.append(out)
        # Everything the worker put, including the final None, has been
        # consumed, so its queue feeder can flush and the process can exit.
        # Join it so it is reaped instead of lingering until interpreter exit.
        self.worker.join()
        return res

# This is the interface I'm looking for.
if __name__ == '__main__':
    n_agents = 50
    agents = [Agent() for _ in range(n_agents)]

    # Queueing is fast: each call only puts a job tuple on the agent's
    # cmd_queue; the work itself runs in the Worker processes.
    for current in agents:
        current.do_some_work()
        current.do_some_other_work()
        current.send_sentinel()  # no more jobs for this agent

    # get_results blocks until that agent's worker has finished its jobs.
    for current in agents:
        print(current.get_results())

输出:

['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
于 2015-04-15T15:07:18.033 回答
1

如果用 multiprocessing 的一个改动很小的分支(fork)就能让这种模式工作,你能接受吗?如果可以,只需在您问题中提到的链接里再往下看一点:Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

由于 pathos.multiprocessing 的 Pool 能以非常干净的方式 pickle 实例方法,您可以像编写串行 Python 代码那样编写代码……而且它确实能工作……甚至可以直接在解释器中运行。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from Queue import Queue
>>> from time import sleep
>>> 
>>> class Agent:
...   # Runs each queued call in a pathos ProcessingPool; self.queue only
...   # holds the handles returned by pool.apipe, in submission order.
...   def __init__(self):
...     self.pool = Pool()
...     self.queue = Queue()
...   def produce(self, f, *args, **kwds):
...     self.queue.put(self.pool.apipe(f, *args, **kwds))
...   def do_some_work(self):
...     self.produce(self.foo, waka='waka')
...   def do_some_other_work(self):
...     self.produce(self.bar, humana='humana')
...   def foo(self, **kwds):
...     sleep(5)
...     return 'this is a foo'
...   def bar(self, **kwds):
...     sleep(10) 
...     return 'this is a bar'
...   def get_results(self):
...     # self.queue is a plain Queue.Queue filled only by produce() in this
...     # process, so empty() is reliable here. Each handle's .get()
...     # presumably blocks until that call's result is ready.
...     res = []
...     while not self.queue.empty():
...       res.append(self.queue.get().get())
...     return res
... 
>>> agents = [Agent() for i in range(50)]
>>> for agent in agents:
...   agent.do_some_work()
...   agent.do_some_other_work()
... 
>>> for agent in agents:
...   print(agent.get_results())
... 
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
>>> 

pathos 可在这里获取:https://github.com/uqfoundation

于 2015-04-15T13:58:59.283 回答