问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
711 浏览

google-cloud-platform - 将 MutationGroup 流式传输到 Spanner

我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中。目标是每 10 秒编写一次新的 MuationGroup,因为我们将使用 spanner 来查询近期 KPI。

当我不使用任何窗口时,我收到以下错误:

由于上面的错误,我假设输入集合需要窗口化和触发,因为 SpannerIO 使用 GroupByKey(这也是我的用例所需要的):

当我这样做时,我在运行时收到以下异常:

经过进一步调查,这似乎是由于.apply(Wait.on(input))在 SpannerIO 中:它有一个全局侧输入,似乎不适用于我的固定窗口,如Wait.java状态文档:

作为临时解决方法,我尝试了以下方法:

  • 添加带有触发器而不是固定窗口的 GlobalWindow:

    这导致只有在我耗尽管道时才会写入扳手。我的印象是该Wait.on()信号仅在全局窗口关闭时触发,并且不适用于触发器。

  • .apply(Wait.on(input))在 SpannerIO 中禁用:

    这导致管道卡在此 SO 帖子中描述的视图创建上: SpannerIO Dataflow 2.3.0 卡在 CreateDataflowView 中

    当我检查工作人员日志以获取线索时,我确实收到以下警告:

    /li>

请注意,一切都适用于 DirectRunner,我正在尝试使用 DataflowRunner。

有没有人对我可以尝试让它运行的事情有任何其他建议?我很难想象我是唯一一个试图将 MutationGroups 流式传输到扳手的人。

提前致谢!

0 投票
1 回答
349 浏览

google-cloud-platform - 如何在云数据流 python 管道中读取多种数据存储类型

我正在尝试从我的 python 管道中的默认命名空间读取多个数据存储类型,并希望对其进行处理。我编写的函数在本地使用 DirectRunner 运行良好,但是当我使用 DataflowRunner 在云上运行管道时,其中一种(包含 1500 条记录)读取速度非常快,而另一种(包含数百万条记录)读取速度非常慢.

作为参考,当我试图在管道中读取一种(包含数百万条记录)时,它花了 10 分钟,但是当它们一起执行时,它花了将近 1 个小时,但它仍然只处理了 1/10 的记录。

我无法弄清楚问题所在。

这是我的代码

让我知道是否有人知道为什么会发生这种情况。

0 投票
1 回答
766 浏览

python - 在 python 中使用 BigQuery 接收器流式传输管道

我正在构建一个 apache 束流管道,其源是 Pubsub,接收器是 BigQuery。我收到了错误消息:

“工作流程失败。原因:未知的消息代码。”

尽管这条消息很神秘,但我现在认为 BigQuery 不支持作为流式管道的接收器,它在这里说: Streaming from Pub/Sub to BigQuery

我肯定是正确的,这是导致问题的原因吗?或者,如果不是,它在任何情况下仍然不受支持?

谁能暗示这个功能什么时候发布?很遗憾,我很高兴能使用它。

0 投票
1 回答
113 浏览

python - 来自端口的 Apache Beam 输入

[Python - 光束 SDK]

我希望能够在集成测试中测试时序问题,所以我想构建一个生成器系统,将消息通过我指定的时间戳传递到我的 Beam 应用程序中。我目前的想法是将应用程序写入端口(我想监听的每个流一个),然后让 Beam 应用程序监听这些端口(与监听 Kafka 主题的方式相同)。

Beam 是否已经能够将端口作为源侦听?如果没有,最好的方法是什么?

0 投票
1 回答
4463 浏览

apache-beam - 如何在 Apache Beam 2.6 中使用 FileIO.writeDynamic() 写入多个输出路径?

我正在使用 Apache Beam 2.6 读取单个 Kafka 主题并将输出写入 Google Cloud Storage (GCS)。现在我想改变管道,以便它读取多个主题并将它们写成gs://bucket/topic/...

TextIO仅阅读我在管道的最后一步中使用的单个主题时:

是一个类似的问题,我试图调整哪些代码。

官方 JavaDoc包含的示例代码似乎具有过时的方法签名。(该.via方法似乎改变了参数的顺序)。我还偶然发现了一个让我感到困惑的例子——FileIO不应该在这条线上改变位置吗? TransactionTypeTransaction

0 投票
1 回答
272 浏览

java - 使用 DataFlow 从多个 PubSub 主题流式传输到 BigQuery 时消息卡在 GBP 中?

我有一个包含以下部分的 Java DataFlow 管道:

  • PubSub 订阅者阅读多个主题
  • Flatten.pCollections 操作
  • 从 PubsubMessage 转换为 TableRow
  • BigQuery 编写器将所有内容写入动态表

