0

我可以使用此代码将任务作为我们功能测试的一部分运行。

def run_tasks():
    taskq = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    try:
        while tasks:
            for task in tasks:
                url, body, headers, method = task["url"], \
                                             base64.b64decode(task["body"]), \
                                             task["headers"], \
                                             task["method"]
                try:
                    res = Request.blank(url,
                                        body=body,
                                        headers=headers,
                                        method=method).get_response(wsgi_app)
                    if res.status_int != 200:
                        log.error("%s\n%s" % (url, str(res)))
                except Exception:
                    log.error(url, exc_info=True)

            tasks = taskq.GetTasks("default")
            taskq.FlushQueue("default")
    finally:
        pass

然而,管道任务在这里爆炸并带有以下信息

“在尝试 1 时收到了管道 ID“[some id]”的评估任务,''直到:[未来某个时间晚些时候]”

相关来源在这里。

+++ b/src/pipeline/pipeline/pipeline.py Wed Apr 04 20:01:12 2012 -0400
@@ -1946,6 +1952,7 @@
     if pipeline_record.next_retry_time is not None:
       retry_time = pipeline_record.next_retry_time - _RETRY_WIGGLE_TIMEDELTA
       if self._gettime() <= retry_time:
+        # error under unit tests
         detail_message = (
             'Received evaluation task for pipeline ID "%s" on attempt %d, '
             'which will not be ready until: %s' % (pipeline_key.name(),

我想猴子在代码中修补 _RETRY_WIGGLE_TIMEDELTA 以在我的测试套件中调出 appengine api,但这似乎还不够。

没有等到时间,在我的代码中,这会使我的测试套件运行得非常慢,我不知所措。

有任何想法吗?

4

1 回答 1

0

好吧,我误诊了这个问题。任务失败,该逻辑是重试逻辑。在管道 api 中强制执行这有点奇怪,但我相信这是有充分理由的。所以在我的处理任务循环中的某个地方,我需要检查重试计数并以某种方式将它们从队列中取出,然后引发从第一次运行中捕获的所有错误信息,以便可以认为测试失败。

于 2012-04-06T16:00:23.103 回答