要求是启动五个线程,并且只在最快的线程中等待。五个线程都去寻找相同的数据5个方向,一个就足够继续控制流了。
实际上,我需要等待前两个线程返回,以相互验证。但我想如果我知道如何等待最快。我可以弄清楚如何等待第二快。
谈了很多join(timeout)
,但你事先不知道要等哪一个(join
提前申请哪一个)。
要求是启动五个线程,并且只在最快的线程中等待。五个线程都去寻找相同的数据5个方向,一个就足够继续控制流了。
实际上,我需要等待前两个线程返回,以相互验证。但我想如果我知道如何等待最快。我可以弄清楚如何等待第二快。
谈了很多join(timeout)
,但你事先不知道要等哪一个(join
提前申请哪一个)。
使用队列:每个线程在完成后将结果放入队列中,然后您只需要读取适当数量的结果并忽略其余的:
#!python3.3
import queue # For Python 2.x use 'import Queue as queue'
import threading, time, random
def func(id, result_queue):
print("Thread", id)
time.sleep(random.random() * 5)
result_queue.put((id, 'done'))
def main():
q = queue.Queue()
threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ]
for th in threads:
th.daemon = True
th.start()
result1 = q.get()
result2 = q.get()
print("Second result: {}".format(result2))
if __name__=='__main__':
main()
文档Queue.get()
(没有参数,它相当于Queue.get(True, None)
:
Queue.get([block[, timeout]])
从队列中移除并返回一个项目。如果可选的 args 块为真并且超时为无(默认值),则在必要时阻止,直到项目可用。如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在那段时间内没有可用的项目,则会引发 Empty 异常。否则(块为假),如果一个项目立即可用,则返回一个项目,否则引发空异常(在这种情况下忽略超时)。
如果您的线程中有某种处理循环,则以下代码将在使用threading.Event()终止时终止它们:
def my_thread(stop_event):
while not stop_event.is_set():
# do stuff in a loop
# some check if stuff is complete
if stuff_complete:
stop_event.set()
break
def run_threads():
# create a thread event
a_stop_event = threading.Event()
# spawn the threads
for x in range(5):
t = threading.Thread(target=my_thread, args=[a_stop_event])
t.start()
while not a_stop_event.is_set():
# wait for an event
time.sleep(0.1)
print "At least one thread is done"
如果您的进程是“便宜的”或单个请求响应类型的线程(即例如异步 HTTP 请求),那么Duncan 的回答是一个好方法。
邓肯的方法可能是最好的,也是我推荐的。不过,之前没有“等待下一个完成的线程完成”让我有点恼火,所以我只是写了这个来尝试一下。似乎工作。只需使用MWThread
代替即可threading.thread
获得这个新wait_for_thread
功能。
全局变量有点笨拙;另一种方法是使它们成为类级变量。但是,如果这隐藏在模块中(mwthread.py 或其他),无论哪种方式都应该没问题。
#! /usr/bin/env python
# Example of how to "wait for" / join whichever threads is/are done,
# in (more or less) the order they're done.
import threading
from collections import deque
_monitored_threads = []
_exited_threads = deque()
_lock = threading.Lock()
_cond = threading.Condition(_lock)
class MWThread(threading.Thread):
"""
multi-wait-able thread, or monitored-wait-able thread
"""
def run(self):
tid = threading.current_thread()
try:
with _lock:
_monitored_threads.append(tid)
super(MWThread, self).run()
finally:
with _lock:
_monitored_threads.remove(tid)
_exited_threads.append(tid)
_cond.notifyAll()
def wait_for_thread(timeout=None):
"""
Wait for some thread(s) to have finished, with optional
timeout. Return the first finished thread instance (which
is removed from the finished-threads queue).
If there are no unfinished threads this returns None
without waiting.
"""
with _cond:
if not _exited_threads and _monitored_threads:
_cond.wait(timeout)
if _exited_threads:
result = _exited_threads.popleft()
else:
result = None
return result
def main():
print 'testing this stuff'
def func(i):
import time, random
sleeptime = (random.random() * 2) + 1
print 'thread', i, 'starting - sleep for', sleeptime
time.sleep(sleeptime)
print 'thread', i, 'finished'
threads = [MWThread(target=func, args=(i,)) for i in range(3)]
for th in threads:
th.start()
i = 0
while i < 3:
print 'main: wait up to .5 sec'
th = wait_for_thread(.5)
if th:
print 'main: got', th
th.join()
i += 1
else:
print 'main: timeout'
print 'I think I collected them all'
print 'result of wait_for_thread():'
print wait_for_thread()
if __name__ == '__main__':
main()
或者只是跟踪列表中所有已完成的线程并让第二个线程完成处理应该完成的任何事情,Python 列表是线程安全的。
finished_threads = []
event = threading.Event()
def func():
do_important_stuff()
thisthread = threading.current_thread()
finished_threads.append(thisthread)
if len(finished_threads) > 1 and finished_threads[1] == thisthread:
#yay we are number two!
event.set()
for i in range(5):
threading.Thread(target=func).start()
event.wait()
您可以为此使用事件。请参阅http://docs.python.org/2/library/threading.html#event-objects 这个想法是工作线程在完成时引发一个事件。主线程在继续之前等待此事件。工作线程可以设置一个(互斥的)变量来标识自己与事件。