我正在编写一个程序,其中可变数量的代理对象同时运行多个串行方法并将它们的返回值存储在队列属性中。每个 Agent 都有一个 Worker(Process 的子类)作为属性,并为其提供作业以通过 cmd_queue 串行运行。Agent 从 res_queue 中的 Worker 获取结果。这些当前是 Manager().Queue() 实例和原因:
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
但是,如果我使用常规 Queue.Queue,Worker 会获得 Agent 的 cmd_queue 的副本,并且看不到 Agent 添加到其中的内容(它总是空的)。
我可以使用此问题中引用的解决方案来腌制实例方法:Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()
from multiprocessing import Manager, Process
from time import sleep
import copy_reg
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method
class Worker(Process):
def __init__(self, cmd_queue, res_queue):
self.cmd_queue = cmd_queue
self.res_queue = res_queue
Process.__init__(self)
def run(self):
while True:
f, args, kwargs = self.cmd_queue.get()
self.res_queue.put( f(*args, **kwargs) )
class Agent:
def __init__(self):
self.cmd_queue = Manager().Queue()
self.res_queue = Manager().Queue()
self.worker = Worker(self.cmd_queue, self.res_queue)
self.worker.start()
def produce(self, f, *args, **kwargs):
self.cmd_queue.put((f, args, kwargs))
def do_some_work(self):
self.produce(self.foo, waka='waka')
def do_some_other_work(self):
self.produce(self.bar, humana='humana')
def foo(self, **kwargs):
sleep(5)
return('this is a foo')
def bar(self, **kwargs):
sleep(10)
return('this is a bar')
def get_results(self): #blocking call
res = []
while not self.cmd_queue.empty():#wait for Worker to finish
sleep(.5)
while not self.res_queue.empty():
res.append(self.res_queue.get())
return res
#This is the interface I'm looking for.
if __name__=='__main__':
agents = [Agent() for i in range(50)]
#this should flow quickly as the calls are added to cmd_queues
for agent in agents:
agent.do_some_work()
agent.do_some_other_work()
for agent in agents:
print(agent.get_results())
我的问题是,我怎样才能让这个代码使用多处理来工作,或者有没有更好、更被接受的方法来让这个模式工作?这是一个更大框架的一小部分,所以我希望它尽可能地对 OO 友好。
编辑:这是在 python 2.7 中。