4

我想将我的一些任务重写为管道。主要是因为我需要一种方法来检测任务何时完成或按特定顺序启动任务。我的问题是我不确定如何将递归任务重写为管道。通过递归,我的意思是这样称呼自己的任务:

class MyTask(webapp.RequestHandler):
    def post(self):
        cursor = self.request.get('cursor', None)

        [set cursor if not null]
        [fetch 100 entities form datastore]

        if len(result) >= 100:
            [ create the same task in the queue and pass the cursor ]

        [do actual work the task was created for]

现在我真的很想把它写成一个管道并做类似的事情:

class DoSomeJob(pipeline.Pipeline):

   def run(self):
       with pipeline.InOrder():
          yield MyTask()
          yield MyOtherTask()
          yield DoSomeMoreWork(message2)

对此的任何帮助将不胜感激。谢谢!

4

2 回答 2

12

一个基本的管道只返回一个值:

class MyFirstPipeline(pipeline.Pipeline):
    def run(self):
        return "Hello World"  

该值必须是 JSON 可序列化的。

如果您需要协调多个管道,则需要使用生成器管道yield语句。

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        yield MyFirstPipeline()

您可以将管道的产生视为返回'future'

您可以将此 future 作为输入 arg 传递给另一个管道:

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        result = yield MyFirstPipeline()
        yield MyOtherPipeline(result)

Pipeline API 将确保仅在将future from解析为实际值后才调用的run方法。MyOtherPipelineresultMyFirstPipeline

你不能混合使用相同yieldreturn方法。如果您使用yield的值必须是 Pipeline 实例。如果您想这样做,这可能会导致问题:

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield results

在这种情况下,Pipeline API 只会在您的最后一行中看到一个列表yield results,因此它不知道在返回之前解决其中的期货,您将收到错误。

它们没有记录,但包含一个实用管道库,可以在这里提供帮助:
https ://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py

因此,实际工作的上述版本如下所示:

import pipeline
from pipeline import common

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield common.List(*results)

现在我们好了,我们正在生成一个管道实例,并且 Pipeline API 知道正确地解析它的未来值。管道的来源common.List很简单:

class List(pipeline.Pipeline):
    """Returns a list with the supplied positional arguments."""

    def run(self, *args):
        return list(args)

...在调用管道的run方法时,管道 API 已将列表中的所有项目解析为实际值,可以作为*args.

无论如何,回到你原来的例子,你可以做这样的事情:

class FetchEntitites(pipeline.Pipeline):
    def run(self, cursor=None)
        if cursor is not None:
            cursor = Cursor(urlsafe=cursor)

        # I think it's ok to pass None as the cursor here, haven't confirmed
        results, next_curs, more = MyModel.query().fetch_page(100,
                                                              start_cursor=cursor)

        # queue up a task for the next page of results immediately
        future_results = []
        if more:
            future_results = yield FetchEntitites(next_curs.urlsafe())

        current_results = [ do some work on `results` ]

        # (assumes current_results and future_results are both lists)
        # this will have to wait for all of the recursive calls in
        # future_results to resolve before it can resolve itself:
        yield common.Extend(current_results, future_results)

进一步说明

一开始我说我们可以把result = yield MyPipeline()它当作返回一个“未来”。严格来说这不是真的,显然我们实际上只是在产生实例化的管道。(不用说我们的run方法现在是一个生成器函数。)

Python 的yield 表达式如何工作的奇怪部分是,尽管它看起来像这样,但您将值放在函数之外yield的某个地方(到 Pipeline API 设备)而不是进入您的var。表达式左侧的 var 的值也通过调用生成器(生成器是您定义的方法)从函数外部推入。resultresultsendrun

因此,通过产生一个实例化的 Pipeline,您可以让 Pipeline API 获取该实例并run在其他时间调用它的方法(实际上,它将作为类名和一组 args 和 kwargs 传递到任务队列中并重新-在那里实例化...这就是为什么您的 args 和 kwargs 也需要 JSON 可序列化的原因)。

同时,Pipeline API sendsaPipelineFuture对象进入您的run生成器,这就是您的resultvar 中出现的内容。这似乎有点神奇和违反直觉,但这就是带有 yield 表达式的生成器的工作方式。

我花了相当多的时间才把它解决到这个水平,我欢迎对我做错的任何事情进行澄清或更正。

于 2014-09-02T09:57:58.940 回答
3

当您创建管道时,它会返回一个表示“阶段”的对象。您可以向舞台询问其 ID,然后将其保存。稍后,您可以根据保存的 id 重新构建阶段,然后询问阶段是否已完成。

请参阅http://code.google.com/p/appengine-pipeline/wiki/GettingStarted并查找has_finalized. 有一个示例可以满足您的大部分需求。

于 2012-07-06T22:37:15.617 回答