2

我正在研究 apache 光束管道以运行 SQL 聚合函数。参考:https ://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache /beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159。此处的示例运行良好。但是,当我将源替换为实际的无界源并进行聚合时,我看不到任何结果。我的管道中的步骤:

  1. 从源中读取有界数据并转换为行集合。
  2. 从 websocket 源读取无限的 json 数据。
  3. 通过 DoFn 为每个源流分配时间戳。
  4. 将无界 json 转换为无界行集合
  5. 在行集合上应用一个窗口
  6. 应用 SQL 语句。
  7. 输出sql的结果。

一个普通的 SQL 语句执行并输出结果。但是,当我在 SQL 中使用 group by 时,没有输出。

SELECT 
  o1.detectedCount,
  o1.sensor se,
  o2.sensor sa
FROM SENSOR o1 
  LEFT JOIN AREA o2 
  on o1.sensor = o2.sensor

结果是连续的,如下所示。

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":1,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,            
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

当我将 sql 更改为

SELECT
  COUNT(o1.detectedCount) o2.sensor sa
FROM SENSOR o1
  LEFT JOIN AREA o2
  on o1.sensor = o2.sensor
GROUP BY o2.sensor

在这个实现中我做错了什么吗?任何指针都会很有帮助。

4

2 回答 2

0
SELECT
  COUNT(o1.detectedCount) as number
 ,o2.sensor
,sa
FROM SENSOR o1
  LEFT OUTER JOIN AREA o2
  on o1.sensor = o2.sensor
GROUP BY sa,o1.sensor,o2.sensor
于 2019-07-30T01:10:39.157 回答
0

阅读代码时会出现一些建议:

  1. 扩展窗口,允许延迟,并发出提前到达的数据。
 .apply("windowing", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(2)))
                            .triggering(AfterWatermark.pastEndOfWindow()
                                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                            .plusDelayOf(Duration.standardSeconds(1)))
                                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                                            .plusDelayOf(Duration.standardSeconds(2))))
                            .withAllowedLateness(Duration.standardMinutes(10))
                            .discardingFiredPanes());

  1. 尝试删除join并检查如果没有它,您是否有输出到窗口,

  2. 尝试将更多时间添加到窗口中。因为有时在工作人员之间洗牌数据太短了。并且连接的流不会同时发出。

  3. outputWithTimestamp将以不同的时间戳输出行,然后当您不允许迟到时可以删除它们。阅读文档 outputWithTimestamp,这个 API 有点冒险。

如果输入 {@link PCollection} 元素具有时间戳,则每个元素的输出时间戳不得早于输入元素的时间戳减去 {@link getAllowedTimestampSkew()} 的值。如果输出时间戳在此时间之前,则转换将在执行时抛出 {@link IllegalArgumentException}。使用 {@link withAllowedTimestampSkew(Duration)} 更新允许的偏差。

注意:使用 {@link #withAllowedTimestampSkew(Duration)} 允许在水印后面发出元素。这些元素被认为是迟到的,如果在下游 {@link PCollection} 的 {@link Window#withAllowedLateness(Duration) allowed lateness} 之后,可能会被静默丢弃。

于 2019-07-23T08:18:54.157 回答