我使用 Kinesis 数据流作为源,使用 elasticsearch 作为接收器。在 AWS Kinesis Data 分析应用程序中运行 Flink 作业。
示例事件:
{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5}
我正在从前端收集这些视频观看事件,而视频每 5 秒为一位用户播放一次。这些事件用于计算用户的观看时间。
假设如果一个用户正在观看视频,则每 5 秒从前端生成一次此事件,并将其摄取到 Kinesis 数据流中。因此,有 10,000 个用户观看视频,因此在一分钟内总共会生成 120,000 个事件。
为了处理120,000 个事件,我的 Flink 作业几乎需要大约 4 分钟的时间。这是相当长的一段时间。
那么如何才能提高工作的绩效呢?我需要在1 分钟内实现这一目标。
我的工作是这样的:
stream
.keyBy(e -> e.getUserId())
.timeWindow(Time.seconds(60))
.reduce(new MyReduceFunction()) //sum of video duration for user
.map(<enrich event using some data from redis>)
.addSink(<elasticsearch sink>);
// Reduce function
private static class MyReduceFunction implements ReduceFunction<TrackingData> {
@Override
public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
return trackingData;
}
}
所以这项工作是做什么的,首先从 Kinesis Data 流接收事件,然后我通过这个流键入,userId
然后我做一些videoDuration
1 分钟,然后这些数据进入丰富功能,我从 Redis 读取一些数据并丰富这个事件,然后我下沉这个事件到elasticsearch。
我已经尝试增加工作的并行度,它为 1 个并行度提供了最佳性能,大约 4 分钟。如果我增加并行度,它会花费更多时间,这很奇怪。尝试使用 2、4、8、16 等。增加并行度应该可以加快处理速度,不是吗?
谁能帮助我在 Flink 工作中缺少什么或我做错了什么,我需要做什么才能在 1 分钟内加快这些事件的速度?