0

最近云数据流 python sdk 可用,我决定使用它。不幸的是,还没有支持从云数据存储中读取数据,所以我不得不依赖编写自定义源代码,以便我可以按照承诺利用动态拆分、进度估计等的好处。我确实彻底研究了文档,但无法将各个部分放在一起,以便我可以加快整个过程。

更清楚地说,我的第一种方法是:

  1. 查询云数据存储
  2. 创建 ParDo 函数并将返回的查询传递给它。

但是这样一来,迭代超过 20 万个条目需要 13 分钟。

所以我决定编写可以有效读取实体的自定义源。但是由于我对将各个部分组合在一起缺乏了解,我无法做到这一点。任何人都可以帮助我如何创建自定义源以从数据存储中读取。

编辑:对于第一种方法,我的要点的链接是: https ://gist.github.com/shriyanka/cbf30bbfbf277deed4bac0c526cf01f1

谢谢你。

4

1 回答 1

1

在您提供的代码中,对 Datastore 的访问发生在甚至构建管道之前:

query = client.query(kind='User').fetch()

这会在 Beam SDK 完全参与之前执行整个查询并读取所有实体。

更准确地说,fetch()在查询结果上返回一个惰性可迭代对象,并且在您构建管道时它们会被迭代beam.Create(query)- 但是,这再次发生在您的主程序中,在管道启动之前。最有可能的是,这需要 13 分钟,而不是管道本身(但请随时提供工作 ID,以便我们更深入地了解)。您可以通过对代码进行一些小的更改来验证这一点:

query = list(client.query(kind='User').fetch())

但是,我认为您的意图是同时读取处理实体。

特别是对于 Cloud Datastore,自定义源 API 并不是最好的选择。原因是底层 Cloud Datastore API 本身目前不提供实现自定义源“好东西”所需的属性,例如进度估计和动态拆分,因为它的查询 API 非常通用(不像 Cloud Bigtable,它总是返回按键排序的结果,例如,您可以通过查看当前键来估计进度)。

我们目前正在重写 Java Cloud Datastore 连接器以使用不同的方法,该方法使用 aParDo拆分查询并使用 aParDo读取每个子查询。有关详细信息,请参阅此拉取请求

于 2016-08-01T17:50:20.010 回答