0

我对下面的代码有一个非常奇怪的问题。当numrows = 10Process 循环完成自身并继续完成时。如果不断增长的列表变得更大,它就会陷入僵局。为什么会这样,我该如何解决?

import multiprocessing, time, sys

# ----------------- Calculation Engine -------------------
def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "Project ID: %s started. " % par
            res = doCalculation(par)
            queueOut.put(res)

        except:
            break

def write(queue, fname):
    print 'Started to write to file'
    fhandle = open(fname, "w")
    while True:
        try:
            res = queue.get(block = False)
            for m in res:
                print >>fhandle, m
        except:
            break
    fhandle.close()
    print 'Complete writing to the file'


def doCalculation(project_ID):
    numrows = 100
    toFileRowList = []

    for i in range(numrows):
        toFileRowList.append([project_ID]*100)
        print "%s %s" % (multiprocessing.current_process().name, i)

    return toFileRowList


def main():
    parlist     = [276, 266]

    nthreads    = multiprocessing.cpu_count()
    workerQueue = multiprocessing.Queue()
    writerQueue = multiprocessing.Queue()

    feedProc = multiprocessing.Process(target = feed , args = (workerQueue, parlist))
    calcProc = [multiprocessing.Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = multiprocessing.Process(target = write, args = (writerQueue, 'somefile.csv'))

    feedProc.start()
    feedProc.join ()

    for p in calcProc:
        p.start()
    for p in calcProc:
        p.join()

    writProc.start()
    writProc.join()

if __name__=='__main__':
    sys.exit(main())
4

2 回答 2

1

我认为问题是队列缓冲区被填满,因此您需要先从队列中读取,然后才能将其他内容放入其中。例如,在您的feed线程中,您有:

queue.put(par)

如果你在不阅读的情况下继续放置很多东西,这将导致它阻塞,直到释放缓冲区,但问题是你只释放线程中的缓冲区,而在你加入阻塞线程calc之前它不会开始。feed

因此,为了让您的feed线程完成,应该释放缓冲区,但在线程完成之前不会释放缓冲区:)

尝试组织您的队列访问更多。

于 2012-06-07T14:57:36.460 回答
1

feedProc 和 writeProc 实际上并没有与程序的其余部分并行运行。当你有

proc.start()
proc.join ()

您开始该过程,然后join()立即等待它完成。在这种情况下,多处理没有任何好处,只有开销。尝试在加入之前立即启动所有进程。这也将产生您的队列被清空的效果,并且您不会死锁。

于 2012-06-07T15:15:29.503 回答