关于这个主题有很多问题,但是,这不是重复的问题!
我面临的问题是我尝试使用 Java 14 和 Kafka 2.5.0 设置 SpringBoot 项目,而我的Consumer 返回一个空的记录列表。这里的大多数答案表明一些被遗忘的属性,经常轮询或将偏移模式设置为 early。
我看不出与docs.confluent.io有任何逻辑差异,尽管我的配置设置看起来不合常规(请参阅下面片段中的jaas.conf设置)。
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}
然而,这有效。我没有收到任何异常(Kafka 或其他),并且建立了连接。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
这是我实际投票的地方:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
我添加了打印件和其他杂物以尝试找出问题。我在调试模式下运行我的程序并将断点放在这一行:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
结果,正如人们所期望的那样,我看到了每秒计数的打印行。但由于我的消费者没有返回任何记录,它永远不会进入forEach
,因此永远不会触发我的断点。
我绝对可以在云中看到我的主题,有两个分区。消息源源不断地生成,所以我知道我应该能够收到一些东西。
我知道连接到集群需要一些时间,但是当前时间设置为一刻钟,我至少应该收到一些东西,对吧?作为替代方案,我尝试切换consumer.subscribe()
到consumer.assign()
指定我的 TopicPartition 的方法,将使用者设置为consumer.seekToBeginning()
. 它运行良好,但也没有返回任何内容。
在最常见的示例中没有发现的另一件事是我使用自己的类。因此KafkaConsumer<String, String>
,我根据本教程实现了自定义(反)序列化程序,而不是。
会不会是我的配置设置?轮询超时有问题吗?(反)序列化,还是完全其他的?我真的无法确定为什么我得到零记录的任何原因。任何反馈将不胜感激!