1

尝试学习 nsq,并遵循golang 示例nsqjs中的示例。我正在服务器端发送消息,使用 for 循环和执行例程

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func(x int) {
        defer wg.Done()

        chanName := fmt.Sprintf("import_progress_587e6442ff74889098498f6e")
        m := map[string]interface{}{
            "body": map[string]interface{}{
                "progress": x,
            },
        }
        msg, _ := json.Marshal(m)

        req := NSQPubReq{
            Topic: chanName,
            Body:  msg,
        }
        if err := producer.Publish(req.Topic, req.Body); err != nil {
        }
        utils.Info(fmt.Sprintf("sent msg=%v", string(msg)))

    }(i)
}

wg.Wait()

但问题是,在客户端。

// channel = 'import_progress_587e6442ff74889098498f6e'
let reader = new nsq.Reader(channel, channel, {
    //lookupdHTTPAddresses: '<<IP>>:4161',
    maxInFlight: 10000,
    snappy: true
})
reader.connect()

reader.on('message', (msg) => {
    var msgData = {
            id:     msg.id,
            body:   msg.body.toString(),
            chan:   channel
    }
    io.emit(channel, msgData)
    msg.finish()
})

消息不会立即发送给客户端。我将等待几秒钟,直到消息到达 nodejs 客户端。有什么我需要做的设置吗?谢谢你!

4

1 回答 1

4

nsqjs 客户端接收刚刚发布的消息会很慢有几个原因:

  1. 如果主题是新的并且主题的发现是 via nsqlookupd,那么默认情况下,nsqjs 阅读器将尝试每 30 秒发现新主题。

    从上面的示例中,您似乎正在为每次导入创建新主题。我相信如果你先从 Golang 客户端开始发布消息,然后再启动 nsqjs 客户端,那么你应该不会看到延迟。

  2. 如果您有多个 nsqds 的max-in-flight设置太低,那么它会将 nsqjsReader置于饥饿模式,在该模式下,它将RDY在 nsqds 之间移动计数一段时间。

    我不确定这就是这里发生的事情,因为我无法说出有关 nsq 拓扑的任何信息。只要您的 max-in-flight 设置高于nsqd您拥有的实例数,那么您的状态就会很好。

于 2017-01-19T14:02:42.953 回答