0

我是 Apache Pulsar 的新手(也是 MQ 系统的新手)。现在,我有一个关于 Pulsar Reader 的问题。

问题描述:
我启动一个 Pulsar 实例,然后启动一个消费者,监听一个主题 A。然后我启动一个生产者,向主题 A 发送 100 条消息,消费者消费了这 100 条消息,消费者订阅中的 Backlog 值为 0 .该主题只有一个订阅,并且是独家的。

之后,我启动了一个 Reader,设置 Reader 的主题为 A,Reader 可以从主题 A 获取消息。

我在 Pulsar 文档上找到了这个: https ://pulsar.apache.org/docs/en/cookbooks-retention-expiry/

Pulsar 代理负责处理通过 Pulsar 传递的消息,包括消息的持久存储。默认情况下,经纪人:

立即删除每个订阅上已确认的所有消息,
并将所有未确认的消息永久存储在积压中。

这 100 条消息应该已经被删除了。那么为什么 Pulsar Reader 仍然可以从主题 A 中获取消息呢?

我的代码:

消费者:

private static void consume() {
        try {
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic("A")
                    .subscriptionName("first-subscription")
                    .subscribe();

            for (int i = 0; true; ++i) {
                try {
                    Message<String> msg = consumer.receive();
                    String m = msg.getValue();
                    System.out.println("\t m:" + m);
                    consumer.acknowledge(msg);
                    Thread.sleep(500);
                } catch (Exception e) {
                    LOGGER.error("", e);
                }
            }
        } catch (Exception e) {
            LOGGER.error("", e);
        }
    }

生产商:

private static void produce() {
        try {
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("A").create();
            for (int i = 0; i < 100; ++i) {
                producer.send("producer-simple-partitioned-" + i);
            }
        } catch (Exception e) {
            LOGGER.error("", e);
        }
    }

读者:

private static void readerRead() {
        try {
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
            Reader<byte[]> reader = pulsarClient.newReader()
                    .topic("A")
                    .startMessageId(MessageId.earliest)
                    .create();
            while (true) {
                Message message = reader.readNext();
                System.out.println(new String(message.getData()));
            }
        } catch (PulsarClientException e) {
            LOGGER.error("", e);
        }
    }
4

1 回答 1

1

您在阅读器界面中遇到的行为似乎是一种可能的情况。在这种情况下,文档是不正确的,因为消息在确认后没有被明确删除。如果包含它们的分类帐仍然存在,则可以阅读这些消息。

实际上,这部分文档将在未来的版本中进行更改

请注意,不再存储的消息不一定会立即删除,实际上可能在下一次分类帐翻转之前仍然可以访问。由于客户无法预测何时可能发生翻转,因此依赖在不方便的时间点不会发生翻转是不明智的。

默认情况下,当 Pulsar 消息到达代理时,它将被存储,直到它在所有订阅上得到确认,此时它将被标记为删除。

于 2019-12-04T18:24:13.000 回答