0

我使用 Kinesis 数据流作为源,使用 elasticsearch 作为接收器。在 AWS Kinesis Data 分析应用程序中运行 Flink 作业。

示例事件:

{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5} 

我正在从前端收集这些视频观看事件,而视频每 5 秒为一位用户播放一次。这些事件用于计算用户的观看时间。

假设如果一个用户正在观看视频,则每 5 秒从前端生成一次此事件,并将其摄取到 Kinesis 数据流中。因此,有 10,000 个用户观看视频,因此在一分钟内总共会生成 120,000 个事件。

为了处理120,000 个事件,我的 Flink 作业几乎需要大约 4 分钟的时间。这是相当长的一段时间。

那么如何才能提高工作的绩效呢?我需要在1 分钟内实现这一目标。

我的工作是这样的:

        stream
                .keyBy(e -> e.getUserId())
                .timeWindow(Time.seconds(60))
                .reduce(new MyReduceFunction()) //sum of video duration for user
                .map(<enrich event using some data from redis>)
                .addSink(<elasticsearch sink>);

// Reduce function 

 private static class MyReduceFunction implements ReduceFunction<TrackingData> {
        @Override
        public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
                trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
                return trackingData;
        }
    }

所以这项工作是做什么的,首先从 Kinesis Data 流接收事件,然后我通过这个流键入,userId然后我做一些videoDuration1 分钟,然后这些数据进入丰富功能,我从 Redis 读取一些数据并丰富这个事件,然后我下沉这个事件到elasticsearch。

我已经尝试增加工作的并行度,它为 1 个并行度提供了最佳性能,大约 4 分钟。如果我增加并行度,它会花费更多时间,这很奇怪。尝试使用 2、4、8、16 等。增加并行度应该可以加快处理速度,不是吗?

谁能帮助我在 Flink 工作中缺少什么或我做错了什么,我需要做什么才能在 1 分钟内加快这些事件的速度?

4

0 回答 0