0

我有以下方法:

def GetMarketData(fr, cr, dct1, dct2, dict3, dct5, dct5):
    md = MarketData()

    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    q4 = Queue()
    q5 = Queue()
    q6 = Queue()
    q7 = Queue()
    p1 = Process(target=md.GetMD1, args=(q1,))
    p2 = Process(target=md.GetMD2, args=(q2,))
    p3 = Process(target=md.GetMD3, args=(q3,))
    p4 = Process(target=md.GetMD4, args=(q4,))
    p5 = Process(target=md.GetMD5, args=(q5,))
    p6 = Process(target=md.GetMD6, args=(q6,))
    p7 = Process(target=md.GetMD7, args=(q7,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
    p6.start()
    p7.start()
    fr.append(q1.get())
    dct1.update(q2.get())
    dct2.update(q3.get())
    dct3.update(q4.get())
    cr.append(q5.get())
    dct4.update(q6.get())
    dct5.update(q7.get())
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()
    p6.join()
    p7.join()

    #print "good"

有没有一种好方法可以尽可能用几行重写它。如果我需要在每个队列上打开相同的进程,这不会是一个问题,但是我需要并行运行几个不同的程序并且看不到如何用更少的行重写它......

谢谢 !!!

4

2 回答 2

1

代替:

q1 = Queue()
q2 = Queue()
q3 = Queue()
q4 = Queue()
q5 = Queue()
q6 = Queue()
q7 = Queue()

你可以改为写:

queues = [Queue() for i in xrange(7)]

现在您的队列已queues[0]通过queues[6]。您可以对您的流程执行相同的操作:

processes = [Process(target=getattr("md.GetMD%d" % (i+1)), args=(queues[i],))
             for i in xrange(7)]

然后使用循环启动它们:

for process in processes:
    process.start()

稍后加入类似:

for process in processes:
    process.join()

剩下这一部分:

fr.append(q1.get())
dct1.update(q2.get())
dct2.update(q3.get())
dct3.update(q4.get())
cr.append(q5.get())
dct4.update(q6.get())
dct5.update(q7.get())

只需在函数中接受任意数量的参数而不是命名参数,就可以以类似的方式循环;然后我们可以遍历参数:

for queue, result_dict in zip(queues, result_dicts):
    result_dict.update(queue.get())

然后我们也可以这样做,所以我们传入任意数量的字典,而不是使用幻数,7我们只需创建与参数一样多的队列和进程。该函数不需要知道某些字典与其他字典不同,因为实际上它们的处理方式都是相同的。

把它放在一起,我们得到:

def GetMarketData(*result_dicts):

    queues = [Queue() for item in result_dicts]

    processes = [Process(target=getattr("md.GetMD%d" % (i+1)), args=(queues[i],))
                 for i in xrange(len(result_dicts))]

    for process in processes:
        process.start()

    for queue, result_dict in zip(queues, result_dicts):
        result_dict.update(queue.get())

    for process in processes:
        process.join()

(无关的说明:我没有做太多的事情multiprocessing,但你不想join在从队列中读取之前对进程进行处理,以确保队列中的结果是完整的吗?)

于 2013-05-06T16:10:55.847 回答
1

作为第一个简化,我将使用列表:

def GetMarketData(fr, cr, dct1, dct2, dict3, dct5, dct5):
    md = MarketData()

    ques = [Queue() for _ in range(7)]
    procs = [Process(target=getattr(md,'GetMD%d'%(i),args=(q,)) 
             for i,q in enumerate(ques,1)]
    for p in procs:
        p.start()

    fr.append(ques[0].get())
    dct1.update(ques[1].get())
    dct2.update(ques[2].get())
    dct3.update(ques[3].get())
    cr.append(ques[4].get())
    dct4.update(ques[5].get())
    dct5.update(ques[6].get())

    for p in procs:
        p.join()
于 2013-05-06T16:03:26.307 回答