问题标签 [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - 在批处理管道上接收到大于最大值的消息
我一直在每天在谷歌的云数据流服务上运行的批处理管道上收到此消息。它已开始失败,并显示以下消息:
我仍在使用旧的解决方法来输出带有标题的 CSV 文件,例如
这是什么原因造成的?现在这个DoFn的输出是不是太大了?正在处理的数据集的大小没有增加。
google-cloud-dataflow - 如何使用 Python SDK 调试自定义接收器
我在使用 Python SDK 编写自定义接收器时遇到问题。为了调试它,我使用了 logging.info 或 print 方法,但它不能很好地工作。有没有调试自定义接收器的好方法?
python - Python Apache Beam 管道状态 API 调用
我们目前有一个 Python Apache Beam 管道正在工作并且能够在本地运行。我们现在正在让管道在 Google Cloud Dataflow 上运行并完全自动化,但在 Dataflow/Apache Beam 的管道监控中发现了一个限制。
目前,Cloud Dataflow有两种监控管道状态的方法,一种是通过其 UI 界面,另一种是通过命令行中的 gcloud。这两种解决方案都不适用于完全自动化的解决方案,我们可以考虑无损文件处理。
查看 Apache Beam 的 github,他们有一个文件internal/apiclient.py,其中显示有一个用于获取作业状态的函数get_job。
我们发现 get_job 使用的一个实例在runners/dataflow_runner.py中。
最终目标是使用此 API 获取我们自动触发运行的一个或多个作业的状态,以确保它们最终都通过管道成功处理。
任何人都可以向我们解释在我们运行管道(p.run()
)之后如何使用这个 API 吗?我们不明白runner
inresponse = runner.dataflow_client.get_job(job_id)
从哪里来。
如果有人可以更深入地了解我们如何在设置/运行我们的管道时访问此 API 调用,那就太好了!
google-cloud-dataflow - Apache Beam:步骤 B 是否可以在不将 A 的输出传递到 B 的情况下依赖于步骤 A?
当 A 不产生任何输出时,有没有办法让 PTransform B 依赖于 PTransform A?或者我是否必须让 A 产生一个虚拟输出作为侧输入馈入 B?一个示例用例是我希望拥有以下管道的地方:
我希望 B 仅在 A 完成后启动,但 A 不会产生任何对 B 有用的输出 PCollection。
google-cloud-dataflow - Apache Beam:跳过已经构建的管道中的步骤
有没有办法有条件地跳过已经构建的管道中的步骤?或者管道构造是否设计为控制运行哪些步骤的唯一方法?
google-cloud-dataflow - 如何使用 Dataflow Python SDK 读取 BigQuery 嵌套表
如何使用 Apache Beam Python SDK 读取嵌套结构?
导致
是否可以读取嵌套结构?
java - Apache Beam 获取源文件名
编辑:已解决!
我有来自多种语言的多个文本文件。我想使用 Apache Beam 为每一行添加一个语言标签。
例子:
文件text_en:
This is a sentence.
文件text_de:Dies ist ein Satz.
我想要的是这样的:
我试过的:
我最初尝试只使用一个TextIO.Read.From(dataSetDirectory+"/*")
并寻找一个看起来像.getSource()
. 然而,这似乎并不存在。
接下来,我尝试像这样一个一个地读取每个文件:
以这种方式读取文件效果很好,但是我的 DoFnLanguageTagAdder
仅使用第一种语言进行初始化,因此所有文件都具有相同的添加语言。
LanguageTagAdder
看起来像这样:
我意识到这种行为是有意和需要的,以便数据可以并行化,但我将如何解决我的问题?有没有Beam方法来解决它?
new LanguageTagAdder
PS:我在第二次(使用第二种语言)创建时收到以下警告:
编辑: 问题是线
dataSet.and(newPCollection);
它需要重写为:
dataSet=dataSet.and(newPCollection);
原来,dataSet 只包含第一个 File.... 难怪它们都有相同的语言标签!
python - 如何从python中的数据流/束查询数据存储
看起来 google 已经发布了对从 python 中的 dataflow/beam 查询数据存储的支持。我试图让它在本地运行,但我遇到了一些问题:
这给了我一个
我猜我传入了错误的查询对象(有 3-4 个 pip 包可以从中导入数据存储),但我不知道应该传入哪个。在测试中他们正在传递protobuf。那是我必须使用的吗?如果这是我必须做的,任何人都可以使用 protobuf 显示一个简单的示例查询吗?
google-cloud-dataflow - 如何共存数据流 sdk 和 aws sdk
我正在尝试使用 Dataflow Java SDK 编写 S3 自定义接收器。虽然 AWS SDK 使用 httpclient4.5.2,但 Dataflow SDK 仍然使用 4.0.1。因此,当我尝试运行这项工作时,这是冲突的。我想升级Dataflow的httpclient的版本是解决它的方法。我该如何更新它?
谢谢你。
python - 如何将 csv 转换为 apache Beam 数据流中的字典
我想读取一个 csv 文件并使用 apache Beam 数据流将其写入 BigQuery。为此,我需要以字典的形式将数据呈现给 BigQuery。为了做到这一点,我如何使用 apache Beam 转换数据?
我的输入 csv 文件有两列,我想在 BigQuery 中创建一个后续的两列表。我知道如何在 BigQuery 中创建数据,这很简单,我不知道如何将 csv 转换为字典。下面的代码不正确,但应该让我知道我正在尝试做什么。