我们正在使用具有以下配置的消费者 kafka 客户端 0.10.2.0:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
如您所见,我们正在使用自动提交。我们使用的消费者 API 版本有一个专门的线程来执行自动提交。所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。
我们的应用程序处理时间实际上可能需要(有时)超过 40 秒(请求超时间隔)
我想问的是:
1 - 如果处理时间需要,例如,一分钟。尽管每秒都有自动提交heartbean,但是否会重新平衡?
2 - 更奇怪的是,在长时间执行的情况下,我们似乎不止一次收到相同的消息。正常吗?如果消费者已经提交了偏移量,为什么重新平衡会再次使用相同的偏移量?
谢谢, 奥雷尔