0

我有一个小脚本可以轮询数据库以查找某些作业的状态。我决定使用 APScheduler 来处理循环调用。如果花费太长时间,我创建了一个装饰器来使函数超时。我在这里遇到的问题是装饰器在一个类中,即使我在两个不同的函数中创建了该类的两个实例,它们总是具有相同的 start_time。我想也许如果我在我的类中移动装饰器并在 init 调用中初始化 start_time 它会更新类的每个实例的 start_time。当我移动类的装饰器并分配 self.start_time = datetime.now() 时,开始时间会在每次调用类时更新,因此永远不会超时。类内部装饰器的示例也在下面。

def timeout(start, min_to_wait):
    def decorator(func):
        def _handle_timeout():
            scheduler.shutdown(wait=False)
        @wraps(func)
        def wrapper(*args, **kwargs):
            expire = start + timedelta(minutes = min_to_wait)
            now = datetime.now()
            if now > expire:
                _handle_timeout()
            return func(*args, **kwargs)
        return wrapper
    return decorator

class Job(object):
    def __init__(self, name, run_id, results):
        self.name = name
        self.run_id = object_id
        self.results = results
        self.parcel_id= None
        self.status = None

    start_time = datetime.now()

    @timeout(start_time, config.WAIT_TIME)
    def wait_for_results(self):
        if self.results:
            self.pack_id = self.results[0].get('parcel_id')
            self.status = self.results[0].get('status')
            return self.results[0]
        else:
            return False

    @timeout(start_time, config.WORK_TIME)
    def is_done(self):
        status = self.results[0].get('status')
        status_map = {'done': True,
                      'failed': FailedError,
                      'lost': LostError}

        def _get_or_throw(s, map_obj):
            value = map_obj.get(s)
            if s in ['failed', 'lost']:
                raise value(s)
            else:
                self.status = s
                return s

        return _get_or_throw(status, status_map)


def job_1(mssql, postgres, runid):
    res = get_results(mssql, config.MSSQL, first_query_to_call)
    first_job= Job('first_job', runid, res)
    step_two = pack_job.wait_for_results()

    if step_two:
        try:
            logger.info(first_job)
            if first_job.is_done() == 'done':
                scheduler.remove_job('first_job')
                scheduler.add_job(lambda: job_two(mssql,
                    postgres, first_job.object_id, runid), 'interval', seconds=config.POLL_RATE, id='second_job')

        except LostError as e:
            logger.error(e, exc_info=True)
            scheduler.shutdown(wait=False)
        except FailedError as e:
            logger.error(e, exc_info=True)
            scheduler.shutdown(wait=False)


def job_two(mssql, postgres, object_id, runid):
    res = get_results(mssql, config.MSSQL, some_other_query_to_run, object_id)
    second_job= Job('second_job', runid, res)
    step_two = second_job.wait_for_results()

    if step_two:
        try:
            logger.info(second_job)
            if second_job.is_done() == 'done':
                scheduler.remove_job('second_job')

        except LostError as e:
            logger.error(e, exc_info=True)
            scheduler.shutdown(wait=False)
        except FailedError as e:
            logger.error(e, exc_info=True)
            scheduler.shutdown(wait=False)

if __name__ == '__main__':
    runid = sys.argv[1:]

    if runid:
        runid = runid[0]

    scheduler = BlockingScheduler()
    run_job = scheduler.add_job(lambda: job_one(pymssql, psycopg2, runid), 'interval', seconds=config.POLL_RATE, id='first_job')

尝试在类内移动装饰器:

class Job(object):
    def __init__(self, name, run_id, results):
        self.name = name
        self.run_id = run_id
        self.results = results
        self.pack_id = None
        self.status = None
        self.start_time = datetime.now()


    def timeout(min_to_wait):
        def decorator(func):
            def _handle_timeout():
                scheduler.shutdown(wait=False)
            @wraps(func)
            def wrapper(self, *args, **kwargs):                
                print '**'
                print self.start_time
                print ''
                expire = self.start_time + timedelta(minutes = min_to_wait)
                now = datetime.now()
                if now > expire:
                    _handle_timeout()
                return func(self, *args, **kwargs)
            return wrapper
        return decorator

这是我使用上述装饰器时的示例输出。

**
self start time: 2014-10-28 08:57:11.947026 

**
self start time: 2014-10-28 08:57:16.976828 

**
self start time: 2014-10-28 08:57:21.989064 

start_time 需要保持不变,否则我不能使函数超时。

4

1 回答 1

0

在第一个示例中,您的开始时间是在class执行语句时初始化的,在您的情况下,是在解释器中首次导入模块时。

在第二个示例中,开始时间在类被实例化时被初始化。对于同一实例,它不应从一种方法调用更改为另一种方法调用。当然,如果您继续创建新实例,每个实例的开始时间会有所不同。Job

现在您没有使用 Job 类发布代码,因此很难判断正确的解决方案是什么。

于 2014-10-28T13:17:16.630 回答