问题标签 [beam-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - 使用 BeamSQL 构建嵌套结构
在 BigQuery 中,我们有“ARRAY_AGG”函数,它有助于将普通集合转换为嵌套集合。是否有类似的方法可以使用 BeamSQL 构建相同类型的嵌套结构集合?类似于 BeamSQL 中的以下查询,
“从 PCOLLECTION Group by Column1 中选择 column1, ARRAY_AGG(STRUCT(column2, column3))”
java - apache.beam.sdk.schemas.Schema.FieldType 中数值的等效数据类型是什么
尝试使用 BeamSQL 将数据写入 BigQuery 表。要写入数据,我们需要该数据的模式。使用org.apache.beam.sdk.schemas来定义数据收集的模式。我们在该数据集合中有数字数据类型列。想知道,org.apache.beam.sdk.schemas.Schema.FieldType类中 Numeric 的等价数据类型是什么。请有人帮我使用数值数据类型的等效模式。
apache-beam - Beam SQL - SqlValidatorException:找不到对象'PCOLLECTION'
我正在用 Beam SQL 做一些实验。我PCollection<Row>
从转换中得到 aSampleSource
并将其输出传递给 a SqlTransform
。
下面的代码运行没有任何错误。
但是,当我尝试以下代码行时,出现运行时错误。
错误详细信息是
请提供一些指示。
apache-beam - Apache Beam SQLTransform:没有架构时无法调用 getSchema
我正在尝试在PCollection<Object>
. 在这里,CustomSource 转换在运行时生成一个 Pojo。因此,在编译时不知道运行 SQLTransform 的对象的类型。
我已经提供了架构SQLTransform
,setSchema
但我收到了一个错误,即
是否可以在运行时生成 Pojo 对象并通过向转换提供模式信息在它们上运行 sqltransforms ?
这是 CustomSource 类供参考:
java - 如何修复 Apache Beam 中的“加入无界 PCollections 目前仅支持带有触发器的非全局窗口”
我正在尝试使用 Apache Beam Java SDK 加入 2 个无限源。加入时我收到以下错误消息。
线程“主”java.lang.UnsupportedOperationException 中的异常:当前仅支持非全局窗口加入无界 PCollections,其触发器已知每个窗口产生一次输出,例如允许延迟为零的默认触发器。在这些情况下,Beam 可以保证它在每个窗口中加入一次所有输入元素。WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), 累积模式=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel 的 org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) 不支持。
我尝试使用固定和滑动窗口以及触发(pastEndOfWindow 和 pastFirstElementInPane),允许延迟为零。尝试了 Accumalate 和 Discard 烧制的窗格。我每次都收到相同的错误消息。
下面是我尝试使用固定和滑动窗口的 2 个片段。
我只是想实现一个带有滑动窗口的 sql 转换,延迟触发并允许延迟。请指导我完成它。
谢谢, 戈瑟姆
java - 如何在每个窗口上刷新/重新加载侧面输入
我正在使用 Apache Beam 加入多个流以及一些查找。我有 2 个场景,如果查找大小很大,我希望边输入为每个记录处理重新加载/刷新(即我将使用 where 子句查询数据库),如果查找大小较小,则重新加载/刷新一次一天。
我想知道什么是正确的方法。我不希望庞大的数据端输入吃掉所有工人的记忆。
我使用下面的代码每天刷新一次侧输入。
请指导我了解此类用例的最佳实践,并为我提供一些示例代码以便更好地理解。
谢谢, 戈瑟姆
java - 如何在滑动窗口中删除重复项 - Apache Beam
我已经实现了一个具有多个无界源和侧输入的数据管道,将数据与滑动窗口(30 秒和每 10 秒)连接起来,并将转换后的输出发送到 Kafka 主题中。我遇到的问题是,在窗口的前 10 秒内收到的数据会发出 3 次(即)每当新窗口启动时触发,直到第一个窗口完成。如何只发出一次转换后的数据或避免重复?
我使用了丢弃触发的窗格,它没有任何区别。每当我尝试将窗口关闭行为设置为 FIRE_ALWAYS/FIRE_IF_NON_EMPTY 时,它都会引发以下错误。
线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。考虑设置 withDefault 以在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner. java:302) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) 在 org.apache .beam.sdk.Pipeline.run(Pipeline.java:313) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) at y.yyy.main(yyy.java:86) 原因: java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。
请指导我只触发一次窗口(即)我不想发送已经处理的记录
更新:侧面输入的上述错误经常发生并且不是因为 Windows,这似乎是 Apache Beam 中的一个问题(https://issues.apache.org/jira/browse/BEAM-6086)
我尝试使用 State 来识别一行是否已经被处理,但状态没有被保留或被设置。(即)我在阅读状态时总是得到空值。
apache-beam - Apache calcite:将整数转换为日期时间
我正在使用 Beam SQL 并尝试将整数转换为日期时间字段。
对于一些计算和窗口,我想转换/转换timestamp
为Datetime
字段?请提供一些转换timestamp
为. 数据类型。resultSchema
DateTime
apache-beam - Apache Beam:SQL 聚合不输出无界/有界联接的结果
我正在研究 apache 光束管道以运行 SQL 聚合函数。参考:https ://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache /beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159。此处的示例运行良好。但是,当我将源替换为实际的无界源并进行聚合时,我看不到任何结果。我的管道中的步骤:
- 从源中读取有界数据并转换为行集合。
- 从 websocket 源读取无限的 json 数据。
- 通过 DoFn 为每个源流分配时间戳。
- 将无界 json 转换为无界行集合
- 在行集合上应用一个窗口
- 应用 SQL 语句。
- 输出sql的结果。
一个普通的 SQL 语句执行并输出结果。但是,当我在 SQL 中使用 group by 时,没有输出。
结果是连续的,如下所示。
当我将 sql 更改为
在这个实现中我做错了什么吗?任何指针都会很有帮助。
apache-kafka - apache Beam 中侧输入的替代方法是什么
我正在尝试使用 Apache Beam 加入多个 kafka 流和查找。我使用侧面输入来处理查找表,一切都在直接运行器中解决。但是,当我尝试在 spark 模式或 flink 模式下运行它时,我了解到不支持侧输入。这些是 Jira Bugs 的几个链接。
http://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3C573EFC2F.6000708@nanthrax.net%3E https://issues.apache.org/jira/browse/FLINK-6131
有没有办法使用侧面输入或解决这个问题?
我可以为此使用有状态处理吗?我知道每个窗口每个键的状态都是可用的,但是仍然有调整以不同的方式使用它吗?
我可以使用像 memcached 这样的缓存数据库并在处理每条记录时获取数据吗?
任何建议都受到高度赞赏。
谢谢,