我的用例 - 收集特定持续时间的事件,然后根据键对它们进行分组
Objective 处理后,用户可以根据key保存特定时长的数据
我打算怎么做 1)从 Kafka 接收事件
2)创建事件数据流
3)通过运行 SQL 查询将表与其关联并收集特定持续时间的数据
4)将新表与第二步输出相关联,并根据键对收集的数据进行分组
5)将数据保存在数据库中
我试过的解决方案-
我能够-
1)从卡夫卡接收事件,
2)设置数据流(比如说sensorDataStream)-
DataStream<SensorEvent> sensorDataStream
= source.flatMap(new FlatMapFunction<String, SensorEvent>() {
@Override
public void flatMap(String catalog, Collector<SensorEvent> out) {
// create SensorEvent(id, sensor notification value, notification time) creation
});
3)将表(比如说table1)与数据流相关联,并在运行SQL查询之后 -
SELECT id, sensorNotif, notifTime FROM SENSORTABLE WHERE notifTime > t1_Timestamp AND notifTime < t2_Timestamp.
这里 t1_Timestamp 和 t2_Timestamp 是预定义的纪元时间,会根据一些预定义的条件而改变
4)我可以通过在控制台上使用以下查询来打印此 sql 查询结果-
tableEnv.toAppendStream(table1, Row.class).print();
5)使用table1和以下类型的sql查询创建了一个新表(比如说table2)-
Table table2 = tableEnv.sqlQuery("SELECT id AS SensorID, COUNT(sensorNotif) AS SensorNotificationCount FROM table1 GROUP BY id);
6)通过使用收集和打印数据 -
tableEnv.toRetractStream(table2 , Row.class).print();
问题
1)我无法在控制台上看到第 6 步的输出。
我做了一些实验,发现如果我跳过 table1 设置步骤(这意味着一段时间内没有传感器数据俱乐部)并将我的 senserDataStream 与 table2 直接关联,那么我可以看到第 6 步的输出,但因为这是 RetractStream 所以我可以看到形式的数据,如果新事件即将到来,则此撤回流将使数据无效并打印新计算的数据。
我想要的建议
1)如何合并步骤 5 和步骤 6(表示表 1 和表 2)。我已经合并了这些表,但是由于控制台上看不到数据,所以我有疑问?难道我做错了什么?还是数据已合并但不可见?
2)我的计划是——
2.a) 在 2 遍中过滤数据,在第一遍中过滤特定时间间隔的数据,在第二遍中将数据分组
2.b)在数据库中保存 2.a 输出 这种方法是否有效(我有疑问,因为我正在使用数据流并且 table1 输出是附加流但 table2 输出是撤回流)?