我是 Apache Pulsar 的新手(也是 MQ 系统的新手)。现在,我有一个关于 Pulsar Reader 的问题。
问题描述:
我启动一个 Pulsar 实例,然后启动一个消费者,监听一个主题 A。然后我启动一个生产者,向主题 A 发送 100 条消息,消费者消费了这 100 条消息,消费者订阅中的 Backlog 值为 0 .该主题只有一个订阅,并且是独家的。
之后,我启动了一个 Reader,设置 Reader 的主题为 A,Reader 可以从主题 A 获取消息。
我在 Pulsar 文档上找到了这个: https ://pulsar.apache.org/docs/en/cookbooks-retention-expiry/
Pulsar 代理负责处理通过 Pulsar 传递的消息,包括消息的持久存储。默认情况下,经纪人:
立即删除每个订阅上已确认的所有消息,
并将所有未确认的消息永久存储在积压中。
这 100 条消息应该已经被删除了。那么为什么 Pulsar Reader 仍然可以从主题 A 中获取消息呢?
我的代码:
消费者:
private static void consume() {
try {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("A")
.subscriptionName("first-subscription")
.subscribe();
for (int i = 0; true; ++i) {
try {
Message<String> msg = consumer.receive();
String m = msg.getValue();
System.out.println("\t m:" + m);
consumer.acknowledge(msg);
Thread.sleep(500);
} catch (Exception e) {
LOGGER.error("", e);
}
}
} catch (Exception e) {
LOGGER.error("", e);
}
}
生产商:
private static void produce() {
try {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("A").create();
for (int i = 0; i < 100; ++i) {
producer.send("producer-simple-partitioned-" + i);
}
} catch (Exception e) {
LOGGER.error("", e);
}
}
读者:
private static void readerRead() {
try {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
Reader<byte[]> reader = pulsarClient.newReader()
.topic("A")
.startMessageId(MessageId.earliest)
.create();
while (true) {
Message message = reader.readNext();
System.out.println(new String(message.getData()));
}
} catch (PulsarClientException e) {
LOGGER.error("", e);
}
}