0

我正在使用来自 Kafka 源的流来完成我的 flink 工作,一次读取 50 个主题,如下所示:

FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<GenericRecord>(
            Pattern.compile("TOPIC_NAME[1-50].stream"), // getting data stream from all topics
            <DeserializationSchema>, //using avro schema
            properties); // auto.commit.interval.ms=1000 ...

然后有一些操作符,例如:filter->map->keyBy->window->aggreagate->sink

我能够获得的最大吞吐量是每秒 10k 到 20k 条记录,考虑到源发布了数十万个事件,我可以清楚地看到消费者落后于生产者,这相当低。我什至尝试移除水槽和其他操作员以确保没有背压​​,但它仍然是一样的。我正在将我的应用程序部署到 Amazon Kinesis 数据分析中,并尝试了几种并行设置,但这些设置似乎都没有提高吞吐量。

有什么我想念的吗?

4

1 回答 1

0

有几件事会显着影响吞吐量。

低效的序列化通常是导致吞吐量不佳的主要因素。请参阅Flink 序列化调优卷。1:选择你的序列化器——如果你能深入了解这个话题。Avro 通用记录序列化器还不错,但是您是否携带实际上不需要的数据?

您是否在管道中的任何地方更改并行度?这太贵了。

使用 Kinesis 数据分析,您必须使用 RocksDB 状态后端,其吞吐量远低于基于堆的状态后端。但是拥有正确的配置会有所帮助。您应该为 RocksDB 工作目录使用最快的可用本地磁盘(SSD,或者在极端情况下,可能需要 RAM 磁盘)。确保您的实例类型提供足够的 IOP。给 RocksDB 足够的内存。如果您进行大量读取,则值得启用布隆过滤器。请参阅Flink 中的磁盘对 RocksDB 状态后端的影响:案例研究,了解有关使用 RocksDB 的更多信息。

您可以尝试禁用检查点作为实验。如果这有帮助,那将提供一些线索。

某些网络设置会影响吞吐量。默认值通常提供不错的性能,但如果您修改了它们,则值得研究。

于 2021-04-10T18:44:40.257 回答