我在我的项目中使用NSQ作为mq,java生产者将消息生成到NSQ并去消费者消费它。但奇怪的是消费者总是在几秒钟后收到消息。只有几条消息,我真的不不知道怎么解释为什么会这样。下面是测试结果,请注意时间。它们都消耗相同的主题。你可以在第二次看到,Go 比 java 慢了 7s。Java结果:
信息 | 虚拟机 1 | 2018/07/11 17:22:01 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}
信息 | 虚拟机 1 | 2018/07/11 17:22:11 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}
信息 | 虚拟机 1 | 2018/07/11 17:23:21 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}
信息 | 虚拟机 1 | 2018/07/11 17:25:31 | Msg receive:{"did":"XSQ000200000005","msg":{"id":"5560","type":1,"content":"ZBINh6CBsLw7k2xjr1wslSjY+5QavEgYU6AzzLZn0lOgON9ZYHnNP4UJVUGB+/SpsxZQnrWR9PlULzpSP/p9l9t8wiAwj8qhznRaT8jeyx1/EUrDE0oXJB8GxWaLJUICCbC92j4BMA2HU8vgcfDOp9nSy1KFafi9zgFiCf9Igqo="}}
去结果:
2018-07-11 17:22:03 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005
2018-07-11 17:22:28 broker.go DEBUG 准备将类型为 1 的 msg 5560 发送到 XSQ000200000005
2018-07-11 17:23:21 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005
2018-07-11 17:25:38 broker.go DEBUG 准备将类型 1 的 msg 5560 发送到 XSQ000200000005
请忽略其他错误,只是因为业务。
这是我的消费者:
func (b *Broker) createConsumer(topic string, vendor int32) error {
config := nsq.NewConfig()
laddr := "127.0.0.1"
// so that the test can simulate binding consumer to specified address
config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
// so that the test can simulate reaching max requeues and a call to LogFailedMessage
config.DefaultRequeueDelay = 0
// so that the test wont timeout from backing off
config.MaxBackoffDuration = time.Millisecond * 50
c, err := nsq.NewConsumer(topic, "channel_box_" + util.String(vendor), config)
if err != nil {
return log.Error("Failed to new nsq consumers.")
}
c.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
if err := b.handle(message, vendor); err != nil {
log.Errorf("Handle message %v for vendor %d from mq failed.", message.ID, vendor)
}
return nil
}), 5)
if err = c.ConnectToNSQLookupds(b.Opts.Nsq.Lookup); err != nil {
return log.Error("Failed to connect to nsq lookup server.")
}
b.consumers = append(b.consumers, c)
return nil
}