我在需要超时的线程中使用条件变量。直到看到运行大量线程时的 CPU 使用率,我才注意到线程模块中提供的条件变量实际上并没有休眠,而是在提供超时作为参数时进行轮询。
是否有替代方案实际上像 pthreads 一样睡觉?
让很多线程以多秒的间隔休眠似乎很痛苦,但它仍然在消耗 CPU 时间。
谢谢!
我在需要超时的线程中使用条件变量。直到看到运行大量线程时的 CPU 使用率,我才注意到线程模块中提供的条件变量实际上并没有休眠,而是在提供超时作为参数时进行轮询。
是否有替代方案实际上像 pthreads 一样睡觉?
让很多线程以多秒的间隔休眠似乎很痛苦,但它仍然在消耗 CPU 时间。
谢谢!
这在 Python 中似乎很棘手,但这是一个解决方案。它依赖于产生额外的线程,但不使用轮询,并确保在超时到期或原始 wait() 返回时立即唤醒原始线程。
注意:以下代码包含一个测试用例,它测试由于超时和通知而导致的条件等待结束。
from thread import start_new_thread
from threading import Condition, Timer
class ConditionWithoutPolling():
"""Implements wait() with a timeout without polling. Wraps the Condition
class."""
def __init__(self, condition):
self.condition = condition
self.wait_timeout_condition = Condition()
def wait(self, timeout=None):
"""Same as Condition.wait() but it does not use a poll-and-sleep method
to implement timeouts. Instead, if a timeout is requested two new
threads are spawned to implement a non-pol-and-wait method."""
if timeout is None:
# just use the original implementation if no waiting is involved
self.condition.wait()
return
else:
# this new boolean will tell us whether we are done waiting or not
done = [False]
# wait on the original condition in a new thread
start_new_thread(self.wait_on_original, (done,))
# wait for a timeout (without polling) in a new thread
Timer(timeout, lambda : self.wait_timed_out(done)).start()
# wait for EITHER of the previous threads to stop waiting
with self.wait_timeout_condition:
while not done[0]:
self.wait_timeout_condition.wait()
def wait_on_original(self, done):
"""Waits on the original Condition and signals wait_is_over when done."""
self.condition.wait()
self.wait_is_over(done)
def wait_timed_out(self, done):
"""Called when the timeout time is reached."""
# we must re-acquire the lock we were waiting on before we can return
self.condition.acquire()
self.wait_is_over(done)
def wait_is_over(self, done):
"""Modifies done to indicate that the wait is over."""
done[0] = True
with self.wait_timeout_condition:
self.wait_timeout_condition.notify()
# wrap Condition methods since it wouldn't let us subclass it ...
def acquire(self, *args):
self.condition.acquire(*args)
def release(self):
self.condition.release()
def notify(self):
self.condition.notify()
def notify_all(self):
self.condition.notify_all()
def notifyAll(self):
self.condition.notifyAll()
def test(wait_timeout, wait_sec_before_notification):
import time
from threading import Lock
lock = Lock()
cwp = ConditionWithoutPolling(Condition(lock))
start = time.time()
def t1():
with lock:
print 't1 has the lock, will wait up to %f sec' % (wait_timeout,)
cwp.wait(wait_timeout)
time_elapsed = time.time() - start
print 't1: alive after %f sec' % (time_elapsed,)
# this thread will acquire the lock and then conditionally wait for up to
# timeout seconds and then print a message
start_new_thread(t1, ())
# wait until it is time to send the notification and then send it
print 'main thread sleeping (will notify in %f sec)' % (wait_sec_before_notification,)
time.sleep(wait_sec_before_notification)
with lock:
cwp.notifyAll()
print 'notification sent, will continue in 2sec'
time.sleep(2.0) # give the other time thread to finish before exiting
if __name__ == "__main__":
print 'test wait() ending before the timeout ...'
test(2.0, 1.0)
print '\ntest wait() ending due to the timeout ...'
test(2.0, 4.0)
我不熟悉 Python,但是如果您能够阻止条件变量(没有超时),您可以自己实现超时。让阻塞线程存储它开始阻塞的时间并设置一个定时器来发出信号。当它醒来时,检查超时时间。除非您可以将计时器聚合到单个线程,否则这不是一个很好的方法,否则您的线程数会无缘无故地翻倍。