我有以下简单的代码:
private int i = 0;
@StreamListener(Sink.INPUT)
public void processMessage(Message<?> message) {
i++;
}
@Scheduled(fixedDelay=5000)
private void scheduled(){
LOG.info("Messages consumed: " + i);
}
和以下属性:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.input.destination=test6
spring.cloud.stream.bindings.input.group=testGroup50
spring.cloud.stream.bindings.input.partitioned=false
我有一个带有96 k 消息的单个分区的本地 kafka 主题。kafka 库提供的简单 kafka 消费者在大约4 秒内使用这些消息。
但是,上面的代码需要将近1 分钟!
显然,这是我们的应用程序所关心的问题,以前有人经历过吗?我在这里错过了什么吗?
Visual VM 也没有标记任何内容。
PS:我刚刚尝试了自动提交,但我仍然看到了糟糕的表现。