1

我在我的项目中使用NSQ作为mq,java生产者将消息生成到NSQ并去消费者消费它。但奇怪的是消费者总是在几秒钟后收到消息。只有几条消息,我真的不不知道怎么解释为什么会这样。下面是测试结果,请注意时间。它们都消耗相同的主题。你可以在第二次看到,Go 比 java 慢了 7s。Java结果:

信息 | 虚拟机 1 | 2018/07/11 17:22:01 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}

信息 | 虚拟机 1 | 2018/07/11 17:22:11 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}

信息 | 虚拟机 1 | 2018/07/11 17:23:21 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}

信息 | 虚拟机 1 | 2018/07/11 17:25:31 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}

去结果:

2018-07-11 17:22:03 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005

2018-07-11 17:22:28 broker.go DEBUG 准备将类型为 1 的 msg 5560 发送到 XSQ000200000005

2018-07-11 17:23:21 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005

2018-07-11 17:25:38 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005

请忽略其他错误,只是因为业务。

这是我的消费者:

func (b *Broker) createConsumer(topic string, vendor int32) error {
    config := nsq.NewConfig()
    laddr := "127.0.0.1"
    // so that the test can simulate binding consumer to specified address
    config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
    // so that the test can simulate reaching max requeues and a call to LogFailedMessage
    config.DefaultRequeueDelay = 0
    // so that the test wont timeout from backing off
    config.MaxBackoffDuration = time.Millisecond * 50




    c, err := nsq.NewConsumer(topic, "channel_box_" + util.String(vendor), config)
    if err != nil {
        return log.Error("Failed to new nsq consumers.")
    }

    c.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
        if err := b.handle(message, vendor); err != nil {
            log.Errorf("Handle message %v for vendor %d from mq failed.", message.ID, vendor)
        }
        return nil
    }), 5)
    if err = c.ConnectToNSQLookupds(b.Opts.Nsq.Lookup); err != nil {
        return log.Error("Failed to connect to nsq lookup server.")
    }
    b.consumers = append(b.consumers, c)

    return nil
}
4

0 回答 0