如果我有两个threading.Event()
对象,并且希望在设置其中一个对象之前一直睡觉,那么在 python 中是否有一种有效的方法来做到这一点?显然我可以对轮询/超时做一些事情,但我希望真正让线程休眠,直到设置一个,类似于select
文件描述符的使用方式。
那么在下面的实现中,一个高效的非轮询实现wait_for_either
会是什么样子呢?
a = threading.Event()
b = threading.Event()
wait_for_either(a, b)
如果我有两个threading.Event()
对象,并且希望在设置其中一个对象之前一直睡觉,那么在 python 中是否有一种有效的方法来做到这一点?显然我可以对轮询/超时做一些事情,但我希望真正让线程休眠,直到设置一个,类似于select
文件描述符的使用方式。
那么在下面的实现中,一个高效的非轮询实现wait_for_either
会是什么样子呢?
a = threading.Event()
b = threading.Event()
wait_for_either(a, b)
这是一个非轮询非过多线程解决方案:修改现有Event
的 s 以在它们更改时触发回调,并在该回调中处理设置新事件:
import threading
def or_set(self):
self._set()
self.changed()
def or_clear(self):
self._clear()
self.changed()
def orify(e, changed_callback):
e._set = e.set
e._clear = e.clear
e.changed = changed_callback
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)
def OrEvent(*events):
or_event = threading.Event()
def changed():
bools = [e.is_set() for e in events]
if any(bools):
or_event.set()
else:
or_event.clear()
for e in events:
orify(e, changed)
changed()
return or_event
示例用法:
def wait_on(name, e):
print "Waiting on %s..." % (name,)
e.wait()
print "%s fired!" % (name,)
def test():
import time
e1 = threading.Event()
e2 = threading.Event()
or_e = OrEvent(e1, e2)
threading.Thread(target=wait_on, args=('e1', e1)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('e2', e2)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('or_e', or_e)).start()
time.sleep(0.05)
print "Firing e1 in 2 seconds..."
time.sleep(2)
e1.set()
time.sleep(0.05)
print "Firing e2 in 2 seconds..."
time.sleep(2)
e2.set()
time.sleep(0.05)
结果是:
Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!
Firing e2 in 2 seconds...
e2 fired!
这应该是线程安全的。欢迎任何意见。
编辑:哦,这是你的wait_for_either
函数,虽然我编写代码的方式是,最好制作并传递一个or_event
. 请注意,or_event
不应手动设置或清除 。
def wait_for_either(e1, e2):
OrEvent(e1, e2).wait()
我认为标准库为这个问题提供了一个非常规范的解决方案,我没有看到这个问题:条件变量。您让主线程等待条件变量,并在每次收到通知时轮询事件集。仅在其中一个事件更新时才通知它,因此没有浪费的轮询。这是一个 Python 3 示例:
from threading import Thread, Event, Condition
from time import sleep
from random import random
event1 = Event()
event2 = Event()
cond = Condition()
def thread_func(event, i):
delay = random()
print("Thread {} sleeping for {}s".format(i, delay))
sleep(delay)
event.set()
with cond:
cond.notify()
print("Thread {} done".format(i))
with cond:
Thread(target=thread_func, args=(event1, 1)).start()
Thread(target=thread_func, args=(event2, 2)).start()
print("Threads started")
while not (event1.is_set() or event2.is_set()):
print("Entering cond.wait")
cond.wait()
print("Exited cond.wait ({}, {})".format(event1.is_set(), event2.is_set()))
print("Main thread done")
示例输出:
Thread 1 sleeping for 0.31569427100177794s
Thread 2 sleeping for 0.486548134317051s
Threads started
Entering cond.wait
Thread 1 done
Exited cond.wait (True, False)
Main thread done
Thread 2 done
请注意,没有额外的线程或不必要的轮询,您可以等待任意谓词变为真(例如,设置任何特定的事件子集)。该模式还有一个wait_for
包装器while (pred): cond.wait()
,它可以使您的代码更易于阅读。
一种解决方案(使用轮询Event
)是在循环中对每个进行顺序等待
def wait_for_either(a, b):
while True:
if a.wait(tunable_timeout):
break
if b.wait(tunable_timeout):
break
我认为,如果您将超时调整得足够好,结果就可以了。
我能想到的最好的非轮询是在不同的线程中等待每个人,并Event
在主线程中设置一个共享的人。
def repeat_trigger(waiter, trigger):
waiter.wait()
trigger.set()
def wait_for_either(a, b):
trigger = threading.Event()
ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
ta.start()
tb.start()
# Now do the union waiting
trigger.wait()
很有趣,所以我写了一个之前解决方案的 OOP 版本:
class EventUnion(object):
"""Register Event objects and wait for release when any of them is set"""
def __init__(self, ev_list=None):
self._trigger = Event()
if ev_list:
# Make a list of threads, one for each Event
self._t_list = [
Thread(target=self._triggerer, args=(ev, ))
for ev in ev_list
]
else:
self._t_list = []
def register(self, ev):
"""Register a new Event"""
self._t_list.append(Thread(target=self._triggerer, args=(ev, )))
def wait(self, timeout=None):
"""Start waiting until any one of the registred Event is set"""
# Start all the threads
map(lambda t: t.start(), self._t_list)
# Now do the union waiting
return self._trigger.wait(timeout)
def _triggerer(self, ev):
ev.wait()
self._trigger.set()
启动额外的线程似乎是一个明确的解决方案,但不是很有效。函数 wait_events 将阻止 util 设置任何一个事件。
def wait_events(*events):
event_share = Event()
def set_event_share(event):
event.wait()
event.clear()
event_share.set()
for event in events:
Thread(target=set_event_share(event)).start()
event_share.wait()
wait_events(event1, event2, event3)
在您可以等待的地方扩展克劳迪乌的答案:
from threading import Thread, Event, _Event
class ConditionalEvent(_Event):
def __init__(self, events_list, condition):
_Event.__init__(self)
self.event_list = events_list
self.condition = condition
for e in events_list:
self._setup(e, self._state_changed)
self._state_changed()
def _state_changed(self):
bools = [e.is_set() for e in self.event_list]
if self.condition == 'or':
if any(bools):
self.set()
else:
self.clear()
elif self.condition == 'and':
if all(bools):
self.set()
else:
self.clear()
def _custom_set(self,e):
e._set()
e._state_changed()
def _custom_clear(self,e):
e._clear()
e._state_changed()
def _setup(self, e, changed_callback):
e._set = e.set
e._clear = e.clear
e._state_changed = changed_callback
e.set = lambda: self._custom_set(e)
e.clear = lambda: self._custom_clear(e)
示例用法将与以前非常相似
import time
e1 = Event()
e2 = Event()
# Example to wait for triggering of event 1 OR event 2
or_e = ConditionalEvent([e1, e2], 'or')
# Example to wait for triggering of event 1 AND event 2
and_e = ConditionalEvent([e1, e2], 'and')
这是一个老问题,但我希望这对来自谷歌的人有所帮助。
接受的答案相当陈旧,并且会导致两次“orified”事件的无限循环。
这是一个使用concurrent.futures
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
def wait_for_either(events, timeout=None, t_pool=None):
'''blocks untils one of the events gets set
PARAMETERS
events (list): list of threading.Event objects
timeout (float): timeout for events (used for polling)
t_pool (concurrent.futures.ThreadPoolExecutor): optional
'''
if any(event.is_set() for event in events):
# sanity check
pass
else:
t_pool = t_pool or ThreadPoolExecutor(max_workers=len(events))
tasks = []
for event in events:
tasks.append(t_pool.submit(event.wait))
concurrent.futures.wait(tasks, timeout=timeout, return_when='FIRST_COMPLETED')
# cleanup
for task in tasks:
try:
task.result(timeout=0)
except concurrent.futures.TimeoutError:
pass
测试功能
import threading
import time
from datetime import datetime, timedelta
def bomb(myevent, sleep_s):
'''set event after sleep_s seconds'''
with lock:
print('explodes in ', datetime.now() + timedelta(seconds=sleep_s))
time.sleep(sleep_s)
myevent.set()
with lock:
print('BOOM!')
lock = threading.RLock() # so prints don't get jumbled
a = threading.Event()
b = threading.Event()
t_pool = ThreadPoolExecutor(max_workers=2)
threading.Thread(target=bomb, args=(event1, 5), daemon=True).start()
threading.Thread(target=bomb, args=(event2, 120), daemon=True).start()
with lock:
print('1 second timeout, no ThreadPool', datetime.now())
wait_for_either([a, b], timeout=1)
with lock:
print('wait_event_or done', datetime.now())
print('=' * 15)
with lock:
print('wait for event1', datetime.now())
wait_for_either([a, b], t_pool=t_pool)
with lock:
print('wait_event_or done', datetime.now())
不漂亮,但您可以使用两个额外的线程来多路复用事件......
def wait_for_either(a, b):
flag = False #some condition variable, event, or similar
class Event_Waiter(threading.Thread):
def __init__(self, event):
self.e = event
def run(self):
self.e.wait()
flag.set()
a_thread = Event_Waiter(a)
b_thread = Event_Waiter(b)
a.start()
b.start()
flag.wait()
请注意,如果它们到达得太快,您可能不得不担心意外获得这两个事件。辅助线程(a_thread 和 b_thread)应该在尝试设置标志时锁定同步,然后应该杀死另一个线程(如果它被消耗,可能会重置该线程的事件)。
def wait_for_event_timeout(*events):
while not all([e.isSet() for e in events]):
#Check to see if the event is set. Timeout 1 sec.
ev_wait_bool=[e.wait(1) for e in events]
# Process if all events are set. Change all to any to process if any event set
if all(ev_wait_bool):
logging.debug('processing event')
else:
logging.debug('doing other work')
e1 = threading.Event()
e2 = threading.Event()
t3 = threading.Thread(name='non-block-multi',
target=wait_for_event_timeout,
args=(e1,e2))
t3.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(5)
e1.set()
time.sleep(10)
e2.set()
logging.debug('Event is set')