First and foremost:
- I am new to Flink (I understand the principles and am able to build any basic streaming job I need)
- I run my Flink job on Kinesis Analytics, which by default uses incremental checkpointing with a 1-minute interval.
- The Flink job reads events from a Kinesis stream using FlinkKinesisConsumer with a custom deserializer (it deserializes the bytes into a simple Java object that is used throughout the job)
What I am trying to achieve is simply to count how many ENTITY_ID/FOO and ENTITY_ID/BAR events there were in the past 24 hours. It is important that this count is as accurate as possible, which is why I am using this Flink feature instead of keeping a running sum myself over a 5-minute tumbling window. I also want to be able to count "TOTAL" events from the very beginning (not just the past 24 hours), so I additionally emit the event count for the past 5 minutes in the result, so that a post-processing application can simply sum up those 5-minute slices. (This count does not have to be accurate; it is OK if there is an outage and I lose some counts.)
Now, this job had been running fine until last week, when we had a spike in traffic (more than 10x). From that point on, Flink went bananas. The checkpoint size started growing slowly from ~500 MB to 20 GB, checkpoint time took around 1 minute and kept growing over time. The application started failing and never managed to fully recover, and the event iterator age shot up and never came back down, so no new events were being consumed.
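My back-of-envelope suspicion (assuming the built-in sliding windows keep each element in every pane it overlaps, which I have not verified) is that the state growth is inherent to the window shape: with a 24-hour window sliding every 5 minutes, each event falls into 288 overlapping windows, so a 10x traffic spike multiplies an already large state.

```java
public class WindowMath {
    // Number of overlapping sliding windows a single event falls into,
    // i.e. window size divided by slide interval.
    static long overlappingPanes(long windowMinutes, long slideMinutes) {
        return windowMinutes / slideMinutes;
    }

    public static void main(String[] args) {
        // 24-hour window, 5-minute slide
        System.out.println(overlappingPanes(24 * 60, 5)); // prints 288
    }
}
```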
Since I am new to Flink, I am not sure whether the way I do my sliding count is completely unoptimized or plainly wrong.
This is a small snippet of the key parts of the code:
The source (MyJsonDeserializationSchema extends AbstractDeserializationSchema and simply reads the bytes and creates the Event object):
SourceFunction<Event> source =
new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);
The input event, a simple Java POJO that is used in the Flink operators:
public class Event implements Serializable {
    public String entityId;
    public String entityType;
    public String entityName;
    public long eventTimestamp = System.currentTimeMillis();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsStream = env
        .addSource(source)
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
            @Override
            public long extractTimestamp(Event event) {
                return event.eventTimestamp;
            }
        });
DataStream<Event> fooStream = eventsStream
        .filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return "foo".equalsIgnoreCase(event.entityType);
            }
        });
DataStream<Event> barStream = eventsStream
        .filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return "bar".equalsIgnoreCase(event.entityType);
            }
        });
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table fooTable = tEnv.fromDataStream(fooStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
tEnv.registerTable("Foo", fooTable);

Table barTable = tEnv.fromDataStream(barStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
tEnv.registerTable("Bar", barTable);
Table slidingFooCountTable = fooTable
        .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
        .groupBy("entityId, entityName, minuteWindow")
        .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityId as slidingFooEntityId, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");
Table slidingBarCountTable = barTable
        .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
        .groupBy("entityId, entityName, minuteWindow")
        .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityId as slidingBarEntityId, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");
Table tumblingFooCountTable = fooTable
        .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
        .groupBy("entityId, entityName, minuteWindow")
        .select("concat(concat(entityId,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityName as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");
Table tumblingBarCountTable = barTable
        .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
        .groupBy("entityId, entityName, minuteWindow")
        .select("concat(concat(entityId,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityName as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");
Table aggregatedTable = slidingFooCountTable
        .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
        .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
        .leftOuterJoin(tumblingBarCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
        .select("slidingFooMinute as timestamp, slidingFooEntityId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");
DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
result.addSink(sink); // write to an output stream to be picked up by a lambda function
I would greatly appreciate it if someone with more hands-on Flink experience could comment on the way I do my counting. Is my code completely over-engineered? Is there a better, more efficient way to count events over a 24-hour period?
I have read somewhere on Stack Overflow that @DavidAnderson suggested creating our own sliding windows using map state and slicing the events by timestamp. However, I am not sure what this means, and I could not find any code samples showing it.
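For what it's worth, here is my current understanding of that suggestion as a plain-Java sketch (the class and method names are my own invention). The idea, as I read it, is to keep one counter per 5-minute slice instead of one copy of each event per overlapping window; in an actual job the TreeMap would be a `MapState<Long, Long>` inside a `KeyedProcessFunction` keyed by entityId, and the pruning/emission would be driven by event-time timers rather than an explicit `countAt` call. Please correct me if this is not what was meant:

```java
import java.util.TreeMap;

// Plain-Java sketch of a "time-sliced" 24h sliding count.
// Each event only increments a single per-slice counter, so state is
// O(window / slice) longs per key instead of O(events in window).
public class SlidingSlicedCounter {
    private final long sliceMillis;   // e.g. 5 minutes
    private final long windowMillis;  // e.g. 24 hours
    // one counter per slice, keyed by the slice's start timestamp
    private final TreeMap<Long, Long> slices = new TreeMap<>();

    public SlidingSlicedCounter(long sliceMillis, long windowMillis) {
        this.sliceMillis = sliceMillis;
        this.windowMillis = windowMillis;
    }

    // Called once per event (processElement in a real job).
    public void add(long eventTimestamp) {
        long sliceStart = (eventTimestamp / sliceMillis) * sliceMillis;
        slices.merge(sliceStart, 1L, Long::sum);
    }

    // Called when a timer fires at `watermark` (onTimer in a real job):
    // drop slices that have fully left the window, then sum what remains.
    public long countAt(long watermark) {
        long cutoff = watermark - windowMillis;
        // remove every slice whose entire range is at or before the cutoff
        slices.headMap(cutoff - sliceMillis, true).clear();
        return slices.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        SlidingSlicedCounter counter =
                new SlidingSlicedCounter(5 * 60_000L, 24 * 3_600_000L);
        counter.add(0L);
        counter.add(1_000L);          // same slice as the first event
        counter.add(6_000_000L);      // a later slice
        System.out.println(counter.countAt(24 * 3_600_000L));     // prints 3
        System.out.println(counter.countAt(24 * 3_600_000L + 5 * 60_000L)); // prints 1
    }
}
```

The trade-off, if I understand it correctly, is that the count is accurate to slice granularity (5 minutes) rather than per-event, which matches what my Table API query produces anyway.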