问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1457 浏览

java - 在 Apache Beam 中使用 defaultNaming 进行动态窗口写入

我正在关注这篇文章文档的答案,以便在管道末端对我的数据执行动态窗口写入。这是我到目前为止所拥有的:

但是 NetBeans 在最后一行警告我一个语法错误:

FileNaming is not public in Write; cannot be accessed outside package

如何使defaultNaming我的管道可用,以便我可以将其用于动态写入。或者,如果那不可能,我应该怎么做?

0 投票
1 回答
1458 浏览

google-cloud-platform - 在 apache 光束流中以固定间隔触发

我正在使用 apache 梁来编写一些流式传输管道。我的用例的一个要求是我想相对于窗口开始或结束时间每 X 分钟触发一次。我怎么能做到这一点。当前触发器 AfterProcessingTime.pastFirstElementInPane() 与该窗口中第一个元素的处理时间有关。

例如,我创建了固定的 1 分钟窗口,所以我有 window_1(0-1 分钟间隔)、window_2(1 - 2 分钟间隔)等等。现在我希望每个窗口的结果在窗口开始后 10 分钟后恰好触发一次,即 window_1 在 0 + 10 -> 10 分钟,window_2 在第 11 分钟 (1 + 10)。[注意:我将固定窗口配置为允许延迟 > 10 分钟,因此如果延迟,元素不会被丢弃]

有没有办法为固定窗口实现这种触发。

我不能只是将所有元素分配给一个全局窗口,然后每分钟重复触发一次,因为这样它会丢失所有元素窗口时间信息。例如,如果我的 pcollection 中有 2 个元素属于 window_1 和 window_2 基于那里的事件时间戳,但延迟了 3 和 3.2 分钟。将它们分配给全局窗口将在第 4 分钟结束时生成一些输出,同时考虑到这两个元素,而实际上我希望将它们分配给实际的固定窗口(作为后期数据)。

我希望根据那里的事件时间戳将元素分配给 window_1 和 window_2,然后 window_1 在第 10 分钟触发输出结果,方法是仅处理该窗口的 1 个延迟数据,然后在第 11 分钟触发 window_2 并在处理唯一的元素后输出延迟了 3.2 分钟。在我的流媒体管道中实现这种行为的触发器设置应该是什么。

0 投票
0 回答
105 浏览

apache-flink - 在写入 DB 时控制 ParDo Transform 中的并行性

我目前正在开发一个使用 Apache Beam 和 Flink 作为执行引擎的管道。作为该过程的一部分,我从 Kafka 读取数据并执行一系列转换,包括连接、聚合以及对外部数据库的查找。

这个想法是我们希望在执行聚合时与 Flink 具有更高的并行性,但最终合并数据并减少写入 DB 的进程数量,以便目标 DB 可以处理它(例如说我想要一个聚合的并行度为 40,但写入目标数据库时只有 10)。

我们有什么办法可以在 Beam 中做到这一点吗?

0 投票
2 回答
3977 浏览

java - 使用 TextIO 和 ValueProvider 创建数据流模板时出错

我正在尝试创建一个谷歌数据流模板,但我似乎无法找到一种方法来做到这一点而不会产生以下异常:

我可以使用 Beam 的 MinimalWordCount 示例的简单修改版本来重现它。

我可以在本地运行示例:

它还在 Google Dataflow 上运行:

但是,当我尝试使用以下内容创建 Google Dataflow 模板时,出现错误:

另一个令人困惑的事情是 maven 构建继续并以 BUILD SUCCESS 结束

所以我的问题是:

Q1)我应该能够创建这样的 Google Dataflow 模板(使用 ValueProviders 在运行时提供 TextIO 输入)吗?

Q2)构建过程中的异常是真正的错误还是日志似乎表明的只是一个警告?

Q3)如果 Q1 和 Q2 的答案是肯定的并且“只是一个警告”,并且我尝试从上传的模板创建作业,为什么它没有任何元数据或不知道我的输入选项?

谷歌数据流截图

我用过的参考资料:

0 投票
0 回答
910 浏览

