1

我正在 Python 中使用 APScheduler 调度作业,作业的输入来自一个 JSON 配置文件。

def add_config_job(sched, job):
    """Register one configured job on the scheduler.

    The handler module is looked up in ``JOB_METHODS`` by ``job["type"]``
    and its ``cron_job`` callable is scheduled with ``job`` as the only
    positional argument.  ``job["run_at"]`` selects the trigger:

    * a dict is expanded into keyword arguments of ``sched.add_cron_job``
      (together with the optional ``job["start_date"]``);
    * a string is passed as ``date`` to ``sched.add_date_job``.

    An unknown job type or an unsupported ``run_at`` value is logged as a
    warning and the job is skipped.

    Args:
        sched: scheduler object exposing ``add_cron_job`` and
            ``add_date_job``.
        job: mapping describing the job; ``"type"``, ``"name"``, ``"id"``
            and ``"run_at"`` are required, ``"start_date"`` is optional.

    Raises:
        KeyError: if one of the required keys is missing from ``job``.

    NOTE(review): exceptions raised by the scheduler itself are not caught
    here and propagate to the caller.
    """
    job_type = job["type"]
    module = JOB_METHODS.get(job_type)
    if module is None:
        # logging.warn is a deprecated alias; logging.warning is equivalent.
        logging.warning("job type %r not supported", job_type)
        return

    func = module.cron_job
    args = (job,)
    name = "%s__%s" % (job["name"], job["id"])
    start_date = job.get("start_date")
    run_at = job["run_at"]

    if isinstance(run_at, dict):
        # The dict entries become keyword arguments of add_cron_job.
        sched.add_cron_job(func, args=args, name=name, start_date=start_date,
                           **run_at)
    elif isinstance(run_at, basestring):
        sched.add_date_job(func, args=args, name=name, date=run_at)
    else:
        logging.warning("unsupported 'run_at' type (%s given)", run_at)

functions.py

# Registry mapping the name a job gives in its "function" field to the
# callable that cron_job will invoke.
FUNCTIONS = {
    "generate_report": report.generate_report,
}

def cron_job(job):
    """Run the function named by a job description.

    The callable is taken from ``FUNCTIONS`` using ``job["function"]``.
    The job's ``"args"`` (default ``[]``) and ``"kwargs"`` (default ``{}``)
    are each passed through ``loader.load`` of an ``args_loader.Loader``
    built from the job's ``"interval_type"``, ``"timezone"`` and
    ``"report_time"`` entries (``None`` when absent), and the results are
    used as the positional and keyword arguments of the call.

    Raises:
        KeyError: if ``job`` has no ``"function"`` key or the name is not
            registered in ``FUNCTIONS``.
    """
    target = FUNCTIONS[job["function"]]

    loader = args_loader.Loader(
        job.get("interval_type"),
        job.get("timezone"),
        job.get("report_time"),
    )
    positional = loader.load(job.get("args", []))
    keyword = loader.load(job.get("kwargs", {}))

    target(*positional, **keyword)

运行调度器(scheduler)时,抛出了 "year is out of range"(年份超出范围)错误:

"/usr/local/src/ptf/disk/installed/scheduler/libscheduler/scheduling.py", line 23, in start add_config_job(sched, job) 
    File "/usr/local/src/ptf/disk/installed/scheduler/libscheduler/scheduling.py", line 41, in add_config_job **run_at) 
    File "/usr/lib/pymodules/python2.7/apscheduler/scheduler.py", line 347, in add_cron_job return self.add_job(trigger, func, args, kwargs, **options) 
    File "/usr/lib/pymodules/python2.7/apscheduler/scheduler.py", line 264, in add_job self._real_add_job(job, jobstore, True) 
    File "/usr/lib/pymodules/python2.7/apscheduler/scheduler.py", line 220, in _real_add_job job.compute_next_run_time(datetime.now()) 
    File "/usr/lib/pymodules/python2.7/apscheduler/job.py", line 74, in compute_next_run_time self.next_run_time = self.trigger.get_next_fire_time(now) 
    File "/usr/lib/pymodules/python2.7/apscheduler/triggers/cron/__init__.py", line 115, in get_next_fire_time fieldnum - 1) 
    File "/usr/lib/pymodules/python2.7/apscheduler/triggers/cron/__init__.py", line 87, in _increment_field_value return datetime(**values), fieldnum
ValueError: year is out of range
4

0 回答 0