我目前从事微服务架构的工作。在我将 NATS 插入我的项目之前,我想用它测试一些简单的场景。
在一个场景中,我有一个简单的发布者,它在 localhost:4222 上运行的基本 Nats 服务器上的 for 循环中发布 100.000 条消息。
最大的问题是订阅者。当他收到 30.000 - 40.000 条消息时,我的整个 main.go 程序和所有其他 go 例程都会停止并且什么也不做。我可以用 ctrl + c 退出。但是发布者仍在继续发送消息。当我打开一个新终端并启动一个新的订阅者实例时,一切都会再次正常工作,直到订阅者收到大约 30000 条消息。最糟糕的是,服务器上什至没有出现一个错误,也没有日志,所以我不知道发生了什么。
之后,我尝试用 QueueSubscribe 方法替换订阅方法,一切正常。
订阅和队列订阅的主要区别是什么?
NATS-Streaming 是更好的机会吗?或者在哪些情况下我应该更喜欢 Streaming 以及标准 NATS-Server
这是我的代码:
出版商:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
func main() {
go createPublisher()
for {
}
}
func createPublisher() {
log.Println("pub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}
log.Println("pub finish")
nc.Flush()
}
订户:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
var received int64
func main() {
received = 0
go createSubscriber()
go check()
for {
}
}
func createSubscriber() {
log.Println("sub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()
for {
}
}
func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}