问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
582 浏览

apache-flink - 未正确分组的项目 - CoGroupByKey

CoGroupByKey 问题

资料说明。

我有两个数据集。

  • 记录- 第一个,每个(key,day). 对于测试,我使用 2-3 个密钥和 5-10 天的数据。我的目标是 1000+ 键。每条记录都包含键、时间戳(以微秒为单位)和一些其他数据。
  • 配置- 第二个,相当小。它描述了时间的关键,例如,您可以将其视为元组列表:(key, start date, end date, description)

为了探索,我将数据编码为以长度为前缀的协议缓冲区二进制编码消息的文件。此外,这些文件使用 gzip 打包。数据按日期分片。每个文件大约 10MB。

管道

我使用 Apache Beam 来表达管道。

  1. 首先,我向两个数据集添加键。对于 Records 数据集,它是(key, day rounded timestamp). 对于 Configs,key 是,其中 day 是和(key, day)之间的每个时间戳值(指向午夜)。start dateend date
  2. 使用 CoGroupByKey 合并数据集。

作为我使用org.apache.flink.api.java.tuple.Tuple2来自Tuple2Coderrepo github.com/orian/tuple-coder的密钥类型。

问题

如果 Records 数据集很小,比如 5 天,那么一切似乎都很好(检查 normal_run.log)。

当我运行管道超过 10 天时,我遇到一个错误,指出某些记录没有配置 (wrong_run.log)。

然后我添加了一些额外的日志消息:

您可以发现在第一行中,针对 KeyValue3 和时间 1462665600000000 处理了 68643 个项目。
稍后在第 9 行中,该操作似乎再次处理了相同的键,但它报告这些记录没有可用的配置。
第 10 行通知它们已被标记为 no-loc。

第 2 行表示 KeyValue3 和时间 1463184000000000 没有项目,但在第 11 行中,您可以读到此 (key,day) 对的项目是稍后处理的,并且它们缺少配置。

一些线索

在其中一次探索运行期间,我遇到了一个异常(exception_throw.log)。

解决方法(经过更多测试,不起作用,继续使用 Tuple2)

我已经从使用 Tuple2 切换到了 Protocol Buffer 消息:

但是使用Tuple2.of()起来比:KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

当切换到一个键是从 protobuf.Message 派生的类时,问题消失了 10-15 天(因此数据大小是 Tuple2 的问题),但是将数据大小增加到 20 天表明它就在那里。

0 投票
2 回答
696 浏览

java - 使用 Apache Beam 作为依赖

我正在尝试将 Apache Beam 用作 Maven 或 Gradle 依赖项。据我所知,该项目正在孵化,旨在成为 Google Dataflow SDK 的演进版本。

所以,我可能做了一个错误的假设:因为我没有在 Apache Beam 项目的初始文档中找到,我如何使用他们的 Java sdk 版本作为我项目的 Gradle 或 Maven 依赖项?我还没有找到要添加的确切 artifactid、groupid 是什么。这里有人用过吗?

0 投票
1 回答
239 浏览

java - 如何在谷歌数据流中创建个性化的 WindowFn

我想以WindowFn这种方式创建一个不同的窗口,以基于另一个字段而不是基于我的输入条目的时间戳将 Windows 分配给我的任何输入元素。我知道来自 Google DataFlow SDK 的预定义WindowFn使用时间戳作为分配窗口的标准。

更具体地说,我想创建一种SlidingWindows但不是将时间戳视为窗口分配标准,而是将另一个字段视为该标准。

我怎样才能创建我的自定义WindowFn?创建自己的 时应该考虑哪些要点WindowFn

谢谢。

0 投票
1 回答
110 浏览

google-cloud-dataflow - 数据流进程未在失败时恢复

在最近发生整个 AZ 因中断而丢失的事件之后,我想更好地了解 Dataflow 故障转移过程。

当我手动删除数据流作业(流式传输、PubSub 到 BigQuery)的工作节点时,它们已成功重新创建/重新启动,但数据流进程本身尚未恢复。

即使所有状态都正常,但数据项并没有流动。

重新启动流程的唯一方法是取消作业并重新提交。

即使我知道手动删除不是一个有效的测试,我们也不能忽视人为错误的因素。

我的理解是工作流应该已经自动重新启动,但这里没有观察到这种情况。

