我有一个 Pulsar 阅读器,可以读取消息并处理它们。当消息到达某个主题时,读者会收到它。但是,如果这个阅读器应用程序离线一段时间并重新连接到脉冲星集群,阅读器不会读取在阅读器离线期间写入脉冲星的那些消息。但是,当阅读器重新启动时,它不会读取已经使用的旧消息。读者被设置为startMessageId(MessageId.latest)
.
这是读者的预期行为还是我错过了什么?下面是我的简单代码供参考。
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
Reader<String> reader = pulsarClient.newReader(Schema.STRING).readerName("reader-1").startMessageId(MessageId.latest)
.topic("persistent://public/default/topic-1").create();
while(true) {
Message<String> data = reader.readNext(); //blocks if there are no new messages
System.out.println(data.getValue());
}