1

我在 python 中注意到池分配的这种行为。即使我在池中有 20 个进程,当我为 8 个进程执行 map_async 时,我并没有将所有进程都扔掉,而是只执行了 4 个。当这 4 个完成时,它再发送两个,然后当这两个完成时发送一个。

当我向它扔超过 20 个时,它会运行所有 20 个,直到它开始在队列中获得少于 20 个,当上述行为重复时。

我认为这是故意这样做的,但看起来很奇怪。我的目标是在请求进入后立即处理它们,显然这种行为不适合。

使用 python 2.6 和台球来支持 maxtasksperchild

有什么想法可以改进吗?

代码:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', idx, Returns
    else:
        time.sleep(0.5)
4

1 回答 1

3

我在 Python 中处理多处理的一种方法如下:

我有要使用函数的数据function()
首先我创建一个多处理子类:

import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        self.id_t = id_t
        self.inputlist = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

        multiprocessing.Process.__init__(self)

    def run(self):
        s = "process number: " + str(self.id_t) + " starting"
        print s
        result = []

        while self.inputqueue.qsize() > 0
            try:
                inp = self.inputqueue.get()
            except Exception:
                pass
            result = self.function(inp)
            while 1:
               try:
                   self.resultqueue.put([self.id,])
               except Exception:
                   pass
               else:
                   break
            self.idqueue.put(id)
            return

和主要功能:

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(data):
    print data # or what you want

for datum in data:
    inputqueue.put(datum)

for i in xrange(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()

最后得到结果:

results = []
while idqueue.qsize() < nbprocess:
    pass
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())

通过这种方式,您可以完美地控制流程和其他内容的附加内容。仅当每个数据的计算速度非常慢(< 1,2 秒)时,使用多处理inputqueue才是一种有效的技术,因为不同进程同时访问队列(这就是我使用异常的原因)。如果您的函数计算速度非常快,请考虑在开始时仅将数据拆分一次,并在开始时为每个进程放置数据集块。

于 2012-04-27T16:43:12.610 回答