我尝试使用 2 种方法从 Confluent Cloud 集群中消费消息,并且两者的总 poll() 时间几乎相同 -
单线程 - 仅使用一个使用者顺序读取所有 TopicPartition
多线程 - 生成多个消费者(等于 TopicPartitions 的数量),手动将每个分区分配给单独的消费者(使用 #assign())。并行运行这些线程并执行#poll()。消息的处理也将由线程本身完成。
第二种方法的速度有所提高,但这主要是由于并行发生的处理部分。poll() 方法(获取所有 ConsumerRecords,不包括处理)所花费的时间在这两种情况下几乎相同。
我的问题-KafkaConsumer 中的 poll() 方法是否以某种方式在服务器端阻塞?多个消费者并行轮询与单个消费者顺序轮询的轮询时间几乎相似。唯一的性能提升似乎是由于使用 poll() 获取所有 ConsumerRecords 之后发生的处理部分。
注意:我在这里没有使用消费者组功能,因为它不适合我们的用例。如前所述,我正在手动分配 TopicPartitions。