418

我想永远每 60 秒在 Python 中重复执行一个函数(就像 Objective C 中的NSTimer或 JS 中的 setTimeout 一样)。此代码将作为守护程序运行,实际上就像使用 cron 每分钟调用一次 python 脚本一样,但不需要用户设置。

这个关于在 Python 中实现的 cron 的问题中,解决方案似乎有效地只是sleep() x 秒。我不需要如此高级的功能,所以也许这样的东西会起作用

while True:
    # Code executed here
    time.sleep(60)

这段代码有任何可预见的问题吗?

4

21 回答 21

337

如果您的程序还没有事件循环,请使用sched模块,它实现了一个通用的事件调度程序。

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

如果您已经在使用asyncio, trio, tkinter, PyQt5, gobject,等事件循环库kivy- 只需使用现有事件循环库的方法来安排任务即可。

于 2009-01-23T21:09:33.110 回答
298

将您的时间循环锁定到系统时钟,如下所示:

import time
starttime = time.time()
while True:
    print("tick")
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))
于 2014-08-11T20:25:25.000 回答
96

如果您想要一种非阻塞方式来定期执行您的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每 n 秒调用一次您的函数。我经常使用这种技术来打印长的、CPU/磁盘/网络密集型任务的进度信息。

这是我在类似问题中发布的代码,带有 start() 和 stop() 控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特征:

  • 仅标准库,无外部依赖
  • start()即使计时器已经启动/停止,stop()也可以安全地多次调用
  • 要调用的函数可以有位置参数和命名参数
  • 您可以interval随时更改,下次运行后生效。同样的argskwargs甚至function
于 2016-07-11T22:15:34.927 回答
78

您可能需要考虑Twisted,它是一个实现Reactor Pattern的 Python 网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“while True: sleep(60)”可能会工作 Twisted 可能已经实现了您最终需要的许多功能(如 bobince 所指出的守护进程、日志记录或异常处理),并且可能是一个更强大的解决方案

于 2009-01-23T21:14:06.393 回答
43

这是 MestreLion 对代码的更新,可避免随着时间的推移而漂移。

此处的 RepeatedTimer 类按照 OP 的要求每隔“间隔”秒调用一次给定函数;时间表不取决于函数执行需要多长时间。我喜欢这个解决方案,因为它没有外部库依赖项;这只是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(复制自 MestreLion 的回答):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
于 2016-12-05T00:22:40.053 回答
32

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样你的代码就被执行了,然后它等待 60 秒然后它再次执行,等待,执行等等......不需要让事情复杂化:D

于 2012-11-04T10:26:10.230 回答
32
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果您想在不阻塞剩余代码的情况下执行此操作,可以使用它让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少发现的几个功能:

  • 异常处理:在这个级别上,尽可能地正确处理异常,即在不中止程序的情况下为调试目的记录异常。
  • 没有链接:您在许多答案中找到的常见的类似链的实现(用于安排下一个事件)在调度机制(或其他)中出现任何问题的方面是脆弱的threading.Timer,这将终止链。即使问题的原因已经解决,也不会发生进一步的执行。相比之下,一个简单的循环和一个简单sleep()的等待要健壮得多。
  • 没有漂移:我的解决方案准确跟踪它应该运行的时间。不存在取决于执行时间的漂移(与许多其他解决方案一样)。
  • 跳过:如果一次执行花费了太多时间,我的解决方案将跳过任务(例如,每五秒执行一次,但 X 花费了 6 秒)。这是标准的 cron 行为(并且有充分的理由)。然后,许多其他解决方案只需连续多次执行任务,没有任何延迟。对于大多数情况(例如清理任务),这是不希望的。如果需要只需使用它next_time += delay
于 2018-04-12T16:31:00.910 回答
17

我最终使用了日程安排模块。API 很好。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
于 2019-12-05T21:44:15.867 回答
8

替代的灵活性解决方案是Apscheduler

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

