2

我正在使用 NATS 迈出第一步,并看到我无法理解的行为,即使在非常仔细地阅读了文档之后也是如此。我有一个本地 NATS 服务器(2.6.5)正在运行。它开始于

./nats-server -js

我使用以下代码生成一些消息:

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.delete_stream(name="hello")
    await js.add_stream(
        name="hello",
        subjects=["hello"],
    )
    for i in range(0, 10):
        await js.publish("hello", f"hello world: {i}".encode())
    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

如果我运行代码并执行,./nats stream ls我会看到 10 条消息。到目前为止一切都很好。接下来我运行我的消费者:

async def main():
    nc = await nats.connect()
    js = nc.jetstream()
    sub = await js.pull_subscribe("hello", "hello")

    msg_count = 0
    while msg_count < 10:
        for msg in await sub.fetch(1):
            print("Received:", msg.data)
            msg_count = msg_count + 1

            # Try nack'ing every third message
            if msg_count % 3 == 0:
                await msg.nak()
            else:
                await msg.ack()

    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

输出显示:

Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'

这是有道理的:我拉了 10 条消息。每第三条消息都是“裸露的”,因此下一次调用会再次检索它。如果我再次启动脚本,输出是:

Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'

几秒钟后,我暂停了。显然 NATS 以某种方式记住了我的脚本并继续传递消息。但我不明白这是怎么发生的!?流中是否有“全局”光标?但在那种情况下,多个客户会干扰,这对我来说没有意义。所以我假设 NATS 以某种方式记住了我的客户。如何?我如何告诉 NATS 我要重新启动?我也很感激指向我显然错过的文档的指针!?

4

1 回答 1

1

在创建请求订阅时,jetstream 客户端 API 还会创建一个具有匹配消费者选项的持久消费者,在本例中是流名称和持久名称(第二个参数)。

sub = await js.pull_subscribe("hello", "hello")

持久的消费者旨在长期存在,服务器将跟踪消费者在流中的位置。因此,如果消费者停止然后重新启动,它将自动从停止的地方重新启动,并且用于初始化消费者的配置将被记住。进行拉式订阅时需要持久消费者,而进行推送订阅时是可选的。

来源:https ://nats.io/blog/jetstream-java-client-03-consume/#durable-vs-ephemeral

于 2021-12-06T15:17:32.967 回答