一个基本的管道只返回一个值:
class MyFirstPipeline(pipeline.Pipeline):
def run(self):
return "Hello World"
该值必须是 JSON 可序列化的。
如果您需要协调多个管道,则需要使用生成器管道和yield
语句。
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
yield MyFirstPipeline()
您可以将管道的产生视为返回'future'。
您可以将此 future 作为输入 arg 传递给另一个管道:
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
result = yield MyFirstPipeline()
yield MyOtherPipeline(result)
Pipeline API 将确保仅在将future from解析为实际值后才调用的run
方法。MyOtherPipeline
result
MyFirstPipeline
你不能混合使用相同yield
的return
方法。如果您使用yield
的值必须是 Pipeline 实例。如果您想这样做,这可能会导致问题:
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield results
在这种情况下,Pipeline API 只会在您的最后一行中看到一个列表yield results
,因此它不知道在返回之前解决其中的期货,您将收到错误。
它们没有记录,但包含一个实用管道库,可以在这里提供帮助:
https ://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py
因此,实际工作的上述版本如下所示:
import pipeline
from pipeline import common
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield common.List(*results)
现在我们好了,我们正在生成一个管道实例,并且 Pipeline API 知道正确地解析它的未来值。管道的来源common.List
很简单:
class List(pipeline.Pipeline):
"""Returns a list with the supplied positional arguments."""
def run(self, *args):
return list(args)
...在调用此管道的run
方法时,管道 API 已将列表中的所有项目解析为实际值,可以作为*args
.
无论如何,回到你原来的例子,你可以做这样的事情:
class FetchEntitites(pipeline.Pipeline):
def run(self, cursor=None)
if cursor is not None:
cursor = Cursor(urlsafe=cursor)
# I think it's ok to pass None as the cursor here, haven't confirmed
results, next_curs, more = MyModel.query().fetch_page(100,
start_cursor=cursor)
# queue up a task for the next page of results immediately
future_results = []
if more:
future_results = yield FetchEntitites(next_curs.urlsafe())
current_results = [ do some work on `results` ]
# (assumes current_results and future_results are both lists)
# this will have to wait for all of the recursive calls in
# future_results to resolve before it can resolve itself:
yield common.Extend(current_results, future_results)
进一步说明
一开始我说我们可以把result = yield MyPipeline()
它当作返回一个“未来”。严格来说这不是真的,显然我们实际上只是在产生实例化的管道。(不用说我们的run
方法现在是一个生成器函数。)
Python 的yield 表达式如何工作的奇怪部分是,尽管它看起来像这样,但您将值放在函数之外yield
的某个地方(到 Pipeline API 设备)而不是进入您的var。表达式左侧的 var 的值也通过调用生成器(生成器是您定义的方法)从函数外部推入。result
result
send
run
因此,通过产生一个实例化的 Pipeline,您可以让 Pipeline API 获取该实例并run
在其他时间调用它的方法(实际上,它将作为类名和一组 args 和 kwargs 传递到任务队列中并重新-在那里实例化...这就是为什么您的 args 和 kwargs 也需要 JSON 可序列化的原因)。
同时,Pipeline API send
saPipelineFuture
对象进入您的run
生成器,这就是您的result
var 中出现的内容。这似乎有点神奇和违反直觉,但这就是带有 yield 表达式的生成器的工作方式。
我花了相当多的时间才把它解决到这个水平,我欢迎对我做错的任何事情进行澄清或更正。