【注意:使用 Python 2.7 和 jupyter notebook 环境】
我希望能够使用 APscheduler 将实例方法安排为作业,并将这些作业存储在持久数据库中 [在本例中为 mongodb]。
但是,当尝试这样做时,我遇到了以下错误:unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)
在此之前我已经成功:(a)将实例方法调度为作业(b)将作业存储在 mongodb
但是,我不能让这两者一起工作。
什么有效:
(a)将实例方法调度为作业的基本示例......
from apscheduler.schedulers.background import BackgroundScheduler
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
按预期运行,每 5 秒打印一次“测试”。
什么打破:
但是,只要我添加一个作业商店......
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
该作业第一次成功运行,但由于“未绑定方法”而导致所有后续调用错误。我猜想作业实例没有从作业存储中检索,因此没有“自我”传递给 use_variable 方法......
从文档...
从文档:In case of a bound method, passing the unbound version (YourClass.method_name) as the target function to add_job() with the class instance as the first argument (so it gets passed as the self argument)
因此,我尝试了:
scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)
也没有运气。
当前的解决方法
我目前正在使用以下解决方法,但是它非常hacky,我想找到一个更优雅的解决方案!
def proxy(job):
job.use_variable()
scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)
按照 Alex 的建议使用阻塞调度程序进行更新
最小示例代码:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(job):
print(job.variable)
job = Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)
def my_listener(event):
if (event.exception):
print('Exception: {}'.format(event.exception))
scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)
scheduler.start()
重现错误。第一次调用成功打印'test',但随后的调用遇到了未绑定错误[至少在jupyter笔记本环境中]......