0

我将以下数据作为流提供给 Flink

ID  Val eventTime.rowtime
266 25  9000
266 22  10000
266 19  11000
266 18  12000
266 16  13000
266 15  14000
266 14  15000
266 13  16000
266 14  17000
266 15  18000
266 17  19000
266 18  20000
266 18  21000
266 19  22000
266 21  23000
266 21  24000
266 21  25000
266 22  26000
266 21  27000
266 21  28000
266 22  29000
266 24  30000
266 23  31000
266 24  32000
266 25  33000
266 24  34000
266 22  35000
266 23  36000
266 24  37000
266 19  38000

我需要运行如下的 SQL MATCH_RECOGNIZE 查询:

-- For each ID, in eventTime order, match one or more consecutive rows with
-- val >= 20 (A) followed by a single row of any value (B), and report the
-- first/last A timestamps, the maximum A value, and the span in seconds.
Select ID, sts, ets, intervalValue,valueDescription, intvDuration from         
RawEvents Match_Recognize (
PARTITION BY ID
ORDER BY eventTime
MEASURES
A.ID AS id,
FIRST(A.eventTime) As sts,
LAST(A.eventTime) As ets,
MAX(A.val) As intervalValue,
'max' As valueDescription,
TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) As 
intvDuration 
-- After a match, resume matching at the row following the match's first row.
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A+ B)
DEFINE
A as A.val>=20,
-- B accepts any row; it only terminates the run of A rows.
B As true)

我期望输出中包含如下这样的区间:

(266,1970-01-01 00:00:09.0,1970-01-01 00:00:10.0,25.0,max,1)
(266,1970-01-01 00:00:10.0,1970-01-01 00:00:10.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:23.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:24.0,22.0,max,0)
...
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:37.0,22.0,max,0)
...
(266,1970-01-01 00:00:37.0,1970-01-01 00:00:37.0,22.0,max,0)

但实际上我只得到了前两条记录。

下面是我将流转换为表、再将查询结果转换回流的完整代码:

// Configure the job for event-time processing and ask Flink to poll the
// periodic watermark assigner every 10 ms.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10);


// Read raw lines from the CSV source and map each one to a SpeedEvent.
DataStream<String> stringStream = env.addSource(new 
LinearRoadSource("C:\\Work\\Data\\linear.csv"));
    DataStream<SpeedEvent> speedStream = stringStream.map(new 
SpeedMapper()).setParallelism(1);
    // Attach event timestamps and periodic watermarks to the stream.
    speedStream = speedStream.assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks<SpeedEvent>() {
        // Largest event timestamp observed so far; starts at 0.
        private long maxTimestampSeen = 0;

        // Emits the largest timestamp seen so far as the current watermark.
        // NOTE(review): a watermark W tells Flink that no more elements with
        // timestamp <= W will arrive, so a later element carrying the same
        // timestamp as the current maximum would be treated as late --
        // confirm that timestamps are strictly increasing.
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestampSeen);
        }

        // Returns the event's own timestamp and advances maxTimestampSeen.
        // The second parameter is not used.
        @Override
        public long extractTimestamp(SpeedEvent temperatureEvent, long l) 
{
            long ts = temperatureEvent.getTimestamp();
           // if (temperatureEvent.getKey().equals("W"))
                maxTimestampSeen = Long.max(maxTimestampSeen,ts);
            return ts;
        }
    }).setParallelism(1);

// Type information for the (key, value, timestamp) tuples registered as a table.
TupleTypeInfo<Tuple3<String, Double, Long>> inputTupleInfo = new 
TupleTypeInfo<>(
            Types.STRING(),
            Types.DOUBLE(),
            Types.LONG()
    );


// Register the stream as table "RawEvents" with fields ID, val and eventTime;
// the ".rowtime" suffix marks eventTime as the event-time attribute.
// NOTE(review): keyedStream is not defined in the code shown here --
// presumably it is derived from speedStream (e.g. via keyBy); confirm.
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.getTableEnvironment(env);
    tableEnv.registerDataStream("RawEvents",
            keyedStream.map((MapFunction<SpeedEvent, Tuple3<String, 
Double, Long>>) event -> new Tuple3<>(event.getKey(), event.getValue(), 
event.getTimestamp())).returns(inputTupleInfo),
            "ID, val, eventTime.rowtime"
    );

Table intervalResult = tableEnv.sqlQuery("Select ID, sts, ets, intervalValue,valueDescription, intvDuration from         
RawEvents Match_Recognize (
PARTITION BY ID
ORDER BY eventTime
MEASURES
A.ID AS id,
FIRST(A.eventTime) As sts,
LAST(A.eventTime) As ets,
MAX(A.val) As intervalValue,
'max' As valueDescription,
TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) As 
intvDuration 
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A+ B)
DEFINE
A as A.val>=20,
B As true)");

// Type information for one result row, in the order of the query's SELECT
// list: ID, sts, ets, intervalValue, valueDescription, intvDuration.
TupleTypeInfo<Tuple6<String, Timestamp, Timestamp, Double, String, 
Integer>> tupleTypeInterval = new TupleTypeInfo<>(
            Types.STRING(),
            Types.SQL_TIMESTAMP(),
            Types.SQL_TIMESTAMP(),
            Types.DOUBLE(),
            Types.STRING(),
            Types.INT()
    );

// Convert the result table back into an append-only stream and print each row.
DataStream<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> 
queryResultAsStream = tableEnv.toAppendStream(intervalResult,    tupleTypeInterval);
queryResultAsStream.print();

是不是我哪里做错了,或者遗漏了什么步骤?

我正在使用 Flink 1.8.1。

4

0 回答 0