1

Apache Pulsar 具有官方文档的消息保留和到期主题下记录的 TTL 功能。但是,我无法确定在配置中的哪个位置设置了此检查的频率。使用标准bin/pulsar standalone命令,自定义命名空间,ttl 配置为 5 seconds bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5

我可以看到消息仅在设置的时间间隔后过期,并且以下日志消息打印到控制台:

15:11:59.337 [pulsar-msg-expiry-monitor-52-1] 信息 org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/ttl-test/my-topic] [spark -shell] 开始消息过期检查,ttl= 5 秒

我的问题的关键是:如何提高检查消息是否超过 TTL 的速率?

4

2 回答 2

3

代理中的配置messageExpiryCheckIntervalInMinutes决定了检查命名空间主题是否有过期消息的频率。

根据配置的官方文档

于 2019-03-20T15:38:57.483 回答
0

使用 set-message-ttl 命令并指定命名空间名称(默认为 public/default for persistent topic)和时间。

bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120

生产者和消费者实现ttl的示例代码(python客户端)

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')

producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))

producer.close()
client.close()

您可以使用发送方法发送多条消息。两个类中的主题名称应相同。

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")

//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

//Here we are not acknowledge all the messages.

//close the consumer and client
consumer.close()
client.close()

在 120 秒内,我们再次打开客户端和消费者并尝试读取相同的消息,这不是发布。我们再次关闭客户端和消费者。

稍后(120 秒后),我们将再次打开客户端和消费者,然后尝试接收消息。但它不应该来。在这种情况下,你正在实现生存时间。

于 2019-05-10T08:52:55.490 回答