此外,apscheduler 还提供了很多调度器,如下所示。

  • BlockingScheduler:当调度程序是进程中唯一运行的东西时使用

  • BackgroundScheduler:当您不使用以下任何框架并希望调度程序在应用程序的后台运行时使用

  • AsyncIOScheduler:如果您的应用程序使用 asyncio 模块,请使用

  • GeventScheduler:如果您的应用程序使用 gevent,则使用

  • TornadoScheduler:在构建 Tornado 应用程序时使用

  • TwistedScheduler:在构建 Twisted 应用程序时使用

  • QtScheduler:在构建 Qt 应用程序时使用

于 2020-11-30T05:18:03.127 回答
6

它和 cron 之间的主要区别在于,异常会永远杀死守护进程。您可能希望使用异常捕获器和记录器进行包装。

于 2009-01-23T21:12:17.130 回答
6

前段时间我遇到了类似的问题。可能是http://cronus.readthedocs.org可能有帮助?

对于 v0.2,以下代码段有效

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
于 2014-06-06T18:21:58.707 回答
5

如果漂移不是问题

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

哪个异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果正在运行的任务需要相当长的时间,则间隔变为 2 秒 + 任务时间,因此如果您需要精确调度,那么这不适合您。

请注意该daemon=True标志意味着该线程不会阻止应用程序关闭。例如,有问题在pytest运行测试后会无限期挂起,等待此广告停止。

于 2020-05-21T15:19:39.143 回答
5

只需使用

import time

while True:
    print("this will run after every 30 sec")
    #Your code here
    time.sleep(30)
于 2021-05-11T09:45:01.860 回答
3

一个可能的答案:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
于 2017-03-24T06:56:37.077 回答
2

我使用 Tkinter after() 方法,它不会“窃取游戏”(就像前面介绍的sched模块),即它允许其他东西并行运行:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()并且do_something2()可以以任何间隔速度并行运行。在这里,第二个将被以两倍的速度执行。另外请注意,我使用了一个简单的计数器作为终止任一函数的条件。如果您在程序终止之前运行什么函数(例如时钟),您可以使用您喜欢的任何其他条件或不使用。

于 2018-03-14T08:27:20.970 回答
2

这是 MestreLion 代码的改编版本。除了原来的函数,这段代码:

1)添加first_interval用于在特定时间触发定时器(调用者需要计算first_interval并传入)

2)解决原始代码中的竞争条件。在原始代码中,如果控制线程未能取消正在运行的定时器(“停止定时器,并取消定时器动作的执行。这只有在定时器仍处于等待阶段时才有效。”引用自https:// docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
于 2018-09-10T09:55:43.000 回答
2

这是另一个不使用任何额外库的解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
于 2020-05-20T17:49:51.270 回答
1

我用它来导致每小时 60 个事件,大多数事件在整分钟后的相同秒数发生:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

根据实际情况,您可能会得到长度刻度:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但在 60 分钟结束时,您将有 60 个滴答声;并且它们中的大多数将在您喜欢的那一分钟的正确偏移处发生。

在我的系统上,我得到的典型漂移小于 1/20 秒,直到需要进行校正。

这种方法的优点是时钟漂移的分辨率;如果您正在执行诸如每个刻度附加一个项目并且您预计每小时附加 60 个项目之类的事情,这可能会导致问题。未能考虑漂移可能会导致诸如移动平均线之类的次要指示将数据考虑得太深,从而导致输出错误。

于 2017-02-21T13:43:43.773 回答
1

例如,显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
于 2017-09-29T18:13:20.170 回答
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
于 2018-08-28T10:59:03.507 回答
0

我认为这取决于您想做什么,并且您的问题没有详细说明。

对我来说,我想在我已经多线程的进程之一中进行昂贵的操作。所以我让领导进程检查时间,只有她做昂贵的操作(检查点深度学习模型)。为此,我增加计数器以确保每 5 秒保存 5、10 和 15 秒(或使用带有 math.floor 的模运算):

def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查 if 语句是我需要的。在我已经很复杂的多处理多 GPU 代码中拥有线程和调度程序并不是我想要添加的复杂性,如果我可以避免它并且似乎我可以。检查工作人员 ID 很容易确保只有 1 个进程在执行此操作。

请注意,我使用 True 打印语句来真正确保模块化算术技巧有效,因为检查确切时间显然不起作用!但令我惊喜的是,地板成功了。

于 2021-05-19T16:48:22.110 回答