我想念什么?

0 投票
1 回答
482 浏览

google-cloud-dataflow - 限制每个键的值数量

目前我们有一个数据流过程,GroupByKey但是DoPar在 group-by 之后每个键获得了太多的值,我们想知道是否有一个好的解决方案。据我所知,没有办法设置每个窗口的最大值数。

现在我们正在探索 3 个选项:

  1. 较小的 Windows - 我们认为我们可能仍然会遇到问题,因为事件可能会及时聚集在一起。
  2. 在每个键中添加一个随机值来对键进行分区 - 这也不理想,因为当我们有较少的事件进入时,每个键的值就会太少。当事件数量呈指数增长时,我们也无法调整分区数量。
  3. 一些花哨的触发或使用组合器 - 可能是最好的解决方案,但不知道如何做到这一点。

是否有这样做的标准方法或最佳实践?

0 投票
2 回答
562 浏览

google-cloud-dataflow - 在 Cloud Dataflow 中使用 Beam SDK

我们目前正在使用 Google 的 Cloud Dataflow SDK (1.6.0) 在 GCP 中运行数据流作业,但是,我们正在考虑迁移到 Apache Beam SDK (0.1.0)。我们仍将使用数据流服务在 GCP 中运行我们的作业。有没有人经历过这种转变并提出建议?这里是否存在任何兼容性问题,GCP 是否鼓励这一举措?

0 投票
1 回答
283 浏览

google-cloud-platform - 数据流显示复合转换中缺少的数据

我正在尝试 Dataflow 中的新显示数据功能,以便在 Google Cloud Dataflow UI 中显示更多详细信息。但是, custom 的显示数据PTransform不显示。在我的数据流管道中,我进行了如下转换:

当我运行 Dataflow 作业时,UI 似乎没有显示foo=bar显示数据。

0 投票
1 回答
548 浏览

python - 使用最新的 python apache_beam cloud datafow sdk 创建用于从云数据存储读取的自定义源

最近云数据流 python sdk 可用,我决定使用它。不幸的是,还没有支持从云数据存储中读取数据,所以我不得不依赖编写自定义源代码,以便我可以按照承诺利用动态拆分、进度估计等的好处。我确实彻底研究了文档,但无法将各个部分放在一起,以便我可以加快整个过程。

更清楚地说,我的第一种方法是:

  1. 查询云数据存储
  2. 创建 ParDo 函数并将返回的查询传递给它。

但是这样一来,迭代超过 20 万个条目需要 13 分钟。

所以我决定编写可以有效读取实体的自定义源。但是由于我对将各个部分组合在一起缺乏了解,我无法做到这一点。任何人都可以帮助我如何创建自定义源以从数据存储中读取。

编辑:对于第一种方法,我的要点的链接是: https ://gist.github.com/shriyanka/cbf30bbfbf277deed4bac0c526cf01f1

谢谢你。

0 投票
1 回答
768 浏览

google-cloud-dataflow - CoGroupByKey 如何处理discardingFiredPanes?

即,如果我有一个带有触发器的 GlobalWindow,该触发器在每个 pcollection 上的每个新元素上触发,设置为 discardingFiredPanes,当 rhs 引起火灾时,CoGroupByKey 是否会以 lhs null 触发,或者使用 lhs 的最后一个值触发?

鉴于这两个数据集

p1: |id|x1| |1 |10| |1 |11| |1 |12| p2: |id|x2| |1 |20| |1 |21| |1 |22|

我应该期待:

|id| x1 | x2 | |1 |[10]|null| |1 |null|[20]| |1 |[11]|null| |1 |null|[21]| |1 |[12]|null| |1 |null|[22]|

或者:

|id| x1 | x2 | |1 |[10]|null| |1 |[10]|[20]| |1 |[11]|[20]| |1 |[11]|[21]| |1 |[12]|[21]| |1 |[12]|[22]|

0 投票
3 回答
3867 浏览

python - 在 python Apache Beam 中打开一个 gzip 文件

目前是否可以使用 Apache Beam 从 python 中的 gzip 文件中读取?我的管道使用这行代码从 gcs 中提取 gzip 文件:

但我收到此错误:

我们注意到在 python 束源代码中,压缩文件似乎在写入接收器时被处理。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

更详细的追溯: