问题标签 [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - WriteToText 在 DirectRunner 中工作,但在 DataflowRunner 中因 TypeError 而失败
我可以运行这段代码,DirectRunner
它工作正常。随着DataflowRunner
它崩溃:
TypeError: process() 正好需要 4 个参数(给定 3 个)[在运行 'write_text/Write/WriteImpl/WriteBundles' 时]`
我的 apache-beam-sdk 是按照说明从 master 克隆和构建的。它构建为apache-beam-sdk==0.6.0.dev0
. 我对该版本持怀疑态度,但是因为(我认为)我看到代码更改但版本最近没有更改(NewDoFn
消失但版本没有更改)。
我不确定这是否是问题的根源,但似乎已安装的 sdk 和数据流容器不匹配。我得到另一个不匹配类型的错误,直接DirectRunner
传递element
给我的DoFn.process()
while DataflowRunner
pass context
。
我试图将其隔离为最简单的代码:
完整输出:
google-cloud-dataflow - '_UnwindowedValues' 类型的对象没有 len() 是什么意思?
我正在使用数据流 0.5.5 Python。在非常简单的代码中遇到以下错误:
row_list
是一个列表。完全相同的代码、相同的数据和相同的管道在 DirectRunner 上运行得非常好,但在 DataflowRunner 上抛出了以下异常。这是什么意思,我该如何解决?
代码:trip_augmentation_test.py
在此处调用管道(我使用的是 Google Cloud Datalab)
跟进
我row_list
在 DataflowRunner 中记录了 的类型,结果是 <class 'apache_beam.transforms.trigger._UnwindowedValues'>
,而在 DirectRunner 中,它是list
. 这是预期的不一致吗?
apache-beam - 如何在 Apache Beam 或 Dataflow 中将记录编号添加到 TextIO 文件源
我正在使用 Dataflow(现在是 Beam)来处理遗留文本文件以复制现有 ETL 工具的转换。当前进程添加一个记录号(每个文件中每一行的记录号)和文件名。他们想要保留这些附加信息的原因是,他们可以知道源数据来自哪个文件和记录偏移量。
我想达到一个点,即我有一个 PCollection,其中包含文件记录号和文件名作为键的值或部分的附加字段。
我看过另一篇文章,其中文件名可以填充到生成的 PCollection 中,但是我没有添加每行记录数的解决方案。目前我能做到的唯一方法是在开始 Dataflow 过程之前预处理文件(这很遗憾,因为我希望 Dataflow/Beam 来完成这一切)
apache-spark - AssertionError:断言失败:copyAndReset 必须返回零值副本
当我申请ParDo.of(new ParDoFn())
namedPCollection
时textInput
,程序会抛出这个异常。但是我删除时程序是正常的.apply(ParDo.of(new ParDoFn()))
。
//SparkRunner
python - 使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中
因此,我们用来每天创建一个新表的现有设置,在“WRITE_TRUNCATE”选项下工作得很好,但是当我们更新代码以使用分区表时,虽然我们的数据流作业,但它不适用于 write_truncate。
它工作得非常好,写入配置设置为“WRITE_APPEND”(据我了解,从梁,它可能会尝试删除表,然后重新创建它),因为我提供了表装饰器它无法创建一个新的桌子。
使用 python 代码的示例片段:
这给出了错误:
表 ID 必须是字母数字
因为它试图重新创建表,我们在参数中提供了分区装饰器。
以下是我尝试过的一些事情:
- 将 write_disposition 更新为 WRITE_APPEND,虽然它有效,但它没有达到目的,因为再次在同一日期运行会重复数据。
- 使用
bq --apilog /tmp/log.txt load --replace --source_format=NEWLINE_DELIMITED_JSON 'table.$20160101' sample_json.json
命令,看看我是否可以观察任何日志,根据我找到的链接,截断实际上是如何工作的。
- 尝试了其他一些链接,但这也使用了 WRITE_APPEND。
有没有办法使用 write_truncate 方法从数据流作业写入分区表?
让我知道是否需要任何其他详细信息。谢谢
google-cloud-dataflow - 使用 MySQL 作为输入源并写入 Google BigQuery
我有一个 Apache Beam 任务,它使用 JDBC 从 MySQL 源读取数据,它应该将数据按原样写入 BigQuery 表。此时不执行任何转换,稍后会进行转换,目前我只想将数据库输出直接写入 BigQuery。
这是尝试执行此操作的主要方法:
但是当我使用 maven 执行模板时,出现以下错误:
Test.java:[184,6] 找不到符号符号:方法 apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
位置:类 org.apache.beam.sdk.io.jdbc。 JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>
看来我没有通过 BigQueryIO.Write 预期的数据收集,这就是我目前正在努力解决的问题。
在这种情况下,如何使来自 MySQL 的数据符合 BigQuery 的期望?
java - Beam java SDK,一个java语法混淆
在 Apache Beam 编程指南https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition中,我看到了这样的代码,我对下面的 Java 语法感到很困惑,请解释一下,谢谢。
为什么有一个点.
和一个泛型参数<String>
后跟一个类Flatten
?谁能告诉我这个的java语法?
python - Python Apache Beam 侧输入断言错误
我对 Apache Beam/Cloud Dataflow 还是新手,所以如果我的理解不正确,我深表歉意。
我正在尝试通过管道读取约 30,000 行长的数据文件。我的简单管道首先从 GCS 打开 csv,从数据中提取标头,通过 ParDo/DoFn 函数运行数据,然后将所有输出写入 csv 回 GCS。该管道有效,是我的第一个测试。
然后我编辑了管道以读取 csv,拉出标题,从数据中删除标题,通过 ParDo/DoFn 函数运行数据,并将标题作为侧输入,然后将所有输出写入 csv。唯一的新代码是将标头作为侧输入传递并从数据中过滤它。
ParDo/DoFn 函数 build_rows 只产生 context.element,这样我就可以确保我的边输入正常工作。
我得到的错误如下:
我不确定问题是什么,但我认为这可能是由于内存限制。我将示例数据从 30,000 行减少到 100 行,我的代码终于可以正常工作了。
没有侧输入的管道确实读/写了所有 30,000 行,但最后我需要侧输入来对我的数据进行转换。
如何修复我的管道,以便我可以处理来自 GCS 的大型 csv 文件,并且仍然使用边输入作为文件的伪全局变量?
python - Python 中的 Apache Beam,beam.io.TextFileSource 出错
我正在尝试在 GCP 存储库上的数据科学中运行代码,并在 Beam 代码中不断出现错误。
这是给出错误的行: beam.Read(beam.io.TextFileSource('airports.csv.gz')
这是我得到的错误: AttributeError: 'module' object has no attribute 'TextFileSource'
有谁知道如何让这个工作,或者我错过了什么?
google-cloud-dataflow - 为什么我的 Fusion Breaker 会丢失或保留数据?
我正在开发一个流式数据流管道,该管道使用来自 PubSub 的批处理项目的消息并最终将它们写入数据存储区。为了更好的并行性,也为了及时确认从 PubSub 中提取的消息,我将批次解压缩为单独的项目,并在其后添加一个融合断路器。
所以管道看起来像这样......
PubSubIO -> 反序列化 -> 解包 -> 融合中断 -> 验证/转换 -> DatastoreIO。
这是我的融合断路器,主要是从JdbcIO 类中复制的。它使用触发器来分解全局窗口中的数据。
它大部分时间都在工作,除了在某些情况下它生成的输出数量少于输入数量,即使在流输入完成并且管道空闲十分钟之后也是如此。
如下面的 Dataflow Job 监控控制台所示。屏幕截图是在作业耗尽后拍摄的,在我等待大约 10 分钟以使数据从转换中出来之后。
*有人能想出一个解释吗?感觉好像融合破坏者正在阻止或丢失了一些物品。*
我注意到它只发生在数据量/数据速率很高的情况下,迫使管道在测试运行过程中扩大规模,从 25 个 n1-highmem-2 工作人员翻倍到 50 个。但是,我还没有做足够的测试来验证放大是否是重现此问题的关键。
或者触发器可能每两秒一次过于频繁地触发?
我正在使用数据流 2.0.0-beta1。作业 ID 为“2017-02-23_23_15_34-14025424484787508627”。