3

我目前从事微服务架构的工作。在我将 NATS 插入我的项目之前,我想用它测试一些简单的场景。

在一个场景中,我有一个简单的发布者,它在 localhost:4222 上运行的基本 Nats 服务器上的 for 循环中发布 100.000 条消息。

最大的问题是订阅者。当他收到 30.000 - 40.000 条消息时,我的整个 main.go 程序和所有其他 go 例程都会停止并且什么也不做。我可以用 ctrl + c 退出。但是发布者仍在继续发送消息。当我打开一个新终端并启动一个新的订阅者实例时,一切都会再次正常工作,直到订阅者收到大约 30000 条消息。最糟糕的是,服务器上什至没有出现一个错误,也没有日志,所以我不知道发生了什么。

之后,我尝试用 QueueSubscribe 方法替换订阅方法,一切正常。

订阅和队列订阅的主要区别是什么?

NATS-Streaming 是更好的机会吗?或者在哪些情况下我应该更喜欢 Streaming 以及标准 NATS-Server

这是我的代码:

出版商:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

func main() {
    go createPublisher()

    for {

    }
}

func createPublisher() {

    log.Println("pub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        nc.Publish("alenSub", msg)
        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond)
    }

    log.Println("pub finish")

    nc.Flush()

}

订户:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

var received int64

func main() {
    received = 0

    go createSubscriber()
    go check()

    for {

    }
}

func createSubscriber() {

    log.Println("sub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("alenSub", func(msg *nats.Msg) {
        received++
    })
    nc.Flush()

    for {

    }
}

func check() {
    for {
        fmt.Println("-----------------------")
        fmt.Println("still running")
        fmt.Println("received", received)
        fmt.Println("-----------------------")
        time.Sleep(time.Second * 2)
    }
}
4

2 回答 2

1

如前所述,删除 for{} 循环。替换为 runtime.Goexit()。

对于订阅者,您不需要在 Go 例程中创建订阅者。异步订阅者已经有自己的回调函数。

还使用原子或互斥锁保护接收到的变量。

请参阅此处的示例。

https://github.com/nats-io/go-nats/tree/master/examples

于 2018-10-02T05:35:37.043 回答
1

无限for循环可能会使垃圾收集器挨饿:https ://github.com/golang/go/issues/15442#issuecomment-214965471

我只需运行发布者即可重现该问题。要解决此问题,我建议使用sync.WaitGroup. 以下是我如何更新评论中链接的代码以完成它:

package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/go-nats"
)

// create wait group
var wg sync.WaitGroup

func main() {
    // add 1 waiter
    wg.Add(1)
    go createPublisher()

    // wait for wait group to complete
    wg.Wait()
}

func createPublisher() {

    log.Println("pub started")
    // mark wait group done after createPublisher completes
    defer wg.Done()

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        if errPub := nc.Publish("alenSub", msg); errPub != nil {
            panic(errPub)
        }

        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond * 1)
    }

    log.Println("pub finish")

    errFlush := nc.Flush()
    if errFlush != nil {
        panic(errFlush)
    }

    errLast := nc.LastError()
    if errLast != nil {
        panic(errLast)
    }

}

我建议类似地更新上述订阅者代码。

Subscribe和之间的主要区别在于QueueSubscriber,在Subscribe所有订阅者中都发送了所有消息。而在QueueSubscribea 中只有一个订阅者QueueGroup会发送每条消息。

有关 NATS 流媒体附加功能的一些详细信息,请参见: https ://nats.io/documentation/streaming/nats-streaming-intro/

我们看到从数据管道到控制平面的各种用例中都使用了 NATS 和 NATS Streaming。您的选择应该由您的用例需求驱动。

于 2017-08-28T14:40:40.733 回答