问题标签 [apache-beam-io]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-platform - 将 MutationGroup 流式传输到 Spanner
我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中。目标是每 10 秒编写一次新的 MuationGroup,因为我们将使用 spanner 来查询近期 KPI。
当我不使用任何窗口时,我收到以下错误:
由于上面的错误,我假设输入集合需要窗口化和触发,因为 SpannerIO 使用 GroupByKey(这也是我的用例所需要的):
当我这样做时,我在运行时收到以下异常:
经过进一步调查,这似乎是由于.apply(Wait.on(input))
在 SpannerIO 中:它有一个全局侧输入,似乎不适用于我的固定窗口,如Wait.java
状态文档:
作为临时解决方法,我尝试了以下方法:
添加带有触发器而不是固定窗口的 GlobalWindow:
这导致只有在我耗尽管道时才会写入扳手。我的印象是该
Wait.on()
信号仅在全局窗口关闭时触发,并且不适用于触发器。.apply(Wait.on(input))
在 SpannerIO 中禁用:这导致管道卡在此 SO 帖子中描述的视图创建上: SpannerIO Dataflow 2.3.0 卡在 CreateDataflowView 中。
当我检查工作人员日志以获取线索时,我确实收到以下警告:
/li>
请注意,一切都适用于 DirectRunner,我正在尝试使用 DataflowRunner。
有没有人对我可以尝试让它运行的事情有任何其他建议?我很难想象我是唯一一个试图将 MutationGroups 流式传输到扳手的人。
提前致谢!
google-cloud-platform - 如何在云数据流 python 管道中读取多种数据存储类型
我正在尝试从我的 python 管道中的默认命名空间读取多个数据存储类型,并希望对其进行处理。我编写的函数在本地使用 DirectRunner 运行良好,但是当我使用 DataflowRunner 在云上运行管道时,其中一种(包含 1500 条记录)读取速度非常快,而另一种(包含数百万条记录)读取速度非常慢.
作为参考,当我试图在管道中读取一种(包含数百万条记录)时,它花了 10 分钟,但是当它们一起执行时,它花了将近 1 个小时,但它仍然只处理了 1/10 的记录。
我无法弄清楚问题所在。
这是我的代码
让我知道是否有人知道为什么会发生这种情况。
python - 在 python 中使用 BigQuery 接收器流式传输管道
我正在构建一个 apache 束流管道,其源是 Pubsub,接收器是 BigQuery。我收到了错误消息:
“工作流程失败。原因:未知的消息代码。”
尽管这条消息很神秘,但我现在认为 BigQuery 不支持作为流式管道的接收器,它在这里说: Streaming from Pub/Sub to BigQuery
我肯定是正确的,这是导致问题的原因吗?或者,如果不是,它在任何情况下仍然不受支持?
谁能暗示这个功能什么时候发布?很遗憾,我很高兴能使用它。
python - 来自端口的 Apache Beam 输入
[Python - 光束 SDK]
我希望能够在集成测试中测试时序问题,所以我想构建一个生成器系统,将消息通过我指定的时间戳传递到我的 Beam 应用程序中。我目前的想法是将应用程序写入端口(我想监听的每个流一个),然后让 Beam 应用程序监听这些端口(与监听 Kafka 主题的方式相同)。
Beam 是否已经能够将端口作为源侦听?如果没有,最好的方法是什么?
apache-beam - 如何在 Apache Beam 2.6 中使用 FileIO.writeDynamic() 写入多个输出路径?
我正在使用 Apache Beam 2.6 读取单个 Kafka 主题并将输出写入 Google Cloud Storage (GCS)。现在我想改变管道,以便它读取多个主题并将它们写成gs://bucket/topic/...
TextIO
仅阅读我在管道的最后一步中使用的单个主题时:
这是一个类似的问题,我试图调整哪些代码。
官方 JavaDoc包含的示例代码似乎具有过时的方法签名。(该.via
方法似乎改变了参数的顺序)。我还偶然发现了一个让我感到困惑的例子——FileIO
不应该在这条线上改变位置吗? TransactionType
Transaction
java - 使用 DataFlow 从多个 PubSub 主题流式传输到 BigQuery 时消息卡在 GBP 中?
我有一个包含以下部分的 Java DataFlow 管道:
- PubSub 订阅者阅读多个主题
- Flatten.pCollections 操作
- 从 PubsubMessage 转换为 TableRow
- BigQuery 编写器将所有内容写入动态表
当要连接的订阅列表中有多个 PubSub-topic 时,所有元素都会卡在 BigQuery 编写器中 Reshuffle 操作中的 GroupByKey 操作中。在发送了几十条测试消息后,我让它运行了几个小时,但没有向 BigQuery 写入任何内容。
我发现了以下三种解决方法(每种方法都与其他方法分开工作)
- 在 Pubsub 订阅上添加“withTimestampAttribute”调用。属性的名称根本无关紧要 - 它可以是传入消息中任何现有或不存在的属性
- 将 PubSub 订阅数量减少到只有 1 个
- 删除其间的 Flatten.pCollections 操作,创建多个单独的管道做完全相同的事情
消息没有刻意添加时间戳 - 仅使用 PubsubMessage 时间戳将它们写入 BigQuery 是完全可以接受的。
这也让我感到困惑,即使添加一个不存在的时间戳属性似乎也能解决问题。我调试了这个问题以打印出管道中的时间戳,它们在两种情况下都是可比的;当指定一个不存在的时间戳属性时,它似乎无论如何都会退回到 pubsub 时间戳。
什么可能导致此问题?我该如何解决?对我来说,最可接受的解决方法是删除 Flatten.pCollections 操作,因为它不会严格地使代码复杂化,但我无法理解它失败的原因。
python - 尝试实现 mongoDB IO 连接器接收器时出错
我们开始为第一个 Apache Beam 测试实现一个 mongoDB IO 连接器,并且 Source 部分似乎工作正常。关于 Sink 部分,执行会导致错误...我们使用此指南进行实施:https ://beam.apache.org/documentation/sdks/python-custom-io/
我们的实现可以在这里找到:https ://github.com/PEAT-AI/beam-extended
我尝试运行的示例如下:
我得到以下堆栈跟踪:
google-cloud-storage - 如何使 ReadAllFromText 不阻塞光束管道?
我想实现一个非常简单的光束管道:
Apache Beam 为每个进程预先实现了 PTransform。
所以管道将是:
但是,ReadAllFromText
() 以某种方式阻塞了管道。创建自定义 PTransform 在从 PubSub 读取并将其写入 BigQuery 表后返回随机行正常工作(无阻塞)。添加 3 秒的固定窗口或触发每个元素也不能解决问题。
每个文件大约 10MB 和 23K 行。
不幸的是,我找不到有关ReadAllFromText
应该如何工作的文档。如果它试图阻止管道直到读取所有文件,那将是非常奇怪的。而且我希望该函数在读取该行后立即将每行推送到管道。
上述行为是否有任何已知原因?这是一个错误还是我做错了什么?
管道代码:
google-bigquery - 使用连接从 BigQuery 读取批量数据
我有一个用例,其中我必须通过在 20 个不同的 BQ 表上应用左连接来从 BigQuery 读取选定的数据,对该数据应用转换,然后最终转储到最终的 BQ 表中。
我有两种方法来实现这一点,我尝试了样本数据(来自 20 个表的总共 10-1200 万行)并得到了如下结果:
方法 1 - 为整个过程编写一个 BQ 兼容查询并通过 Apache Beam 触发相同的查询(使用 BigQueryIO.readTableRows()),最后将数据转储到目标 BigQuery 表。
- 使用 n1-standard-4 机器类型运行作业,作业在 6:24 分钟内完成。
方法 2 - 在 Apache Beam 中编写整个过程,避免在 BQ 中使用大量插槽。
- 使用 n1-standard-4 机器类型运行作业,作业在 14:50 分钟内完成。
这里要考虑的重要一点是,随着时间的推移,要加入的 20 个源表将继续增长,并且可能在不久的将来我们将拥有 TB 的数据。
考虑到我们未来的数据增长,哪种方法更有效?
google-cloud-dataflow - Google Dataflow - 如果写入本地服务器,如何在 java 中指定 TextIO?
Google Dataflow - 如果从 Dataflow 写入本地服务器,如何指定 TextIO?(前提是 On-prem 服务器通过 Cloud VPN 连接到 GCP)
在运行 Dataflow 作业时,是否像上面那样提供本地 IP 和目录?我尝试了,并且在步骤摘要中添加了元素,作业成功完成,但我没有看到本地服务器中写入的任何文件。[不确定它是否与本地服务器的身份验证有关]