问题标签 [beam-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
396 浏览

google-cloud-dataflow - 使用 BeamSQL 构建嵌套结构

在 BigQuery 中,我们有“ARRAY_AGG”函数,它有助于将普通集合转换为嵌套集合。是否有类似的方法可以使用 BeamSQL 构建相同类型的嵌套结构集合?类似于 BeamSQL 中的以下查询,

“从 PCOLLECTION Group by Column1 中选择 column1, ARRAY_AGG(STRUCT(column2, column3))”

0 投票
2 回答
377 浏览

java - apache.beam.sdk.schemas.Schema.FieldType 中数值的等效数据类型是什么

尝试使用 BeamSQL 将数据写入 BigQuery 表。要写入数据,我们需要该数据的模式。使用org.apache.beam.sdk.schemas来定义数据收集的模式。我们在该数据集合中有数字数据类型列。想知道,org.apache.beam.sdk.schemas.Schema.FieldType类中 Numeric 的等价数据类型是什么。请有人帮我使用数值数据类型的等效模式。

0 投票
1 回答
206 浏览

apache-beam - Beam SQL - SqlValidatorException:找不到对象'PCOLLECTION'

我正在用 Beam SQL 做一些实验。我PCollection<Row>从转换中得到 aSampleSource并将其输出传递给 a SqlTransform

下面的代码运行没有任何错误。

但是,当我尝试以下代码行时,出现运行时错误。

错误详细信息是

请提供一些指示。

0 投票
3 回答
895 浏览

apache-beam - Apache Beam SQLTransform:没有架构时无法调用 getSchema

我正在尝试在PCollection<Object>. 在这里,CustomSource 转换在运行时生成一个 Pojo。因此,在编译时不知道运行 SQLTransform 的对象的类型。

我已经提供了架构SQLTransformsetSchema但我收到了一个错误,即

是否可以在运行时生成 Pojo 对象并通过向转换提供模式信息在它们上运行 sqltransforms ?

这是 CustomSource 类供参考:

0 投票
2 回答
1267 浏览

java - 如何修复 Apache Beam 中的“加入无界 PCollections 目前仅支持带有触发器的非全局窗口”

我正在尝试使用 Apache Beam Java SDK 加入 2 个无限源。加入时我收到以下错误消息。

线程“主”java.lang.UnsupportedOperationException 中的异常:当前仅支持非全局窗口加入无界 PCollections,其触发器已知每个窗口产生一次输出,例如允许延迟为零的默认触发器。在这些情况下,Beam 可以保证它在每个窗口中加入一次所有输入元素。WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), 累积模式=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel 的 org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) 不支持。

我尝试使用固定和滑动窗口以及触发(pastEndOfWindow 和 pastFirstElementInPane),允许延迟为零。尝试了 Accumalate 和 Discard 烧制的窗格。我每次都收到相同的错误消息。

下面是我尝试使用固定和滑动窗口的 2 个片段。

我只是想实现一个带有滑动窗口的 sql 转换,延迟触发并允许延迟。请指导我完成它。

谢谢, 戈瑟姆

0 投票
1 回答
723 浏览

java - 如何在每个窗口上刷新/重新加载侧面输入

我正在使用 Apache Beam 加入多个流以及一些查找。我有 2 个场景,如果查找大小很大,我希望边输入为每个记录处理重新加载/刷新(即我将使用 where 子句查询数据库),如果查找大小较小,则重新加载/刷新一次一天。

我想知道什么是正确的方法。我不希望庞大的数据端输入吃掉所有工人的记忆。

我使用下面的代码每天刷新一次侧输入。

请指导我了解此类用例的最佳实践,并为我提供一些示例代码以便更好地理解。

谢谢, 戈瑟姆

0 投票
1 回答
1270 浏览

java - 如何在滑动窗口中删除重复项 - Apache Beam

我已经实现了一个具有多个无界源和侧输入的数据管道,将数据与滑动窗口(30 秒和每 10 秒)连接起来,并将转换后的输出发送到 Kafka 主题中。我遇到的问题是,在窗口的前 10 秒内收到的数据会发出 3 次(即)每当新窗口启动时触发,直到第一个窗口完成。如何只发出一次转换后的数据或避免重复?

我使用了丢弃触发的窗格,它没有任何区别。每当我尝试将窗口关闭行为设置为 FIRE_ALWAYS/FIRE_IF_NON_EMPTY 时,它都会引发以下错误。

线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。考虑设置 withDefault 以在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner. java:302) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) 在 org.apache .beam.sdk.Pipeline.run(Pipeline.java:313) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) at y.yyy.main(yyy.java:86) 原因: java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。

请指导我只触发一次窗口(即)我不想发送已经处理的记录

更新:侧面输入的上述错误经常发生并且不是因为 Windows,这似乎是 Apache Beam 中的一个问题(https://issues.apache.org/jira/browse/BEAM-6086

我尝试使用 State 来识别一行是否已经被处理,但状态没有被保留或被设置。(即)我在阅读状态时总是得到空值。

0 投票
1 回答
1147 浏览

apache-beam - Apache calcite:将整数转换为日期时间

我正在使用 Beam SQL 并尝试将整数转换为日期时间字段。

对于一些计算和窗口,我想转换/转换timestampDatetime字段?请提供一些转换timestamp为. 数据类型。resultSchemaDateTime

0 投票
2 回答
426 浏览

apache-beam - Apache Beam:SQL 聚合不输出无界/有界联接的结果

我正在研究 apache 光束管道以运行 SQL 聚合函数。参考:https ://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache /beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159。此处的示例运行良好。但是,当我将源替换为实际的无界源并进行聚合时,我看不到任何结果。我的管道中的步骤:

  1. 从源中读取有界数据并转换为行集合。
  2. 从 websocket 源读取无限的 json 数据。
  3. 通过 DoFn 为每个源流分配时间戳。
  4. 将无界 json 转换为无界行集合
  5. 在行集合上应用一个窗口
  6. 应用 SQL 语句。
  7. 输出sql的结果。

一个普通的 SQL 语句执行并输出结果。但是,当我在 SQL 中使用 group by 时,没有输出。

结果是连续的,如下所示。

当我将 sql 更改为

在这个实现中我做错了什么吗?任何指针都会很有帮助。

0 投票
1 回答
294 浏览

apache-kafka - apache Beam 中侧输入的替代方法是什么

我正在尝试使用 Apache Beam 加入多个 kafka 流和查找。我使用侧面输入来处理查找表,一切都在直接运行器中解决。但是,当我尝试在 spark 模式或 flink 模式下运行它时,我了解到不支持侧输入。这些是 Jira Bugs 的几个链接。

http://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3C573EFC2F.6000708@nanthrax.net%3E https://issues.apache.org/jira/browse/FLINK-6131

https://issues.apache.org/jira/browse/BEAM-2112

有没有办法使用侧面输入或解决这个问题?

我可以为此使用有状态处理吗?我知道每个窗口每个键的状态都是可用的,但是仍然有调整以不同的方式使用它吗?

我可以使用像 memcached 这样的缓存数据库并在处理每条记录时获取数据吗?

任何建议都受到高度赞赏。

谢谢,