1

我想使用multiprocessing模块中的远程管理器功能在多台机器之间分配工作。我知道有 3rd 方模块,但我想尽可能地坚持核心。我知道对于台式机(单机),您可以使用multiprocessing.Pool该类来限制 CPU 的数量,但对远程管理器有几个问题。

我有以下远程管理器代码:

   from multiprocessing.managers import BaseManager
   import Queue
   queue = Queue.Queue()
   class QueueManager(BaseManager): pass
   QueueManager.register('get_queue', callable=lambda:queue)
   m = QueueManager(address=('', 50000), authkey='abracadabra')
   s = m.get_server()
   s.serve_forever()

这很好用,我什至可以使用以下代码将作业提交到队列中:

QueueManager.register('get_queue')
m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
m.connect()
queue = m.get_queue()
queue.put('hello')

您还可以queue.get()在队列中获取单个条目。

  1. 你如何获得队列中的项目?当我尝试遍历队列时,我进入了一个无限循环。
  2. 在工人方面,您可以将每台机器限制为每台机器 1 个工作吗?
  3. 既然这个方法好像是pull方法,worker需要检查job是否存在,有没有push方法可以触发multiprocessing server?
4

1 回答 1

1

遍历队列与执行以下操作相同:

while True:
    elem = queue.get()  #queue empty -> it blocks!!!

当没有更多要执行的作业时,“迭代”队列并阻止您的工作进程的一种优雅方法是使用None(或其他)作为哨兵并使用iter(callable, sentinel)

for job in iter(queue.get, None):
    # execute the calculation
    output_queue.put(result)

#shutdown the worker process

这相当于:

while True:
    job = queue.get()
    if job is None:
        break
    #execute the calculation
    output_queue.put(result)
#shutdown the worker process

请注意,您必须在队列中为每个工作子进程插入一个哨兵,否则会有子进程在等待它。

关于你的第二个问题,我不明白你在问什么。它BaseManager提供了一台服务器来执行来自客户端的调用,因此很明显,所有请求都由同一台机器满足。或者你的意思是允许每个客户只做一个请求?我没有看到任何选项,即使它可以“手动”实现。

我不明白你的问题。什么是拉式方法?您能否用“可以触发多处理服务器的推送方法”的意思更详细地说明您的问题?

于 2013-07-08T17:10:32.610 回答