这个答案实际上有两个建议——我的第一个和我在第一个之后发现的另一个。
预定的
我怀疑您正在寻找sched
模块。
编辑:我的建议在我阅读后似乎没什么帮助。所以我决定测试这个sched
模块,看看它是否可以按照我的建议工作。我的测试来了:我会用一个单独的线程来使用它,或多或少这样:
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
def run(self):
self.scheduler.run()
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
首先,我将创建一个线程类,它有自己的调度程序和队列。调度程序中至少会注册一个事件:一个用于调用从队列中调度事件的方法。
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
从队列中调度事件的方法将锁定队列,调度每个事件,清空队列并再次调度自身,以便在将来的某个时间寻找新的事件。请注意,寻找新事件的时间很短(一秒),您可以更改它:
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
该类还应该有一个调度用户事件的方法。自然,此方法应该在更新队列时锁定队列:
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
最后,该类应该调用调度程序的 main 方法:
def run(self):
self.scheduler.run()
下面是一个使用示例:
def print_time():
print "scheduled:", time.time()
if __name__ == "__main__":
st = SchedulingThread()
st.start()
st.schedule(print_time, 10)
while True:
print "main thread:", time.time()
time.sleep(5)
st.join()
它在我的机器上的输出是:
$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77
这段代码只是一个简单的例子,它可能需要一些工作。但是,我不得不承认我对这个sched
模块有点着迷,所以我建议它。您可能还想寻找其他建议:)
AP调度器
在谷歌中寻找我发布的解决方案,我发现了这个惊人的 APScheduler 模块。它非常实用和有用,我敢打赌它是您的解决方案。使用此模块,我之前的示例会更简单:
from apscheduler.scheduler import Scheduler
import time
sch = Scheduler()
sch.start()
@sch.interval_schedule(seconds=10)
def print_time():
print "scheduled:", time.time()
sch.unschedule_func(print_time)
while True:
print "main thread:", time.time()
time.sleep(5)
(不幸的是我没有找到如何安排一个事件只执行一次,所以函数事件应该自行取消安排。我敢打赌它可以用一些装饰器来解决。)