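I've just started experimenting with multithreading/multiprocessing and have run into some problems. What I want to do is generate a number of requests for data that should be downloaded from a remote database. These are stored in a Queue.Queue (let's call it q_in). Once all requests have been generated, I start a limited number of instances of a thread class that takes q_in and another queue (q_out) as input. Each thread then get()s jobs from q_in and puts the results into q_out. This part is IO-bound, so I figured threads would be a good choice. The results from q_out are consumed by a pool of processes that do some work on them. This part is CPU-bound, so I figured processes would be a good choice.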

Now this seems to work fine, except for some strange behavior that I demonstrate below.

import threading
import Queue
import multiprocessing as mp

class TestThread(threading.Thread):

    def __init__(self, threadnr, resultPool, jobPool):
        threading.Thread.__init__(self)
        self.threadnr = threadnr
        self.resultPool = resultPool
        self.jobPool = jobPool

    def run(self):
        while True:
            job = self.jobPool.get()
            if job is not None:
                for a in range(10):
                    for i in xrange(1000000):
                        pass
                print "Thread nr %d finished job %d" % (self.threadnr,job)
                self.resultPool.put([self.threadnr,job+1])
                self.jobPool.task_done()           

def test(i):
    print mp.current_process().name,"test",i
    return mp.current_process().name,"test",i

if __name__ == '__main__':        
    q_in = Queue.Queue()   
    q_out = Queue.Queue() 
    nr_jobs = 20
    res = []
    nr_threads = 4
    threads = []

    for i in range(nr_jobs):
        q_in.put(i)

    for i in range(nr_threads):
        t = TestThread(i,q_out,q_in)
        t.start()
        threads.append(t)

    p_pool = mp.Pool(4)   

    for i in range(nr_jobs):
        job = q_out.get(block=True)
        print "Got job",job
        res.append(p_pool.apply_async(test,(job,)))

    p_pool.close()
    p_pool.join()

    for r in res:
        print r.get()

    for t in threads:
        t.join()

The output of this is:

Thread nr 2 finished job 2
Got job [2, 3]
Thread nr 0 finished job 0
Got job [0, 1]
Thread nr 1 finished job 1
Got job [1, 2]
Thread nr 3 finished job 3
Got job [3, 4]
Thread nr 2 finished job 4
Got job Thread nr 0 finished job 5[
2, 5]
Got job [0, 6]
Thread nr 1 finished job 6
Got job [1, 7]
Thread nr 3 finished job 7
Got job [3, 8]
Thread nr 2 finished job 8
Got job [2, 9]
Thread nr 0 finished job 9
Got job [0, 10]
PoolWorker-4 test [1, 2]
PoolWorker-4 test [1, 7]
PoolWorker-3 test [3, 4]
PoolWorker-3 test [3, 8]
PoolWorker-2 test [0, 1]
PoolWorker-2 test [0, 6]
PoolWorker-2 test [0, 10]
PoolWorker-1 test [2, 3]
PoolWorker-1 test [2, 5]
PoolWorker-1 test [2, 9]
('PoolWorker-1', 'test', [2, 3])
('PoolWorker-2', 'test', [0, 1])
('PoolWorker-4', 'test', [1, 2])
('PoolWorker-3', 'test', [3, 4])
('PoolWorker-1', 'test', [2, 5])
('PoolWorker-2', 'test', [0, 6])
('PoolWorker-4', 'test', [1, 7])
('PoolWorker-3', 'test', [3, 8])
('PoolWorker-1', 'test', [2, 9])
('PoolWorker-2', 'test', [0, 10])

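This is a test program that works largely like my real program. What I find strange is that, even though the threads take a relatively long time to finish, the output from the processes isn't printed until the threads have all finished their work. It seems the jobs are consumed continuously, but the output of the processes doesn't appear until all threads are done.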

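In this example it is fairly harmless (if annoying), but in my real program this... queuing of output seems to cause a memory error, because all output from the processes is held back until the last thread has finished.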

As an additional question: is it a good idea to mix threads and processes, or should I stick to one or the other?

I would appreciate any thoughts on this matter.
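
A side note on the listing: `run()` never leaves its `while True` loop, so once the job queue is empty every thread blocks in `get()` and the final `t.join()` does not return. One common way around this is to queue one sentinel per thread after the real jobs. The sketch below is in Python 3 (where the module is named `queue`), and the names `worker`, `run_jobs`, and `STOP` are hypothetical, chosen only for illustration:

```python
import queue
import threading

STOP = None  # hypothetical sentinel meaning "no more jobs"


def worker(thread_nr, job_q, result_q):
    """Take jobs until the sentinel arrives, then return so join() can finish."""
    while True:
        job = job_q.get()
        try:
            if job is STOP:
                return
            result_q.put([thread_nr, job + 1])
        finally:
            # Mark every item, sentinel included, as handled.
            job_q.task_done()


def run_jobs(nr_jobs=20, nr_threads=4):
    job_q = queue.Queue()
    result_q = queue.Queue()
    for i in range(nr_jobs):
        job_q.put(i)
    # One sentinel per thread, queued after the real jobs.
    for _ in range(nr_threads):
        job_q.put(STOP)

    threads = [
        threading.Thread(target=worker, args=(n, job_q, result_q))
        for n in range(nr_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns because each worker exits on its sentinel

    return [result_q.get() for _ in range(nr_jobs)]
```

Calling `run_jobs(20, 4)` returns twenty `[thread_nr, job + 1]` pairs in whatever order the threads finished them. This only addresses thread shutdown; it does not change when the pool output appears.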
