3

所以我编写了一个工具,它获取项目列表,将其拆分为给定数量的列表(比如说 10 个),然后取出这 10 个列表并产生 10 个线程,“EvaluationThreads”(扩展 threading.thread),每个那些线程评估他们提供评估的任何内容。当我启动每个线程时,我将它们全部放入一个列表中,并在生成它们后,我有以下代码:

for th in threadList:
    th.join()
    someTotal = th.resultsAttribute

这就是我如何处理等待所有线程完成并收集它们的信息。虽然这是一种等待一切完成然后收集结果的工作方式,但我觉得必须有一种更优雅的方式来做这件事,因为这些线程可以很好地在不同的时间完成,如果第一个开始的线程最后完成所有较早完成的必须等待该线程完成才能加入。有没有办法获取这些线程的信息并在它们完成时加入它们,而不是按照它们开始的顺序?我最初认为我会在线程中使用某种回调或其他东西,但我不确定是否有更可接受的解决方案。

谢谢你的帮助。

编辑:澄清一下,我的评估函数不受 CPU 限制,我也不想在线程之间分配文档以尽快完成它,每个线程都有固定的大约偶数个作业。

4

2 回答 2

2

对于您的主要问题:

如果你正在做比这更复杂的事情——或者,特别是,如果你重复这样做——你可能需要一个“线程组”类。有几十个是预制的,但如果你不喜欢其中任何一个,自己写一个就很简单了。

然后,而不是这个:

threadList = []
for argchunk in splitIntoChunks(values, 10):
  threadList.append(threading.Thread(target=myThreadFunc, args=argchunk))
...
someTotal = 0
for th in threadList:
  th.join()
  someTotal += th.resultsAttribute

你可以这样做:

threadGroup = ThreadGroup.ThreadGroup()
for argchunk in splitIntoChunks(values, 10):
  threadGroup.newThread(myThreadFunc, argchunk)
threadGroup.join()
someTotal = sum(th.resultsAttribute for th in threadGroup)

或者,也许更好,一个完整的线程池库,所以你可以这样做:

pool = ThreadPool(10)
for argchunk in splitIntoChunks(values, 100):
  pool.putRequest(myThreadFunc, argchunk)
pool.wait()

这里的优点是您可以轻松地在 10 个线程上适当地安排 100 个作业,而不是每个线程一个 10 个作业,而无需维护队列等所有工作。缺点是您不能只迭代线程要获得返回值,您必须迭代作业——理想情况下,您不想让作业一直存活到最后,这样您就可以迭代它们。

这将我们带到您的第二个问题,即如何从线程(或作业)中获取值。有很多很多方法可以做到这一点。

你所做的工作。你甚至不需要任何锁定。

正如您所建议的,使用回调也可以。但请记住,回调将在工作线程上运行,而不是在主线程上,所以如果它正在访问某个全局​​对象,您将需要某种同步。

如果您无论如何都要同步,那么回调可能没有任何好处。例如,如果你想要做的只是对一堆值求和,你可以只 set total=[0],并让每个线程total[0] += myValue在锁内执行。(当然在这种情况下,只在主线程中进行求和并避免锁定可能更有意义,但如果合并结果的工作量更大,那么这个选择可能就不那么简单了。)

您还可以使用某种原子对象,而不是显式锁定。例如,标准的 Queue.Queue 和 collections.deque 都是原子的,所以每个线程可以只 set q = Queue.Queue(),然后每个线程通过 do 推送它的结果q.push(myValue),然后加入后你只需迭代并总结队列的值。

事实上,如果每个线程只向队列推送一次,您可以在队列本身上执行 10 次阻塞获取,之后您知道group.join()orpool.wait()或 what 将快速返回。

或者您甚至可以将回调作为作业推送到队列中。同样,您可以在队列上执行 10 次阻塞获取,每次都执行结果。

如果每个线程可以返回多个对象,他们可以在完成后将一个标记值或回调推送到队列中,并且您的主线程会不断弹出,直到它读取 10 个标记。

于 2012-07-20T01:32:08.747 回答
1

使用 Queue 将信息从线程中推送出去,只要它可用:

假设这是您的线程:

class myThread(threading.Thread):
   def __init__(self, results_queue):
       self.results_queue = results_queue
       #other init code here


   def run(self):
       #thread code here

       self.results_queue.put(result) #result is the information you want from the thread

这是您的主要代码:

import Queue #or "import queue" in Python 3.x
results_queue = Queue()

#thread init code here

for i in xrange(num_threads_running):
    data = results_queue.get() # queue.get() blocks until some item is available
    #process data as it is made available

#at this point, there is no need to .join(), since all the threads terminate as soon as they put data to the queue.
于 2012-07-19T23:47:20.907 回答