我的要求是围绕flink中的sql查询结果处理或构建一些逻辑。为简单起见,假设我有两个 sql 查询,它们在不同的窗口大小和一个事件流上运行。我的问题是
- a)我怎么知道这是哪个查询结果
- b)我怎么知道执行查询的结果有多少行?我需要此信息,因为我必须构建带有事件列表的通知消息,这些事件列表是查询结果的一部分。
DataStream<Event> ds = ...
String query = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";
String query1 = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
List<String> list = new ArrayList<>();
list.add(query);
list.add(query1);
tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());
for(int i =0; i< list.size(); i++ ){
Table result = tabEnv.sqlQuery(list.get(i));
DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {
List<Row> listRow = new ArrayList<>();
@Override
public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
listRow.add(booleanRowTuple2.f1);
}
});
}
感谢你的帮助。谢谢阿舒托什