I am feeding the following data to Flink as a stream:
ID Val eventTime.rowtime
266 25 9000
266 22 10000
266 19 11000
266 18 12000
266 16 13000
266 15 14000
266 14 15000
266 13 16000
266 14 17000
266 15 18000
266 17 19000
266 18 20000
266 18 21000
266 19 22000
266 21 23000
266 21 24000
266 21 25000
266 22 26000
266 21 27000
266 21 28000
266 22 29000
266 24 30000
266 23 31000
266 24 32000
266 25 33000
266 24 34000
266 22 35000
266 23 36000
266 24 37000
266 19 38000
I need to run the following SQL MATCH_RECOGNIZE query:
SELECT ID, sts, ets, intervalValue, valueDescription, intvDuration
FROM RawEvents
MATCH_RECOGNIZE (
    PARTITION BY ID
    ORDER BY eventTime
    MEASURES
        A.ID AS id,
        FIRST(A.eventTime) AS sts,
        LAST(A.eventTime) AS ets,
        MAX(A.val) AS intervalValue,
        'max' AS valueDescription,
        TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) AS intvDuration
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A+ B)
    DEFINE
        A AS A.val >= 20,
        B AS true
)
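To compare the expected intervals with what the pattern itself defines, here is a plain-Java approximation of `PATTERN (A+ B)` with `AFTER MATCH SKIP TO NEXT ROW` over the sample rows. It is only a sketch, not Flink's matching engine: it assumes a single partition already sorted by `eventTime` and assumes `A+` is greedy (the default quantifier behaviour in Flink's MATCH_RECOGNIZE). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java approximation (not Flink's CEP engine) of
 *   PATTERN (A+ B)  DEFINE A AS val >= 20, B AS true
 *   AFTER MATCH SKIP TO NEXT ROW
 * for one partition whose rows are already sorted by event time.
 * Assumes A+ is greedy (Flink's default for MATCH_RECOGNIZE quantifiers).
 */
public class GreedyPatternSketch {

    /** One match: first/last A timestamp (ms) and MAX(A.val). */
    public static final class Match {
        public final long sts;
        public final long ets;
        public final double maxVal;

        Match(long sts, long ets, double maxVal) {
            this.sts = sts;
            this.ets = ets;
            this.maxVal = maxVal;
        }

        /** For these whole-second timestamps, equals TIMESTAMPDIFF(SECOND, sts, ets). */
        public long durationSeconds() {
            return (ets - sts) / 1000;
        }

        @Override
        public String toString() {
            return "(" + sts + ", " + ets + ", " + maxVal + ", " + durationSeconds() + ")";
        }
    }

    /** The sample rows from the question (ID 266), one per second starting at 9000 ms. */
    public static final double[] SAMPLE_VALUES = {
        25, 22, 19, 18, 16, 15, 14, 13, 14, 15, 17, 18, 18, 19, 21,
        21, 21, 22, 21, 21, 22, 24, 23, 24, 25, 24, 22, 23, 24, 19
    };

    public static long[] sampleTimestamps() {
        long[] ts = new long[SAMPLE_VALUES.length];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = 9000L + 1000L * i;
        }
        return ts;
    }

    public static List<Match> findMatches(long[] ts, double[] val) {
        List<Match> matches = new ArrayList<>();
        // SKIP TO NEXT ROW: a new match attempt starts at every row.
        for (int start = 0; start < ts.length; start++) {
            if (val[start] < 20) {
                continue; // the first row of a match must satisfy A
            }
            int last = start;
            double max = val[start];
            // Greedy A+: keep extending while the next row still satisfies A.
            while (last + 1 < ts.length && val[last + 1] >= 20) {
                last++;
                max = Math.max(max, val[last]);
            }
            // B AS true: the match completes only if some row follows the last A row.
            if (last + 1 < ts.length) {
                matches.add(new Match(ts[start], ts[last], max));
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        for (Match m : findMatches(sampleTimestamps(), SAMPLE_VALUES)) {
            System.out.println(m);
        }
    }
}
```

Under those assumptions the sketch produces 17 matches: the two starting at 9000 and 10000 ms, then one for every start from 23000 to 37000 ms, each ending at 37000 ms because the row at 38000 ms (value 19) is the first one after them that fails A and so can only be B.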
I expect the output to include intervals like these:
(266,1970-01-01 00:00:09.0,1970-01-01 00:00:10.0,25.0,max,1)
(266,1970-01-01 00:00:10.0,1970-01-01 00:00:10.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:23.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:24.0,22.0,max,0)
...
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:37.0,22.0,max,0)
...
(266,1970-01-01 00:00:37.0,1970-01-01 00:00:37.0,22.0,max,0)
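But in fact I only get the first two records.
Below is my complete code, which converts the stream into a table and turns the query result back into a stream: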
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);

DataStream<String> stringStream = env.addSource(new LinearRoadSource("C:\\Work\\Data\\linear.csv"));
DataStream<SpeedEvent> speedStream = stringStream.map(new SpeedMapper()).setParallelism(1);

speedStream = speedStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SpeedEvent>() {
    private long maxTimestampSeen = 0;

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTimestampSeen);
    }

    @Override
    public long extractTimestamp(SpeedEvent temperatureEvent, long l) {
        long ts = temperatureEvent.getTimestamp();
        // if (temperatureEvent.getKey().equals("W"))
        maxTimestampSeen = Long.max(maxTimestampSeen, ts);
        return ts;
    }
}).setParallelism(1);
TupleTypeInfo<Tuple3<String, Double, Long>> inputTupleInfo = new TupleTypeInfo<>(
        Types.STRING(),
        Types.DOUBLE(),
        Types.LONG()
);

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
// Register the timestamped stream; eventTime becomes the rowtime attribute.
tableEnv.registerDataStream("RawEvents",
        speedStream.map((MapFunction<SpeedEvent, Tuple3<String, Double, Long>>) event ->
                        new Tuple3<>(event.getKey(), event.getValue(), event.getTimestamp()))
                .returns(inputTupleInfo),
        "ID, val, eventTime.rowtime"
);
Table intervalResult = tableEnv.sqlQuery(
        "SELECT ID, sts, ets, intervalValue, valueDescription, intvDuration " +
        "FROM RawEvents " +
        "MATCH_RECOGNIZE ( " +
        "  PARTITION BY ID " +
        "  ORDER BY eventTime " +
        "  MEASURES " +
        "    A.ID AS id, " +
        "    FIRST(A.eventTime) AS sts, " +
        "    LAST(A.eventTime) AS ets, " +
        "    MAX(A.val) AS intervalValue, " +
        "    'max' AS valueDescription, " +
        "    TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) AS intvDuration " +
        "  AFTER MATCH SKIP TO NEXT ROW " +
        "  PATTERN (A+ B) " +
        "  DEFINE " +
        "    A AS A.val >= 20, " +
        "    B AS true " +
        ")");
TupleTypeInfo<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> tupleTypeInterval =
        new TupleTypeInfo<>(
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.DOUBLE(),
                Types.STRING(),
                Types.INT()
        );

DataStream<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> queryResultAsStream =
        tableEnv.toAppendStream(intervalResult, tupleTypeInterval);
queryResultAsStream.print();

env.execute();
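The assigner above emits the largest timestamp seen so far as its watermark, so it allows no out-of-orderness at all. The following plain-Java model (not Flink code; the class and method names are hypothetical) shows what that implies if the input file contains several rows with the same timestamp, which the sample above does not show. It assumes the MATCH_RECOGNIZE operator treats a row whose timestamp is not greater than the last watermark it received as late and drops it. Under that assumption, a second row stamped 10000 that arrives after a watermark of 10000 is lost, while a 1000 ms bound keeps it. Flink's own `BoundedOutOfOrdernessTimestampExtractor` computes its watermark the same way, as the maximum timestamp seen minus a fixed bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical plain-Java model (not Flink code) of the question's periodic
 * assigner feeding an event-time operator such as MATCH_RECOGNIZE.
 * Assumption: the operator buffers rows whose timestamp is greater than the
 * last watermark it received, drops the others as late, and releases buffered
 * rows in timestamp order once the watermark reaches them.
 */
public class WatermarkLatenessSketch {
    private final long maxOutOfOrderness; // 0 mirrors the assigner in the question
    private long maxTimestampSeen = 0;     // same initial value as in the question
    private long lastWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    private final List<Long> dropped = new ArrayList<>();

    public WatermarkLatenessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** A row arrives: update the assigner's max, then buffer it or drop it as late. */
    public void onEvent(long ts) {
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        if (ts > lastWatermark) {
            buffered.add(ts);
        } else {
            dropped.add(ts);
        }
    }

    /** A periodic watermark reaches the operator; returns the rows it releases. */
    public List<Long> emitWatermark() {
        lastWatermark = Math.max(lastWatermark, maxTimestampSeen - maxOutOfOrderness);
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= lastWatermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    public List<Long> dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        for (long bound : new long[] {0L, 1000L}) {
            WatermarkLatenessSketch op = new WatermarkLatenessSketch(bound);
            op.onEvent(9000);
            op.onEvent(10000);
            op.emitWatermark(); // a periodic watermark fires between the two 10000 rows
            op.onEvent(10000);  // second row with the same timestamp
            System.out.println("bound=" + bound + " dropped=" + op.dropped());
        }
    }
}
```

Running `main` prints `bound=0 dropped=[10000]` followed by `bound=1000 dropped=[]`.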
Is there something I am doing wrong, or something I forgot to do?
I am using Flink 1.8.1.