java - 在为 python apache beam sdk 创建 cloudsql/mysql 的数据源时在 apache beam 中设置环境变量

您好我正在尝试在 python 中为 apache Beam 创建一个数据源。我知道使用 Java,您可以使用 JDBC 库连接到 cloudsql。同样,我正在尝试在 Google Cloud Platform 中创建数据流(apache 光束)的源。我继承了有界源类并使用了 jaydebeapi 库(jdbc 库的 python 包装器)连接到 mysql 数据库。请看下面的代码。

对于 .jar 文件驱动程序,我已将其存储在临时文件位置的谷歌云存储中。但是,python 需要 Java Developer Kit 来运行 java 代码,并且在我的计算机上本地运行时,我可以设置 JAVA_HOME 变量并指向我本地计算机上的 /bin 位置。

但是,当我在数据流中运行它时,我收到一个错误“没有找到 JVM 共享库文件。尝试正确设置 JAVA_HOME 环境变量。这是因为在数据流中我无法安装 Java 开发工具包 (JDK) 或创建环境变量。

有没有办法在数据流上安装 JDK 并引用环境变量。还有关于如何运行 python apache Beam 作业以这种方式从云 sql 数据库中提取数据的任何想法?

0 投票
1 回答
346 浏览

google-cloud-dataflow - 数据流:如何从另一个管道喷出的现有 PCollection 创建管道

我正在尝试将我的管道拆分为许多较小的管道,以便它们执行得更快。我正在对谷歌云存储 blob (PCollection) 的 PCollection 进行分区,以便得到一个

从那里我希望能够做到:

但是我还没有找到任何关于从已经存在的 PCollection 创建初始 PCollection 的文档,如果有人能指出正确的方向,我将不胜感激。谢谢!

0 投票
1 回答
1655 浏览

apache-beam - 执行 Apache BEAM sql 查询时出错 - 在 GroupByKey 之前使用 Window.into 或 Window.triggering 转换

如何在 BEAM SQL 中的 GroupByKey 之前包含 Window.into 或 Window.triggering 转换?

我有以下 2 个表:

第二张桌子

我正在将结果写在第三张表中

前 2 个表正在从 kafka 流中读取数据,我正在对这些表进行连接并将数据插入到第三个表中,使用以下查询。前 2 个表是无界/无界的

我收到以下错误:

原因:java.lang.IllegalStateException:GroupByKey 不能在没有触发器的情况下应用于 GlobalWindow 中的无界 PCollection。在 GroupByKey 之前使用 Window.into 或 Window.triggering 转换。在 org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173) 在 org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204) 在 org.apache.beam.sdk .transforms.GroupByKey.expand(GroupByKey.java:120) 在 org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) 在 org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472 ) 在 org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:126) 上的 org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)。 beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74) 在 org.

0 投票
1 回答
619 浏览

google-bigquery - Scio saveAsTypedBigQuery 写入分区,用于 SCollection of Typed Big Query 案例类

我正在尝试使用以下方法将 SCollection 写入 Big Query 中的分区:

我得到的错误是 表 ID 必须是字母数字(加上下划线),并且必须最多 1024 个字符长。此外,不能使用表装饰器。”

如何写入分区?我没有看到任何通过 saveAsTypedBigQuery 方法指定分区的选项,所以我尝试了 Legacy SQL 表装饰器。

0 投票
2 回答
485 浏览

google-cloud-dataflow - Scio所有saveAs txt文件方法输出一个带有part前缀的txt文件

如果我想将 TableRow 或 String 的 SCollection 输出到谷歌云存储 (GCS),我分别使用 saveAsTableRowJsonFile 或 saveAsTextFile。这两种方法最终都使用

它强制文件名以“part”开头。是通过使用 saveAsCustomOutput 输出自定义分片文件的唯一方法吗?

0 投票
1 回答
269 浏览

python-2.7 - 使用 Dataflow 发布到 Bigquery

将消息从 pubsubio 插入 BigQuery 时出现错误提示。

如何将记录从 pubsub 插入 BQ。我们可以转换pcollection成一个列表,还是有另一个替代品?

AttributeError:'PCollection'对象没有属性'split'

这是我的代码: