2

【注意:使用 Python 2.7 和 jupyter notebook 环境】

我希望能够使用 APscheduler 将实例方法安排为作业,并将这些作业存储在持久数据库中 [在本例中为 mongodb]。

但是,当尝试这样做时,我遇到了以下错误:unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)

在此之前我已经成功:(a)将实例方法调度为作业(b)将作业存储在 mongodb

但是,我不能让这两者一起工作。

什么有效:

(a)将实例方法调度为作业的基本示例......

from apscheduler.schedulers.background import BackgroundScheduler

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(self):
        print(self.variable)

job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()

按预期运行,每 5 秒打印一次“测试”。

什么打破:

但是,只要我添加一个作业商店......

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(self):
        print(self.variable)

job=Job({'variable': 'test'})

jobstores = {'default': MongoDBJobStore(
    database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()

该作业第一次成功运行,但由于“未绑定方法”而导致所有后续调用错误。我猜想作业实例没有从作业存储中检索,因此没有“自我”传递给 use_variable 方法......

从文档...

从文档:In case of a bound method, passing the unbound version (YourClass.method_name) as the target function to add_job() with the class instance as the first argument (so it gets passed as the self argument)

因此,我尝试了: scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)

也没有运气。

当前的解决方法

我目前正在使用以下解决方法,但是它非常hacky,我想找到一个更优雅的解决方案!

def proxy(job):
   job.use_variable()

scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)

按照 Alex 的建议使用阻塞调度程序进行更新

最小示例代码:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(job):
        print(job.variable)


job = Job({'variable': 'test'})

jobstores = {'default': MongoDBJobStore(
    database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)

def my_listener(event):
    if (event.exception):
        print('Exception: {}'.format(event.exception))

scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)

scheduler.start()

重现错误。第一次调用成功打印'test',但随后的调用遇到了未绑定错误[至少在jupyter笔记本环境中]......

4

1 回答 1

1

我在 APScheduler 3.6.0 中发现了一个错误,它只在 Python 2.7 上表现出来。它试图在方法调度方面做正确的事情,但在 py2.7 上它做错了。我将在下一个版本中解决这个问题。与此同时,您可以继续使用您的解决方法。

于 2019-07-10T05:40:45.743 回答