1

我有一个 Pulsar 阅读器,可以读取消息并处理它们。当消息到达某个主题时,读者会收到它。但是,如果这个阅读器应用程序离线一段时间并重新连接到脉冲星集群,阅读器不会读取在阅读器离线期间写入脉冲星的那些消息。但是,当阅读器重新启动时,它不会读取已经使用的旧消息。读者被设置为startMessageId(MessageId.latest).

这是读者的预期行为还是我错过了什么?下面是我的简单代码供参考。

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
Reader<String> reader = pulsarClient.newReader(Schema.STRING).readerName("reader-1").startMessageId(MessageId.latest)
                .topic("persistent://public/default/topic-1").create();
while(true) {
  Message<String> data = reader.readNext(); //blocks if there are no new messages
  System.out.println(data.getValue());
}
4

1 回答 1

2

使用Reader界面,您必须跟踪从主题中读取的消息 ID。因为您正在使用MessageId.latest,所以每次客户端连接时(例如,重新启动后),它都会将自己定位为主题的结尾。如果您想从上次中断的地方继续,那么您需要跟踪应用程序收到的最后一条消息 ID 并将其用作startMessageId.

如果您不想跟踪消息 ID,可以使用Consumer接口让 Pulsar 为您执行此操作。使用消费者,您可以指定一个订阅名称,以跟踪您在主题中的位置。因此,如果客户端在使用Consumer接口时重新启动,您将从上次中断的地方继续。有关更多信息,请参阅此页面

于 2020-01-10T12:25:54.570 回答