3

看起来 google 已经发布了对从 python 中的 dataflow/beam 查询数据存储的支持。我试图让它在本地运行,但我遇到了一些问题:

import apache_beam as beam
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from gcloud import datastore

client = datastore.Client('my-project')
query = client.query(kind='Document')

options = get_options()
p = beam.Pipeline(options=options)

entities = p | 'read' >> ReadFromDatastore(project='my-project', query=query)
entities | 'write' >> beam.io.Write(beam.io.TextFileSink('gs://output.txt'))

p.run()

这给了我一个

AttributeError: 'Query' object has no attribute 'HasField' [while running 'read/Split Query']

我猜我传入了错误的查询对象(有 3-4 个 pip 包可以从中导入数据存储),但我不知道应该传入哪个。在测试中他们正在传递protobuf。那是我必须使用的吗?如果这是我必须做的,任何人都可以使用 protobuf 显示一个简单的示例查询吗?

4

1 回答 1

2

wordcount 示例使用protobufs 进行查询。

看起来你需要类似的东西:

from google.datastore.v1 import query_pb2
...
query = query_pb2.Query()
query.kind.add().name = 'Document'
于 2016-12-14T06:00:28.687 回答