我在使用 KafkaConsumer.poll(duration timeout) 时遇到了困难,其中它无限期地运行并且永远不会退出该方法。了解这可能与连接有关,有时我看到它有点不一致。如果poll停止响应,我该如何处理?下面给出的是来自KafkaConsumer.poll()的片段
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
我从这里调用上述内容:
Duration timeout = Duration.ofSeconds(30);
while (true) {
final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
System.out.println("record count is" + records.count());
}
我收到以下错误:
org.apache.kafka.common.errors.SerializationException:在偏移量 2 处反序列化分区的键/值时出错。如果需要,请寻找过去的记录以继续消费。