问题标签 [apache-beam-internals]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-platform - 我收到此错误 Getting SEVERE Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} 未正确关闭
我创建了一个 Beam 脚本来从 kafka 获取数据并使用 Apache Beam 将其推送到 BigQuery。现在我正在使用 java-direct-runner,只需要将数据推送到我的 bigquery。
这是我的代码:-
pom.xml
我得到的错误: -
2021 年 5 月 19 日晚上 9:21:45 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue SEVERE: ~ ~ ~ Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} 没有正确关闭!!!~ ~ ~ 确保调用 shutdown()/shutdownNow() 并等待 awaitTermination() 返回 true。java.lang.RuntimeException: ManagedChannel 分配站点
amazon-web-services - AWS lambda 上的 Apache Beam 的 Spark Runner
我正在使用 Apache Beam 的 SparkRunner 对某些数据执行转换(SparkRunner 允许 apache Beam 在其管道之上运行 Apache Spark 引擎)。该应用程序在我的本地机器上运行良好。我创建一个罐子并使用 -
运行我的代码,它按预期工作。但是当我在 AWS Lambda 上上传这段代码并尝试使用 args 运行时 -{ "runner": "SparkRunner", "inputUrl": "s3://s3URi", "outputUrl": "s3://s3URi", "accessKey": "******************", "secretKey": "********************" }
它不起作用。我收到以下错误 - :
“errorMessage”:“无法分配请求的地址:服务 'sparkDriver' 在 16 次重试后失败(在随机空闲端口上)!考虑为服务 'sparkDriver' 显式设置适当的绑定地址(例如 SparkDriver 的 spark.driver.bindAddress)到正确的绑定地址。",
我需要一些帮助来配置 sparkDriver 的绑定地址。我尝试使用 jar 文件的 s3 URI 发送一个标记为 sparkMaster 的额外参数,但它不起作用。我非常感谢您的帮助。
google-bigquery - Apache梁Join.leftOuterJoin,如何传递空白TableRow?
我想在 Apache Beam(JAVA Sdk)中的 2 个 BigQuery 表上执行 leftOuterJoin。
我阅读了表格(
leftTableCollection
&rightTableCollection
)并且属于PCollection<TableRow>
.将它们转换为形式,
PCollection<KV<String, TableRow>>
即 <join-key, table-row>我正在
Join.leftOuterJoin
使用org.apache.beam.sdk.extensions.joinlibrary.Join
`
`
我无法弄清楚在rightTableNullValues
这里传递什么?
尝试使用new TableRow()
which 引发unable to serialize
错误。
任何建议都会非常有帮助。TIA
apache-beam - Beam DirectRunner Calcite 无法指定名称
我正在运行此梁教程的简化版本,但在我的本地计算机上使用 DirectRunner 运行它。
如果我改成my_table
它PCOLLECTION
可以工作(尽管要让它真正工作,我只需要传入rows
而不是 dict.
我得到的错误信息:
java - Apache Beam 根据前一行的值更新当前行的值
Apache Beam 根据上一行的值更新值
我已经对 CSV 文件中的值进行了分组。在分组的行中,我们发现一些缺失值需要根据前一行的值进行更新。如果该行的第一列是空的,那么我们需要将其更新为 0。
我可以对记录进行分组,但无法找出更新值的逻辑,我该如何实现?
记录
客户ID | 日期 | 数量 |
---|---|---|
BS:89481 | 2012 年 1 月 1 日 | 100 |
BS:89482 | 2012 年 1 月 1 日 | |
BS:89483 | 2012 年 1 月 1 日 | 300 |
BS:89481 | 2012 年 1 月 2 日 | 900 |
BS:89482 | 2012 年 1 月 2 日 | 200 |
BS:89483 | 2012 年 1 月 2 日 |
分组记录
客户ID | 日期 | 数量 |
---|---|---|
BS:89481 | 2012 年 1 月 1 日 | 100 |
BS:89481 | 2012 年 1 月 2 日 | 900 |
BS:89482 | 2012 年 1 月 1 日 | |
BS:89482 | 2012 年 1 月 2 日 | 200 |
BS:89483 | 2012 年 1 月 1 日 | 300 |
BS:89483 | 2012 年 1 月 2 日 |
更新缺失值
客户ID | 日期 | 数量 |
---|---|---|
BS:89481 | 2012 年 1 月 1 日 | 100 |
BS:89481 | 2012 年 1 月 2 日 | 900 |
BS:89482 | 2012 年 1 月 1 日 | 000 |
BS:89482 | 2012 年 1 月 2 日 | 200 |
BS:89483 | 2012 年 1 月 1 日 | 300 |
BS:89483 | 2012 年 1 月 2 日 | 300 |
到目前为止的代码:
修改代码: 原始代码
按客户 ID 分组
按客户 ID 分组:
// 尝试按日期分组
如何编写将 Iterable 转换为 pCollection 的逻辑,以便对日期进行排序。
Avro 架构:
更新将 PCollection 转换为 Row[]
建议代码
错误:
Iterables 类型中的 toArray(Iterable<? extends T>, Class) 方法不适用于参数 (PCollection, Class)
将可迭代对象转换为数组
错误:
此行有多个标记 - 类型不匹配:无法从 PCollection<Row[]> 转换为 Row[] - 1 行已更改,2 行已删除
java - 如何在两个 PCollection 上追加新行或执行联合
在下面的 CSV 中,我需要为其附加新的行值。
ID | 日期 | 平衡 |
---|---|---|
01 | 2021 年 1 月 31 日 | 100 |
01 | 28/02/2021 | 200 |
01 | 2021 年 3 月 31 日 | 200 |
01 | 2021 年 4 月 30 日 | 200 |
01 | 2021 年 5 月 31 日 | 500 |
01 | 2021 年 6 月 30 日 | 600 |
预期输出:
ID | 日期 | 平衡 |
---|---|---|
01 | 2021 年 1 月 31 日 | 100 |
01 | 28/02/2021 | 200 |
01 | 2021 年 3 月 31 日 | 200 |
01 | 2021 年 4 月 30 日 | 200 |
01 | 2021 年 5 月 31 日 | 500 |
01 | 2021 年 6 月 30 日 | 600 |
01 | 2021 年 7 月 30 日 | 999 |
Java代码:
如何组合这两个 PCollection 对象?
添加行的逻辑
我参考了这个链接
apache-beam - java - 如何在java中使用JDBCIO apache Beam插入数据库表时提取错误记录
我在内存中创建 PCollection 并将其写入 postgres sql。现在,当我向表中插入数据时,很少有记录可能会抛出异常并且不会被插入。启动管道时如何提取此类失败的插入记录?
下面是我为管道编写的代码:
google-cloud-dataflow - Apache Beam - 多个 Pcollection - Dataframetransform 问题
我在 apache 梁中运行以下示例
代码错误并出现错误`
RuntimeError:管道中已存在带有标签“TransformedDF/BatchElements(pc)”的转换`
输出 = {"a":pcol1, "b":pcol2"} | DataframeTransform(lambda/function)
我目前正在使用 apache beam 2.35.0 Python SDK 有这个问题吗?
google-cloud-dataflow - Apache Beam Python - 带有命名 PCollection 问题的 SQL 转换
我正在尝试执行下面的代码,其中我使用命名元组进行 PCollection 和 SQL 转换来进行简单的选择。
根据视频链接 (4:06):https ://www.youtube.com/watch?v=zx4p-UNSmrA 。
除了在 SQLTransform 查询中使用 PCOLLECTION 之外,还可以提供命名的 PCollections,如下所示。
代码块
但是,下面的代码块出错并出现错误
Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'a' not found
使用的 Apache Beam SDK 是 2.35.0,使用命名 PCollection 是否有任何已知限制
apache-beam - 将 Apache Beam 中的多部分空 csv 文件写入 netApp 存储网格时出现异常
问题陈述
我们正在使用多个 csv 文件到 pcollections -> 应用 beam SQL 来转换数据 -> 写入结果 pcollection。如果我们在所有源 pCollections 中都有一些数据并且梁 SQL 使用一些数据生成新集合,那么这绝对可以正常工作。当 Transform pCollection 生成空 pCollection 并且在 netApp Storage Grid 中写入它时,它会在下面抛出,
以下是示例代码
观察
- 如果我们编写简单文件而不是多部分文件(简单将对象放入存储网格),则工作正常
- 似乎是 Storage Grid 的已知问题,但我们想检查我们是否可以从梁管道处理这个问题。
我试过的
- 试图查看我是否可以在写入之前检查 PCollection 的大小并将一些字符串放入输出文件,但由于 PCollection 是空的,它根本不会进入 PTransform。
- 也尝试使用 Count.globally ,但该事件没有帮助
问
- 无论如何我们可以在 Beam 中处理这个问题,就像我们可以在写入之前检查 PCollection 的大小一样?如果大小为零,即空 pcollection,那么我们可以避免写入文件以避免此问题。
- 有没有人遇到过类似的问题并能够解决?