0

我的要求是围绕flink中的sql查询结果处理或构建一些逻辑。为简单起见,假设我有两个 sql 查询,它们在不同的窗口大小和一个事件流上运行。我的问题是

  • a)我怎么知道这是哪个查询结果
  • b)我怎么知道执行查询的结果有多少行?我需要此信息,因为我必须构建带有事件列表的通知消息,这些事件列表是查询结果的一部分。
DataStream<Event> ds = ...        
String query = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";

        String query1 = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
        List<String> list = new ArrayList<>();
        list.add(query);
        list.add(query1);
       
        tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());


        for(int i =0; i< list.size(); i++ ){
            Table result = tabEnv.sqlQuery(list.get(i));
            DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
            dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {

            List<Row> listRow = new ArrayList<>();
            @Override
            public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
                listRow.add(booleanRowTuple2.f1);
            }
            });
        }

感谢你的帮助。谢谢阿舒托什

4

1 回答 1

1

要区分哪些结果来自哪个查询,您可以在查询本身中包含每个查询的标识符,例如,

SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key

确定结果表中的行数比较棘手。一个问题是流查询的结果数量没有最终答案。但是在您处理结果的地方,您似乎可以计算行数。

或者,我还没有尝试过,但也许你可以使用类似row_number() over(order by tumble_rowtime(rowTime, interval '10' second))的方法来用计数器注释结果的每一行。

于 2020-08-05T09:09:08.357 回答