问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
962 浏览

python - 在 python apache Beam 中,是否可以按特定顺序编写元素?

我正在使用梁在重叠窗口上处理时间序列数据。在我的管道结束时,我将每个元素写入一个文件。每个元素代表一个 csv 行,其中一个字段是关联窗口的时间戳。我想按该时间戳的顺序编写元素。有没有办法使用 python 梁库来做到这一点?

0 投票
0 回答
519 浏览

java - 带有 Flink 后端的 Apache Beam 在调用 protobuf-java 库方法时抛出 NoSuchMethodError

我正在尝试使用协议缓冲区在本地集群上运行一个简单的管道,以在 Beam 函数之间传递数据。包含在com.google.protobuf:protobuf-javaFatJar 中。

如果我运行它,一切正常:

但是尝试在 flink 集群上运行时失败:

准备运行项目: https ://github.com/orian/beam-flink-local-cluster

Beam 版本是 0.3-Snapshot(出血边缘),它使用 Flink 1.0.3 版本,我的本地集群运行1.0.3版本。Flink 使用 protobuf-java 2.5.0。


该程序完成,但出现以下异常:

0 投票
1 回答
358 浏览

nullpointerexception - 使用 Apache Beam 的数据流 sdk 写入 BigTable 时捕获 NullPointerException

我正在使用Apache's Beamsdk 版本并尝试使用runner0.2.0-incubating-SNAPSHOT 将数据拉到一个 bigtable 中。Dataflow不幸的是,我NullPointerException在执行用作接收器的数据流管道时遇到了问题BigTableIO.WriteBigtableOptions根据我的需要,已经检查了我的参数并没有问题。

基本上,我创建并在我的管道的某个点上,我有步骤将其写入PCollection<KV<ByteString, Iterable<Mutation>>>我想要的大表:

在执行管道时,我得到了NullPointerException,在方法中准确地指出了 BigtableIO 类public void processElement(ProcessContext c)

我检查了此方法是否在处理所有元素之前在 bigtable 上写入,但不确定为什么我会超时执行此管道。根据下面的代码,此方法使用bigtableWriter属性来处理每个c.element(),但我什至无法设置断点来调试null. 有关如何解决此问题的任何建议或建议?

谢谢。

0 投票
1 回答
897 浏览

python - 有没有办法读取除python apache beam中定义的文件列表之外的所有文件?

我的用例是我在一个不断更新新文件的存储桶中批处理文件。我不想处理已经处理过的 csv 文件。

有没有办法做到这一点?

我想到的一个可能的解决方案是有一个文本文件来维护已处理文件的列表,然后读取除已处理列表中的文件之外的所有 csv 文件。那可能吗?

或者是否可以读取特定文件的列表?

0 投票
2 回答
86 浏览

python - SlidingWindows 可以在 python apache Beam 中有半秒的时间吗?

SlidingWindows 似乎让我的月经周期四舍五入。如果我将周期设置为 1.5 并将大小设置为 3.5 秒,它将每 1.0 秒创建 3.5 个窗口。它预计每 1.5 秒有 3.5 秒的窗口。

是否有可能有一个几分之一秒的周期?

0 投票
4 回答
4450 浏览

python - 如何为 apache 光束数据流的输出 csv 添加标头?

我注意到在 java sdk 中,有一个函数可以让您编写 csv 文件的标题。 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-

此功能是否反映在 python skd 上?

0 投票
1 回答
239 浏览

python - 为什么 groupBy 会阻碍我的管道?

我有一个用 python apache-beam 编写的管道。它将 800,000 个时间戳数据窗口化为每 1 秒重叠的 2 秒窗口。我的元素可能有不同的键。

当它执行 groupBy 时,需要 3 个小时才能完成。我使用 10 个工作人员部署在云数据流中。当我增加工人数量时,处理速度并没有显着提高。为什么这种转变会成为我的管道的瓶颈?

0 投票
1 回答
2541 浏览

google-bigquery - 从 ParDo 函数中写入 BigQuery

我想从 ParDo 函数中调用一个操作,为(我使用的是 python SDK)beam.io.Write(beam.io.BigQuerySink(..))中的每个键生成一个单独的 BigQuery 表。PCollection这是两个类似的线程,不幸的是没有帮助:

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2)从数据流管道写入 BQ 时的动态表名

当我执行以下代码时,第一个键的行被插入到 BigQuery 中,然后管道失败并出现以下错误。非常感谢任何关于我做错了什么的建议或任何关于如何解决它的建议。

管道代码:

错误信息:


编辑(在第一个答案之后):

我没有意识到我的价值需要是一个PCollection.

我现在已经把我的代码改成了这个(这可能效率很低):

现在在本地可以正常工作,但不适用于BlockingDataflowPipelineRunner:-(

管道失败并出现以下错误:

0 投票
2 回答
1603 浏览

python - 如何在 python apache Beam 的窗口中订购元素?

我注意到 java apache beam 有类 groupby.sortbytimestamp python 是否实现了该功能?如果不是,那么在窗口中对元素进行排序的方法是什么?我想我可以在 DoFn 中对整个窗口进行排序,但我想知道是否有更好的方法。

0 投票
1 回答
929 浏览

scala - 使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错

使用 SCIO fromspotify为 编写作业Dataflow,遵循 2 个示例e.g1e.g2PubSub将流写入到GCS,但以下代码出现以下错误

错误

代码

我可能将窗口概念与 Bounded PCollection 混合在一起,有没有办法实现这一点,或者我需要应用一些变换来实现这一点,任何人都可以在这方面提供帮助