我正在研究 apache 光束管道以运行 SQL 聚合函数。参考:https ://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache /beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159。此处的示例运行良好。但是,当我将源替换为实际的无界源并进行聚合时,我看不到任何结果。我的管道中的步骤:
- 从源中读取有界数据并转换为行集合。
- 从 websocket 源读取无限的 json 数据。
- 通过 DoFn 为每个源流分配时间戳。
- 将无界 json 转换为无界行集合
- 在行集合上应用一个窗口
- 应用 SQL 语句。
- 输出sql的结果。
一个普通的 SQL 语句执行并输出结果。但是,当我在 SQL 中使用 group by 时,没有输出。
SELECT
o1.detectedCount,
o1.sensor se,
o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
结果是连续的,如下所示。
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":1,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
当我将 sql 更改为
SELECT
COUNT(o1.detectedCount) o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
GROUP BY o2.sensor
在这个实现中我做错了什么吗?任何指针都会很有帮助。