57

我正在尝试安排一个重复事件在 Python 3 中每分钟运行一次。

我看过课程sched.scheduler,但我想知道是否有其他方法可以做到这一点。我听说过我可以为此使用多个线程,我不介意这样做。

我基本上是在请求一些 JSON,然后对其进行解析;它的价值随时间而变化。

要使用sched.scheduler,我必须创建一个循环来请求它安排甚至运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

还有什么其他方法可以做到这一点?

4

12 回答 12

64

您可以使用threading.Timer,但这也可以安排一次性事件,类似于.enter调度程序对象的方法。

将一次性调度器转换为周期性调度器的正常模式(在任何语言中)是让每个事件以指定的时间间隔重新调度自身。例如,使用sched,我不会像你正在做的那样使用循环,而是使用类似的东西:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

并通过调用启动整个“永久定期计划”

periodic(scheduler, 3600, query_rate_limit)

或者,我可以使用threading.Timer而不是scheduler.enter,但模式非常相似。

如果您需要更精细的变化(例如,在给定时间或在某些条件下停止定期重新调度),那么通过一些额外的参数来适应并不难。

于 2010-03-08T03:35:34.610 回答
36

您可以使用schedule。它适用于 Python 2.7 和 3.3,而且相当轻量级:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)
于 2013-05-28T07:54:29.197 回答
23

我对这个问题的谦虚:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特征:

  • 仅标准库,无外部依赖
  • 使用 Alex Martnelli 建议的模式
  • start()即使计时器已经启动/停止,stop()也可以安全地多次调用
  • 要调用的函数可以有位置参数和命名参数
  • 您可以interval随时更改,下次运行后生效。同样的argskwargs甚至function
于 2012-10-31T04:03:59.230 回答
10

基于 MestreLion 的回答,它解决了多线程的一个小问题:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
        self._lock.release()  <- wrong indentation

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()
于 2013-09-19T22:59:45.013 回答
8

使用芹菜

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something
于 2013-09-22T10:45:43.963 回答
8

您可以使用Advanced Python Scheduler。它甚至有一个类似 cron 的界面。

于 2012-11-09T17:11:54.353 回答
5

根据 Alex Martelli 的回答,我实现了更容易集成的装饰器版本。

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')
于 2018-02-13T03:14:28.460 回答
2

文档:高级 Python 调度程序

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

可用字段:

| Field       | Description                                                    |
|-------------|----------------------------------------------------------------|
| year        | 4-digit year number                                            |
| month       | month number (1-12)                                            |
| day         | day of the month (1-31)                                        |
| week        | ISO week number (1-53)                                         |
| day_of_week | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
| hour        | hour (0-23)                                                    |
| minute      | minute (0-59)                                                  |
| second      | second (0-59)                                                  |
于 2020-12-21T14:09:31.927 回答
2

这是一个快速而肮脏的非阻塞循环Thread

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

没有什么特别的,它worker会延迟创建一个自己的新线程。可能不是最有效的,但足够简单。如果您需要更复杂的解决方案, northtree 的答案将是您的最佳选择。

基于,我们可以做同样的事情,只需Timer

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())
于 2018-05-25T22:59:25.490 回答
1

有一个新包,名为ischedule. 对于这种情况,解决方案可能如下:

from ischedule import schedule, run_loop
from datetime import timedelta


def query_rate_limit():
    print("query_rate_limit")

schedule(query_rate_limit, interval=60)
run_loop(return_after=timedelta(hours=1))

一切都在主线程上运行,并且在 run_loop 内没有忙于等待。启动时间非常精确,通常在指定时间的几分之一毫秒内。

于 2021-06-07T09:33:48.147 回答
0

不久前我遇到了类似的问题,所以我制作了一个 python 模块事件调度程序来解决这个问题。它具有与 sched 库非常相似的 API,但有一些区别:

  1. 它利用后台线程,并且始终能够在后台接受和运行作业,直到调度程序显式停止(不需要 while 循环)。
  2. 它带有一个 API,用于按用户指定的时间间隔安排重复事件,直到明确取消。

它可以通过安装pip install event-scheduler

from event_scheduler import EventScheduler

event_scheduler = EventScheduler()
event_scheduler.start()
# Schedule the recurring event to print "hello world" every 60 seconds with priority 1
# You can use the event_id to cancel the recurring event later
event_id = event_scheduler.enter_recurring(60, 1, print, ("hello world",))
于 2021-10-04T03:29:16.947 回答
0

查看我的示例

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))
于 2018-05-04T23:40:25.703 回答