问题标签 [apache-beam-internals]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1615 浏览

google-cloud-platform - 我收到此错误 Getting SEVERE Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} 未正确关闭

我创建了一个 Beam 脚本来从 kafka 获取数据并使用 Apache Beam 将其推送到 BigQuery。现在我正在使用 java-direct-runner,只需要将数据推送到我的 bigquery。

这是我的代码:-

pom.xml

我得到的错误: -

2021 年 5 月 19 日晚上 9:21:45 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue SEVERE: ~ ~ ~ Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} 没有正确关闭!!!~ ~ ~ 确保调用 shutdown()/shutdownNow() 并等待 awaitTermination() 返回 true。java.lang.RuntimeException: ManagedChannel 分配站点

0 投票
0 回答
120 浏览

amazon-web-services - AWS lambda 上的 Apache Beam 的 Spark Runner

我正在使用 Apache Beam 的 SparkRunner 对某些数据执行转换(SparkRunner 允许 apache Beam 在其管道之上运行 Apache Spark 引擎)。该应用程序在我的本地机器上运行良好。我创建一个罐子并使用 -

运行我的代码,它按预期工作。但是当我在 AWS Lambda 上上传这段代码并尝试使用 args 运行时 -{ "runner": "SparkRunner", "inputUrl": "s3://s3URi", "outputUrl": "s3://s3URi", "accessKey": "******************", "secretKey": "********************" }

它不起作用。我收到以下错误 - :

“errorMessage”:“无法分配请求的地址:服务 'sparkDriver' 在 16 次重试后失败(在随机空闲端口上)!考虑为服务 'sparkDriver' 显式设置适当的绑定地址(例如 SparkDriver 的 spark.driver.bindAddress)到正确的绑定地址。",

我需要一些帮助来配置 sparkDriver 的绑定地址。我尝试使用 jar 文件的 s3 URI 发送一个标记为 sparkMaster 的额外参数,但它不起作用。我非常感谢您的帮助。

0 投票
0 回答
55 浏览

google-bigquery - Apache梁Join.leftOuterJoin,如何传递空白TableRow?

我想在 Apache Beam(JAVA Sdk)中的 2 个 BigQuery 表上执行 leftOuterJoin。

  1. 我阅读了表格(leftTableCollection& rightTableCollection)并且属于PCollection<TableRow>.

  2. 将它们转换为形式,PCollection<KV<String, TableRow>>即 <join-key, table-row>

  3. 我正在Join.leftOuterJoin使用org.apache.beam.sdk.extensions.joinlibrary.Join

`

`

我无法弄清楚在rightTableNullValues这里传递什么?

尝试使用new TableRow()which 引发unable to serialize错误。

任何建议都会非常有帮助。TIA

0 投票
1 回答
93 浏览

apache-beam - Beam DirectRunner Calcite 无法指定名称

我正在运行此梁教程的简化版本,但在我的本地计算机上使用 DirectRunner 运行它。