当要连接的订阅列表中有多个 PubSub-topic 时,所有元素都会卡在 BigQuery 编写器中 Reshuffle 操作中的 GroupByKey 操作中。在发送了几十条测试消息后,我让它运行了几个小时,但没有向 BigQuery 写入任何内容。

我发现了以下三种解决方法(每种方法都与其他方法分开工作)

  • 在 Pubsub 订阅上添加“withTimestampAttribute”调用。属性的名称根本无关紧要 - 它可以是传入消息中任何现有或不存在的属性
  • 将 PubSub 订阅数量减少到只有 1 个
  • 删除其间的 Flatten.pCollections 操作,创建多个单独的管道做完全相同的事情

消息没有刻意添加时间戳 - 仅使用 PubsubMessage 时间戳将它们写入 BigQuery 是完全可以接受的。

这也让我感到困惑,即使添加一个不存在的时间戳属性似乎也能解决问题。我调试了这个问题以打印出管道中的时间戳,它们在两种情况下都是可比的;当指定一个不存在的时间戳属性时,它似乎无论如何都会退回到 pubsub 时间戳。

什么可能导致此问题?我该如何解决?对我来说,最可接受的解决方法是删除 Flatten.pCollections 操作,因为它不会严格地使代码复杂化,但我无法理解它失败的原因。

0 投票
0 回答
591 浏览

python - 尝试实现 mongoDB IO 连接器接收器时出错

我们开始为第一个 Apache Beam 测试实现一个 mongoDB IO 连接器,并且 Source 部分似乎工作正常。关于 Sink 部分,执行会导致错误...我们使用此指南进行实施:https ://beam.apache.org/documentation/sdks/python-custom-io/

我们的实现可以在这里找到:https ://github.com/PEAT-AI/beam-extended

我尝试运行的示例如下:

我得到以下堆栈跟踪:

0 投票
0 回答
498 浏览

google-cloud-storage - 如何使 ReadAllFromText 不阻塞光束管道?

我想实现一个非常简单的光束管道:

Apache Beam 为每个进程预先实现了 PTransform。

所以管道将是:

但是,ReadAllFromText() 以某种方式阻塞了管道。创建自定义 PTransform 在从 PubSub 读取并将其写入 BigQuery 表后返回随机行正常工作(无阻塞)。添加 3 秒的固定窗口或触发每个元素也不能解决问题。

每个文件大约 10MB 和 23K 行。

不幸的是,我找不到有关ReadAllFromText应该如何工作的文档。如果它试图阻止管道直到读取所有文件,那将是非常奇怪的。而且我希望该函数在读取该行后立即将每行推送到管道。

上述行为是否有任何已知原因?这是一个错误还是我做错了什么?

管道代码:

0 投票
1 回答
184 浏览

google-bigquery - 使用连接从 BigQuery 读取批量数据

我有一个用例,其中我必须通过在 20 个不同的 BQ 表上应用左连接来从 BigQuery 读取选定的数据,对该数据应用转换,然后最终转储到最终的 BQ 表中。

我有两种方法来实现这一点,我尝试了样本数据(来自 20 个表的总共 10-1200 万行)并得到了如下结果:

方法 1 - 为整个过程编写一个 BQ 兼容查询并通过 Apache Beam 触发相同的查询(使用 BigQueryIO.readTableRows()),最后将数据转储到目标 BigQuery 表。

  • 使用 n1-standard-4 机器类型运行作业,作业在 6:24 分钟内完成。

方法 2 - 在 Apache Beam 中编写整个过程,避免在 BQ 中使用大量插槽。

  • 使用 n1-standard-4 机器类型运行作业,作业在 14:50 分钟内完成。

这里要考虑的重要一点是,随着时间的推移,要加入的 20 个源表将继续增长,并且可能在不久的将来我们将拥有 TB 的数据。

考虑到我们未来的数据增长,哪种方法更有效?

0 投票
2 回答
568 浏览

google-cloud-dataflow - Google Dataflow - 如果写入本地服务器,如何在 java 中指定 TextIO?

Google Dataflow - 如果从 Dataflow 写入本地服务器,如何指定 TextIO?(前提是 On-prem 服务器通过 Cloud VPN 连接到 GCP)

在运行 Dataflow 作业时,是否像上面那样提供本地 IP 和目录?我尝试了,并且在步骤摘要中添加了元素,作业成功完成,但我没有看到本地服务器中写入的任何文件。[不确定它是否与本地服务器的身份验证有关]