2

我正在使用 Apache Beam Java SDK 创建一个 Google 数据流管道。我在那里进行了一些转换,最后我创建了一个实体集合( PCollection< Entity > )。我需要将其写入 Google DataStore,然后在写入所有实体后执行另一个转换。(例如通过 PubSub Message 将已保存对象的 ID 广播给多个订阅者)。

现在,存储 PCollection 的方法是:entities.DatastoreIO.v1().write().withProjectId("abc")

这将返回一个 PDone 对象,我不确定如何在此 Write() 完成后链接另一个转换发生。由于 DatastoreIO.write() 调用不返回 PCollection,因此我无法进一步处理管道。我有 2 个问题:

  1. 如何获取写入数据存储的对象的 ID?

  2. 如何附加另一个在保存所有实体后将起作用的转换?

4

1 回答 1

3

我们没有很好的方法来做这些事情(返回写入的 Datastore 实体的 ID,或者等到实体被写入),尽管这与第一个类似的请求相去甚远(人们已经为 BigQuery 提出了这个要求,因为例如),我们正在考虑。

现在您唯一的选择是等到整个管道完成,例如 via pipeline.run().waitUntilFinish(),然后在您的主程序中执行您想要的操作(例如,您可以运行另一个管道)。

于 2017-09-15T15:11:55.850 回答