如果我改成my_tablePCOLLECTION可以工作(尽管要让它真正工作,我只需要传入rows而不是 dict.

我得到的错误信息:

0 投票
1 回答
246 浏览

java - Apache Beam 根据前一行的值更新当前行的值

Apache Beam 根据上一行的值更新值

我已经对 CSV 文件中的值进行了分组。在分组的行中,我们发现一些缺失值需要根据前一行的值进行更新。如果该行的第一列是空的,那么我们需要将其更新为 0。

我可以对记录进行分组,但无法找出更新值的逻辑,我该如何实现?

记录

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89482 2012 年 1 月 1 日
BS:89483 2012 年 1 月 1 日 300
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 2 日

分组记录

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 1 日
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 1 日 300
BS:89483 2012 年 1 月 2 日

更新缺失值

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 1 日 000
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 1 日 300
BS:89483 2012 年 1 月 2 日 300

到目前为止的代码:

修改代码: 原始代码

按客户 ID 分组

按客户 ID 分组:

// 尝试按日期分组

如何编写将 Iterable 转换为 pCollection 的逻辑,以便对日期进行排序。

Avro 架构:

更新将 PCollection 转换为 Row[]

建议代码

错误:

Iterables 类型中的 toArray(Iterable<? extends T>, Class) 方法不适用于参数 (PCollection, Class)

将可迭代对象转换为数组

错误:

此行有多个标记 - 类型不匹配:无法从 PCollection<Row[]> 转换为 Row[] - 1 行已更改,2 行已删除

0 投票
1 回答
86 浏览

java - 如何在两个 PCollection 上追加新行或执行联合

在下面的 CSV 中,我需要为其附加新的行值。

ID 日期 平衡
01 2021 年 1 月 31 日 100
01 28/02/2021 200
01 2021 年 3 月 31 日 200
01 2021 年 4 月 30 日 200
01 2021 年 5 月 31 日 500
01 2021 年 6 月 30 日 600

预期输出:

ID 日期 平衡
01 2021 年 1 月 31 日 100
01 28/02/2021 200
01 2021 年 3 月 31 日 200
01 2021 年 4 月 30 日 200
01 2021 年 5 月 31 日 500
01 2021 年 6 月 30 日 600
01 2021 年 7 月 30 日 999

Java代码:

如何组合这两个 PCollection 对象?

添加行的逻辑

我参考了这个链接

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#:~:text=Merging%20PCollections,-Often%2C%20after%20you&text=You%20can%20do%20so%20by,join %20between%20two%20PCollection%20s

0 投票
1 回答
143 浏览

apache-beam - java - 如何在java中使用JDBCIO apache Beam插入数据库表时提取错误记录

我在内存中创建 PCollection 并将其写入 postgres sql。现在,当我向表中插入数据时,很少有记录可能会抛出异常并且不会被插入。启动管道时如何提取此类失败的插入记录?

下面是我为管道编写的代码:

0 投票
0 回答
30 浏览

google-cloud-dataflow - Apache Beam - 多个 Pcollection - Dataframetransform 问题

我在 apache 梁中运行以下示例

代码错误并出现错误`

RuntimeError:管道中已存在带有标签“TransformedDF/BatchElements(pc)”的转换`

根据链接 https://beam.apache.org/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline ,语法和用法似乎是正确的

输出 = {"a":pcol1, "b":pcol2"} | DataframeTransform(lambda/function)

我目前正在使用 apache beam 2.35.0 Python SDK 有这个问题吗?

0 投票
0 回答
23 浏览

google-cloud-dataflow - Apache Beam Python - 带有命名 PCollection 问题的 SQL 转换

我正在尝试执行下面的代码,其中我使用命名元组进行 PCollection 和 SQL 转换来进行简单的选择。

根据视频链接 (4:06):https ://www.youtube.com/watch?v=zx4p-UNSmrA 。

除了在 SQLTransform 查询中使用 PCOLLECTION 之外,还可以提供命名的 PCollections,如下所示。

代码块

但是,下面的代码块出错并出现错误

Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'a' not found

使用的 Apache Beam SDK 是 2.35.0,使用命名 PCollection 是否有任何已知限制

0 投票
2 回答
53 浏览

apache-beam - 将 Apache Beam 中的多部分空 csv 文件写入 netApp 存储网格时出现异常

问题陈述

我们正在使用多个 csv 文件到 pcollections -> 应用 beam SQL 来转换数据 -> 写入结果 pcollection。如果我们在所有源 pCollections 中都有一些数据并且梁 SQL 使用一些数据生成新集合,那么这绝对可以正常工作。当 Transform pCollection 生成空 pCollection 并且在 netApp Storage Grid 中写入它时,它会在下面抛出,

以下是示例代码

观察

  • 如果我们编写简单文件而不是多部分文件(简单将对象放入存储网格),则工作正常
  • 似乎是 Storage Grid 的已知问题,但我们想检查我们是否可以从梁管道处理这个问题。

我试过的

  • 试图查看我是否可以在写入之前检查 PCollection 的大小并将一些字符串放入输出文件,但由于 PCollection 是空的,它根本不会进入 PTransform。
  • 也尝试使用 Count.globally ,但该事件没有帮助

  • 无论如何我们可以在 Beam 中处理这个问题,就像我们可以在写入之前检查 PCollection 的大小一样?如果大小为零,即空 pcollection,那么我们可以避免写入文件以避免此问题。
  • 有没有人遇到过类似的问题并能够解决?