问题标签 [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 未正确分组的项目 - CoGroupByKey
CoGroupByKey 问题
资料说明。
我有两个数据集。
- 记录- 第一个,每个
(key,day)
. 对于测试,我使用 2-3 个密钥和 5-10 天的数据。我的目标是 1000+ 键。每条记录都包含键、时间戳(以微秒为单位)和一些其他数据。 - 配置- 第二个,相当小。它描述了时间的关键,例如,您可以将其视为元组列表:
(key, start date, end date, description)
。
为了探索,我将数据编码为以长度为前缀的协议缓冲区二进制编码消息的文件。此外,这些文件使用 gzip 打包。数据按日期分片。每个文件大约 10MB。
管道
我使用 Apache Beam 来表达管道。
- 首先,我向两个数据集添加键。对于 Records 数据集,它是
(key, day rounded timestamp)
. 对于 Configs,key 是,其中 day 是和(key, day)
之间的每个时间戳值(指向午夜)。start date
end date
- 使用 CoGroupByKey 合并数据集。
作为我使用org.apache.flink.api.java.tuple.Tuple2
来自Tuple2Coder
repo github.com/orian/tuple-coder的密钥类型。
问题
如果 Records 数据集很小,比如 5 天,那么一切似乎都很好(检查 normal_run.log)。
当我运行管道超过 10 天时,我遇到一个错误,指出某些记录没有配置 (wrong_run.log)。
然后我添加了一些额外的日志消息:
您可以发现在第一行中,针对 KeyValue3 和时间 1462665600000000 处理了 68643 个项目。
稍后在第 9 行中,该操作似乎再次处理了相同的键,但它报告这些记录没有可用的配置。
第 10 行通知它们已被标记为 no-loc。
第 2 行表示 KeyValue3 和时间 1463184000000000 没有项目,但在第 11 行中,您可以读到此 (key,day) 对的项目是稍后处理的,并且它们缺少配置。
一些线索
在其中一次探索运行期间,我遇到了一个异常(exception_throw.log)。
解决方法(经过更多测试,不起作用,继续使用 Tuple2)
我已经从使用 Tuple2 切换到了 Protocol Buffer 消息:
但是使用Tuple2.of()
起来比:KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build()
.
当切换到一个键是从 protobuf.Message 派生的类时,问题消失了 10-15 天(因此数据大小是 Tuple2 的问题),但是将数据大小增加到 20 天表明它就在那里。
java - 使用 Apache Beam 作为依赖
我正在尝试将 Apache Beam 用作 Maven 或 Gradle 依赖项。据我所知,该项目正在孵化,旨在成为 Google Dataflow SDK 的演进版本。
所以,我可能做了一个错误的假设:因为我没有在 Apache Beam 项目的初始文档中找到,我如何使用他们的 Java sdk 版本作为我项目的 Gradle 或 Maven 依赖项?我还没有找到要添加的确切 artifactid、groupid 是什么。这里有人用过吗?
java - 如何在谷歌数据流中创建个性化的 WindowFn
我想以WindowFn
这种方式创建一个不同的窗口,以基于另一个字段而不是基于我的输入条目的时间戳将 Windows 分配给我的任何输入元素。我知道来自 Google DataFlow SDK 的预定义WindowFn
使用时间戳作为分配窗口的标准。
更具体地说,我想创建一种SlidingWindows
但不是将时间戳视为窗口分配标准,而是将另一个字段视为该标准。
我怎样才能创建我的自定义WindowFn
?创建自己的 时应该考虑哪些要点WindowFn
?
谢谢。
google-cloud-dataflow - 数据流进程未在失败时恢复
在最近发生整个 AZ 因中断而丢失的事件之后,我想更好地了解 Dataflow 故障转移过程。
当我手动删除数据流作业(流式传输、PubSub 到 BigQuery)的工作节点时,它们已成功重新创建/重新启动,但数据流进程本身尚未恢复。
即使所有状态都正常,但数据项并没有流动。
重新启动流程的唯一方法是取消作业并重新提交。
即使我知道手动删除不是一个有效的测试,我们也不能忽视人为错误的因素。
我的理解是工作流应该已经自动重新启动,但这里没有观察到这种情况。
我想念什么?
google-cloud-dataflow - 限制每个键的值数量
目前我们有一个数据流过程,GroupByKey
但是DoPar
在 group-by 之后每个键获得了太多的值,我们想知道是否有一个好的解决方案。据我所知,没有办法设置每个窗口的最大值数。
现在我们正在探索 3 个选项:
- 较小的 Windows - 我们认为我们可能仍然会遇到问题,因为事件可能会及时聚集在一起。
- 在每个键中添加一个随机值来对键进行分区 - 这也不理想,因为当我们有较少的事件进入时,每个键的值就会太少。当事件数量呈指数增长时,我们也无法调整分区数量。
- 一些花哨的触发或使用组合器 - 可能是最好的解决方案,但不知道如何做到这一点。
是否有这样做的标准方法或最佳实践?
google-cloud-dataflow - 在 Cloud Dataflow 中使用 Beam SDK
我们目前正在使用 Google 的 Cloud Dataflow SDK (1.6.0) 在 GCP 中运行数据流作业,但是,我们正在考虑迁移到 Apache Beam SDK (0.1.0)。我们仍将使用数据流服务在 GCP 中运行我们的作业。有没有人经历过这种转变并提出建议?这里是否存在任何兼容性问题,GCP 是否鼓励这一举措?
google-cloud-platform - 数据流显示复合转换中缺少的数据
我正在尝试 Dataflow 中的新显示数据功能,以便在 Google Cloud Dataflow UI 中显示更多详细信息。但是, custom 的显示数据PTransform
不显示。在我的数据流管道中,我进行了如下转换:
当我运行 Dataflow 作业时,UI 似乎没有显示foo=bar
显示数据。
python - 使用最新的 python apache_beam cloud datafow sdk 创建用于从云数据存储读取的自定义源
最近云数据流 python sdk 可用,我决定使用它。不幸的是,还没有支持从云数据存储中读取数据,所以我不得不依赖编写自定义源代码,以便我可以按照承诺利用动态拆分、进度估计等的好处。我确实彻底研究了文档,但无法将各个部分放在一起,以便我可以加快整个过程。
更清楚地说,我的第一种方法是:
- 查询云数据存储
- 创建 ParDo 函数并将返回的查询传递给它。
但是这样一来,迭代超过 20 万个条目需要 13 分钟。
所以我决定编写可以有效读取实体的自定义源。但是由于我对将各个部分组合在一起缺乏了解,我无法做到这一点。任何人都可以帮助我如何创建自定义源以从数据存储中读取。
编辑:对于第一种方法,我的要点的链接是: https ://gist.github.com/shriyanka/cbf30bbfbf277deed4bac0c526cf01f1
谢谢你。
google-cloud-dataflow - CoGroupByKey 如何处理discardingFiredPanes?
即,如果我有一个带有触发器的 GlobalWindow,该触发器在每个 pcollection 上的每个新元素上触发,设置为 discardingFiredPanes,当 rhs 引起火灾时,CoGroupByKey 是否会以 lhs null 触发,或者使用 lhs 的最后一个值触发?
鉴于这两个数据集
p1:
|id|x1|
|1 |10|
|1 |11|
|1 |12|
p2:
|id|x2|
|1 |20|
|1 |21|
|1 |22|
我应该期待:
|id| x1 | x2 |
|1 |[10]|null|
|1 |null|[20]|
|1 |[11]|null|
|1 |null|[21]|
|1 |[12]|null|
|1 |null|[22]|
或者:
|id| x1 | x2 |
|1 |[10]|null|
|1 |[10]|[20]|
|1 |[11]|[20]|
|1 |[11]|[21]|
|1 |[12]|[21]|
|1 |[12]|[22]|
python - 在 python Apache Beam 中打开一个 gzip 文件
目前是否可以使用 Apache Beam 从 python 中的 gzip 文件中读取?我的管道使用这行代码从 gcs 中提取 gzip 文件:
但我收到此错误:
我们注意到在 python 束源代码中,压缩文件似乎在写入接收器时被处理。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
更详细的追溯: