2

我试图弄清楚如何在 python 中编写一个使用多处理队列的程序。

我有多台服务器,其中一台将远程提供队列:

from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()

现在我想使用我的双 Xeon 四核服务器来处理来自这个远程队列的作业。这些工作完全相互独立。因此,如果我有 8 个内核,我想启动 7 个进程,从队列中挑选一个作业,处理它,然后返回下一个。7 个进程中的每一个都会执行此操作,但我无法完全理解该程序的结构。

谁能给我一些关于这个基本结构的有根据的想法?

先感谢您。

4

2 回答 2

2

查看文档如何从管理器(第 17.6.2.7 段)中检索队列,而不是使用工作人员(第 17.6.2.9 段)启动 7 个作业,将队列传递给每个作业。

或者,您可以考虑类似生产者/消费者问题:

from multiprocessing.managers import BaseManager
import random

class Producer():
def __init__(self):
    BaseManager.register('queue')
    self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.cm_queue = self.m.queue()
    while 1:
        time.sleep(random.randint(1,3))
        self.cm_queue.put(<PUT-HERE-JOBS>)

from multiprocessing.managers import BaseManager
import time
import random
class Consumer():
def __init__(self):
    BaseManager.register('queue')

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.queue = self.m.queue()
    while 1:
        <EXECUTE(job = self.queue.get())>


from multiprocessing.managers import BaseManager, Queue
class Manager():

def __init__(self):

    self.queue = QueueQueu()

    BaseManager.register('st_queue', callable=lambda:self.queue)

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.s = self.m.get_server()

    self.s.serve_forever()
于 2009-08-24T15:55:54.980 回答
0

您应该使用主从(又名农民-工人)模式。初始进程将是主进程并创建作业。它

  1. 创建一个队列
  2. 创建7个从进程,将队列作为参数传递
  3. 开始将作业写入队列

从属进程不断地从队列中读取数据,并执行作业(可能直到它们从队列中接收到停止消息)。在这种情况下,AFAICT 不需要使用 Manager 对象。

于 2009-08-24T15:53:11.753 回答