我正在使用 NATS 迈出第一步,并看到我无法理解的行为,即使在非常仔细地阅读了文档之后也是如此。我有一个本地 NATS 服务器(2.6.5)正在运行。它开始于
./nats-server -js
我使用以下代码生成一些消息:
async def main():
nc = await nats.connect()
js = nc.jetstream()
await js.delete_stream(name="hello")
await js.add_stream(
name="hello",
subjects=["hello"],
)
for i in range(0, 10):
await js.publish("hello", f"hello world: {i}".encode())
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
如果我运行代码并执行,./nats stream ls
我会看到 10 条消息。到目前为止一切都很好。接下来我运行我的消费者:
async def main():
nc = await nats.connect()
js = nc.jetstream()
sub = await js.pull_subscribe("hello", "hello")
msg_count = 0
while msg_count < 10:
for msg in await sub.fetch(1):
print("Received:", msg.data)
msg_count = msg_count + 1
# Try nack'ing every third message
if msg_count % 3 == 0:
await msg.nak()
else:
await msg.ack()
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
输出显示:
Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'
这是有道理的:我拉了 10 条消息。每第三条消息都是“裸露的”,因此下一次调用会再次检索它。如果我再次启动脚本,输出是:
Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'
几秒钟后,我暂停了。显然 NATS 以某种方式记住了我的脚本并继续传递消息。但我不明白这是怎么发生的!?流中是否有“全局”光标?但在那种情况下,多个客户会干扰,这对我来说没有意义。所以我假设 NATS 以某种方式记住了我的客户。如何?我如何告诉 NATS 我要重新启动?我也很感激指向我显然错过的文档的指针!?