我正在使用来自 Kafka 源的流来完成我的 flink 工作,一次读取 50 个主题,如下所示:
FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<GenericRecord>(
Pattern.compile("TOPIC_NAME[1-50].stream"), // getting data stream from all topics
<DeserializationSchema>, //using avro schema
properties); // auto.commit.interval.ms=1000 ...
然后有一些操作符,例如:filter->map->keyBy->window->aggreagate->sink
我能够获得的最大吞吐量是每秒 10k 到 20k 条记录,考虑到源发布了数十万个事件,我可以清楚地看到消费者落后于生产者,这相当低。我什至尝试移除水槽和其他操作员以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到 Amazon Kinesis 数据分析中,并尝试了几种并行设置,但这些设置似乎都没有提高吞吐量。
有什么我想念的吗?