我有一个功能,我想在每分钟触发一次——在 00 秒。它通过空中发射一个数据包到一个安装在墙上的哑显示器。
我知道我可以用 while 循环蛮力它,但这似乎有点苛刻。
我曾尝试使用 sched ,但最终每分钟增加一秒。
我有哪些选择?
您可以尝试APScheduler,这是一个用于 Python 的 cron 样式的调度程序模块。
从他们的例子中:
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
def job_function():
print "Hello World"
sched.add_cron_job(job_function, second=0)
将job_function
每分钟运行一次。
如果您测量代码执行所需的时间,然后从 60 的睡眠时间中减去它会怎样?
import time
while True:
timeBegin = time.time()
CODE(.....)
timeEnd = time.time()
timeElapsed = timeEnd - timeBegin
time.sleep(60-timeElapsed)
最简单的解决方案是向操作系统注册一个超时,以便在您希望它到期时到期。
现在有很多方法可以使用阻塞指令执行此操作,最佳选择取决于您的实现。最简单的方法是使用time.sleep()
:
import time
current_time = time.time()
time_to_sleep = 60 - (current_time % 60)
time.sleep(time_to_sleep)
这样,您可以获取当前时间并计算您需要睡眠的时间(以秒为单位)。不精确,但足够接近。
APScheduler是正确的方法。但是,自原始答案以来,语法已经改变。
从 APScheduler 3.3.1 开始:
def fn():
print("Hello, world")
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(fn, trigger='cron', second=0)
你可以试试Threading.Timer
请参阅此示例
from threading import Timer
def job_function():
Timer(60, job_function).start ()
print("Running job_funtion")
它将每分钟打印一次“正在运行的job_function”
编辑:如果我们对它应该运行的时间持批评态度
from threading import Timer
from time import time
def job_function():
Timer(int(time()/60)*60+60 - time(), job_function).start ()
print("Running job_funtion")
它将在每分钟的第 0 秒准确运行。
语法已更改,因此在3.6.3 版(发布时间:2019 年 11 月 5 日)的APScheduler中使用以下代码段:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
def fn():
print('Hello, world!')
sched = BlockingScheduler()
# Execute fn() at the start of each minute.
sched.add_job(fn, trigger=CronTrigger(second=00))
sched.start()
Python 时间模块通常是我用于此类事件的。它有一个名为 的方法sleep(t)
,其中t
等于您要延迟的时间(以秒为单位)。
结合while
循环,您可以获得所需的内容:
import time
while condition:
time.sleep(60)
f(x)
你可以使用一个while循环和睡眠来不要过多地消耗处理器