0

我在 Flink 上使用 Kinesis Data Analytics 进行流处理。
我正在处理的用例是从单个 Kinesis 流中读取记录,并在进行一些转换后写入多个 S3 存储桶。一个源记录可能最终位于多个 S3 存储桶中。我们需要写入多个存储桶,因为源记录包含很多需要拆分到多个 S3 存储桶的信息。

我尝试使用多个接收器来实现这一点。

private static <T> SinkFunction<T> createS3SinkFromStaticConfig(String path, Class<T> type) {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartSuffix(".snappy.parquet")
                .build();


        final StreamingFileSink<T> sink = StreamingFileSink
                .forBulkFormat(new Path(s3SinkPath + "/" + path), createParquetWriter(type))
                .withBucketAssigner(new S3BucketAssigner<T>())
                .withOutputFileConfig(config)
                .withRollingPolicy(new RollingPolicy<T>(DEFAULT_MAX_PART_SIZE, DEFAULT_ROLLOVER_INTERVAL))
                .build();
        return sink;
}

public static void main(String[] args) throws Exception {
    DataStream<PIData> input = createSourceFromStaticConfig(env)
        .map(new JsonToSourceDataMap())
        .name("jsonToInputDataTransformation");


    input.map(value -> value)
        .name("rawData")
        .addSink(createS3SinkFromStaticConfig("raw_data", InputData.class))
        .name("s3Sink");

     input.map(FirstConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("firstOutput", Output1.class));

    input.map(SecondConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("secondOutput", Output2.class));

    input.map(ThirdConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("thirdOutput", Output3.class));

    //and so on; There are around 10 buckets.
}

但是,由于这个原因,我看到了很大的性能影响。由于这个原因,我看到了一个很大的 CPU 峰值(与只有一个接收器的情况相比)。我正在查看的规模约为每秒 10 万条记录。

其他注意事项:我正在使用批量格式编写器,因为我想以镶木地板格式编写文件。我尝试将检查点间隔从 1 分钟增加到 3 分钟,假设每分钟将文件写入 s3 可能会导致问题。但这并没有太大帮助。

由于我是 flink 和流处理的新手,我不确定是否可以预期会有这么大的性能影响,或者有什么我可以做得更好的吗?使用平面图运算符然后使用单个接收器会更好吗?

4

1 回答 1

0

当你有一个非常简单的管道时,只有一个源和一个接收器,就像这样:

source -> map -> sink

然后 Flink 调度程序能够优化执行,整个管道作为单个任务中的一系列函数调用运行——没有序列化或网络开销。Flink 1.12 可以将此运算符链接优化应用于更复杂的拓扑——可能包括您现在拥有的具有多个接收器的拓扑——但我不相信这在 Flink 1.11 中是可能的(这是 KDA 当前所基于的)。

我看不出使用平面地图会有什么不同。

您可能可以优化您的序列化/反序列化。请参阅https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

于 2021-01-29T13:21:15.647 回答