问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1179 浏览

python - WriteToText 在 DirectRunner 中工作,但在 DataflowRunner 中因 TypeError 而失败

我可以运行这段代码,DirectRunner它工作正常。随着DataflowRunner它崩溃:

TypeError: process() 正好需要 4 个参数(给定 3 个)[在运行 'write_text/Write/WriteImpl/WriteBundles' 时]`

我的 apache-beam-sdk 是按照说明从 master 克隆和构建的。它构建为apache-beam-sdk==0.6.0.dev0. 我对该版本持怀疑态度,但是因为(我认为)我看到代码更改但版本最近没有更改(NewDoFn消失但版本没有更改)。

我不确定这是否是问题的根源,但似乎已安装的 sdk 和数据流容器不匹配。我得到另一个不匹配类型的错误,直接DirectRunner传递element给我的DoFn.process()while DataflowRunnerpass context

我试图将其隔离为最简单的代码:

完整输出:

0 投票
1 回答
3180 浏览

google-cloud-dataflow - '_UnwindowedValues' 类型的对象没有 len() 是什么意思?

我正在使用数据流 0.5.5 Python。在非常简单的代码中遇到以下错误:

row_list是一个列表。完全相同的代码、相同的数据和相同的管道在 DirectRunner 上运行得非常好,但在 DataflowRunner 上抛出了以下异常。这是什么意思,我该如何解决?

代码:trip_augmentation_test.py

在此处调用管道(我使用的是 Google Cloud Datalab)

跟进

row_list在 DataflowRunner 中记录了 的类型,结果是 <class 'apache_beam.transforms.trigger._UnwindowedValues'>,而在 DirectRunner 中,它是list. 这是预期的不一致吗?

0 投票
0 回答
204 浏览

apache-beam - 如何在 Apache Beam 或 Dataflow 中将记录编号添加到 TextIO 文件源

我正在使用 Dataflow(现在是 Beam)来处理遗留文本文件以复制现有 ETL 工具的转换。当前进程添加一个记录号(每个文件中每一行的记录号)和文件名。他们想要保留这些附加信息的原因是,他们可以知道源数据来自哪个文件和记录偏移量。

我想达到一个点,即我有一个 PCollection,其中包含文件记录号和文件名作为键的值或部分的附加字段。

我看过另一篇文章,其中文件名可以填充到生成的 PCollection 中,但是我没有添加每行记录数的解决方案。目前我能做到的唯一方法是在开始 Dataflow 过程之前预处理文件(这很遗憾,因为我希望 Dataflow/Beam 来完成这一切)

0 投票
2 回答
1100 浏览

apache-spark - AssertionError:断言失败:copyAndReset 必须返回零值副本

当我申请ParDo.of(new ParDoFn())namedPCollectiontextInput,程序会抛出这个异常。但是我删除时程序是正常的.apply(ParDo.of(new ParDoFn()))

//SparkRunner

0 投票
1 回答
1155 浏览

python - 使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中

因此,我们用来每天创建一个新表的现有设置,在“WRITE_TRUNCATE”选项下工作得很好,但是当我们更新代码以使用分区表时,虽然我们的数据流作业,但它不适用于 write_truncate。

它工作得非常好,写入配置设置为“WRITE_APPEND”(据我了解,从,它可能会尝试删除表,然后重新创建它),因为我提供了表装饰器它无法创建一个新的桌子。

使用 python 代码的示例片段:

这给出了错误:

表 ID 必须是字母数字

因为它试图重新创建表,我们在参数中提供了分区装饰器。

以下是我尝试过的一些事情:

  1. 将 write_disposition 更新为 WRITE_APPEND,虽然它有效,但它没有达到目的,因为再次在同一日期运行会重复数据。
  2. 使用

bq --apilog /tmp/log.txt load --replace --source_format=NEWLINE_DELIMITED_JSON 'table.$20160101' sample_json.json

命令,看看我是否可以观察任何日志,根据我找到的链接,截断实际上是如何工作的。

  1. 尝试了其他一些链接,但这也使用了 WRITE_APPEND。

有没有办法使用 write_truncate 方法从数据流作业写入分区表?

让我知道是否需要任何其他详细信息。谢谢

0 投票
1 回答
2241 浏览

google-cloud-dataflow - 使用 MySQL 作为输入源并写入 Google BigQuery

我有一个 Apache Beam 任务,它使用 JDBC 从 MySQL 源读取数据,它应该将数据按原样写入 BigQuery 表。此时不执行任何转换,稍后会进行转换,目前我只想将数据库输出直接写入 BigQuery。

这是尝试执行此操作的主要方法:

但是当我使用 maven 执行模板时,出现以下错误:

Test.java:[184,6] 找不到符号符号:方法 apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
位置:类 org.apache.beam.sdk.io.jdbc。 JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

看来我没有通过 BigQueryIO.Write 预期的数据收集,这就是我目前正在努力解决的问题。

在这种情况下,如何使来自 MySQL 的数据符合 BigQuery 的期望?

0 投票
1 回答
61 浏览

java - Beam java SDK,一个java语法混淆

在 Apache Beam 编程指南https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition中,我看到了这样的代码,我对下面的 Java 语法感到很困惑,请解释一下,谢谢。

为什么有一个点.和一个泛型参数<String>后跟一个类Flatten?谁能告诉我这个的java语法?

0 投票
1 回答
2115 浏览

python - Python Apache Beam 侧输入断言错误

我对 Apache Beam/Cloud Dataflow 还是新手,所以如果我的理解不正确,我深表歉意。

我正在尝试通过管道读取约 30,000 行长的数据文件。我的简单管道首先从 GCS 打开 csv,从数据中提取标头,通过 ParDo/DoFn 函数运行数据,然后将所有输出写入 csv 回 GCS。该管道有效,是我的第一个测试。

然后我编辑了管道以读取 csv,拉出标题,从数据中删除标题,通过 ParDo/DoFn 函数运行数据,并将标题作为侧输入,然后将所有输出写入 csv。唯一的新代码是将标头作为侧输入传递并从数据中过滤它。

在此处输入图像描述 在此处输入图像描述

ParDo/DoFn 函数 build_rows 只产生 context.element,这样我就可以确保我的边输入正常工作。

我得到的错误如下: 我不确定问题是什么,但我认为这可能是由于内存限制。我将示例数据从 30,000 行减少到 100 行,我的代码终于可以正常工作了。在此处输入图像描述

没有侧输入的管道确实读/写了所有 30,000 行,但最后我需要侧输入来对我的数据进行转换。

如何修复我的管道,以便我可以处理来自 GCS 的大型 csv 文件,并且仍然使用边输入作为文件的伪全局变量?

0 投票
2 回答
918 浏览

python - Python 中的 Apache Beam,beam.io.TextFileSource 出错

我正在尝试在 GCP 存储库上的数据科学中运行代码,并在 Beam 代码中不断出现错误。

这是给出错误的行: beam.Read(beam.io.TextFileSource('airports.csv.gz')

这是我得到的错误: AttributeError: 'module' object has no attribute 'TextFileSource'

这是完整的文件: https ://github.com/GoogleCloudPlatform/data-science-on-gcp/blob/master/04_streaming/simulate/df01.py

有谁知道如何让这个工作,或者我错过了什么?

0 投票
1 回答
326 浏览

google-cloud-dataflow - 为什么我的 Fusion Breaker 会丢失或保留数据?

我正在开发一个流式数据流管道,该管道使用来自 PubSub 的批处理项目的消息并最终将它们写入数据存储区。为了更好的并行性,也为了及时确认从 PubSub 中提取的消息,我将批次解压缩为单独的项目,并在其后添加一个融合断路器。

所以管道看起来像这样......

PubSubIO -> 反序列化 -> 解包 -> 融合中断 -> 验证/转换 -> DatastoreIO。

这是我的融合断路器,主要是从JdbcIO 类中复制的。它使用触发器来分解全局窗口中的数据。

它大部分时间都在工作,除了在某些情况下它生成的输出数量少于输入数量,即使在流输入完成并且管道空闲十分钟之后也是如此。

如下面的 Dataflow Job 监控控制台所示。屏幕截图是在作业耗尽后拍摄的,在我等待大约 10 分钟以使数据从转换中出来之后。

在此处输入图像描述

*有人能想出一个解释吗?感觉好像融合破坏者正在阻止或丢失了一些物品。*

我注意到它只发生在数据量/数据速率很高的情况下,迫使管道在测试运行过程中扩大规模,从 25 个 n1-highmem-2 工作人员翻倍到 50 个。但是,我还没有做足够的测试来验证放大是否是重现此问题的关键。

或者触发器可能每两秒一次过于频繁地触发?

我正在使用数据流 2.0.0-beta1。作业 ID 为“2017-02-23_23_15_34-14025424484787508627”。