我有一个小脚本可以轮询数据库以查找某些作业的状态。我决定使用 APScheduler 来处理循环调用。如果花费太长时间,我创建了一个装饰器来使函数超时。我在这里遇到的问题是装饰器在一个类中,即使我在两个不同的函数中创建了该类的两个实例,它们总是具有相同的 start_time。我想也许如果我在我的类中移动装饰器并在 init 调用中初始化 start_time 它会更新类的每个实例的 start_time。当我移动类的装饰器并分配 self.start_time = datetime.now() 时,开始时间会在每次调用类时更新,因此永远不会超时。类内部装饰器的示例也在下面。
def timeout(start, min_to_wait):
def decorator(func):
def _handle_timeout():
scheduler.shutdown(wait=False)
@wraps(func)
def wrapper(*args, **kwargs):
expire = start + timedelta(minutes = min_to_wait)
now = datetime.now()
if now > expire:
_handle_timeout()
return func(*args, **kwargs)
return wrapper
return decorator
class Job(object):
def __init__(self, name, run_id, results):
self.name = name
self.run_id = object_id
self.results = results
self.parcel_id= None
self.status = None
start_time = datetime.now()
@timeout(start_time, config.WAIT_TIME)
def wait_for_results(self):
if self.results:
self.pack_id = self.results[0].get('parcel_id')
self.status = self.results[0].get('status')
return self.results[0]
else:
return False
@timeout(start_time, config.WORK_TIME)
def is_done(self):
status = self.results[0].get('status')
status_map = {'done': True,
'failed': FailedError,
'lost': LostError}
def _get_or_throw(s, map_obj):
value = map_obj.get(s)
if s in ['failed', 'lost']:
raise value(s)
else:
self.status = s
return s
return _get_or_throw(status, status_map)
def job_1(mssql, postgres, runid):
res = get_results(mssql, config.MSSQL, first_query_to_call)
first_job= Job('first_job', runid, res)
step_two = pack_job.wait_for_results()
if step_two:
try:
logger.info(first_job)
if first_job.is_done() == 'done':
scheduler.remove_job('first_job')
scheduler.add_job(lambda: job_two(mssql,
postgres, first_job.object_id, runid), 'interval', seconds=config.POLL_RATE, id='second_job')
except LostError as e:
logger.error(e, exc_info=True)
scheduler.shutdown(wait=False)
except FailedError as e:
logger.error(e, exc_info=True)
scheduler.shutdown(wait=False)
def job_two(mssql, postgres, object_id, runid):
res = get_results(mssql, config.MSSQL, some_other_query_to_run, object_id)
second_job= Job('second_job', runid, res)
step_two = second_job.wait_for_results()
if step_two:
try:
logger.info(second_job)
if second_job.is_done() == 'done':
scheduler.remove_job('second_job')
except LostError as e:
logger.error(e, exc_info=True)
scheduler.shutdown(wait=False)
except FailedError as e:
logger.error(e, exc_info=True)
scheduler.shutdown(wait=False)
if __name__ == '__main__':
runid = sys.argv[1:]
if runid:
runid = runid[0]
scheduler = BlockingScheduler()
run_job = scheduler.add_job(lambda: job_one(pymssql, psycopg2, runid), 'interval', seconds=config.POLL_RATE, id='first_job')
尝试在类内移动装饰器:
class Job(object):
def __init__(self, name, run_id, results):
self.name = name
self.run_id = run_id
self.results = results
self.pack_id = None
self.status = None
self.start_time = datetime.now()
def timeout(min_to_wait):
def decorator(func):
def _handle_timeout():
scheduler.shutdown(wait=False)
@wraps(func)
def wrapper(self, *args, **kwargs):
print '**'
print self.start_time
print ''
expire = self.start_time + timedelta(minutes = min_to_wait)
now = datetime.now()
if now > expire:
_handle_timeout()
return func(self, *args, **kwargs)
return wrapper
return decorator
这是我使用上述装饰器时的示例输出。
**
self start time: 2014-10-28 08:57:11.947026
**
self start time: 2014-10-28 08:57:16.976828
**
self start time: 2014-10-28 08:57:21.989064
start_time 需要保持不变,否则我不能使函数超时。