我正在尝试用 Go 编写一个 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。此外,假设确认是否成功处理,否则发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。我有几个问题:
- RabbitMq-go Reference中是否有
BasicConsumer
vs的概念?EventingBasicConsumer
- RabbitMQ 中有什么
Model
,RabbitMq-go 中有什么? - 如何在死信队列失败时发送对象并在之后再次重新排队
ttl
- 下面代码
consumerTag
中函数中参数的意义是什么ch.Consume
- 我们应该在这种情况下使用
channel.Get()
or吗?channel.Consume()
为了满足上述要求,我需要在以下代码中进行哪些更改。我问这个是因为我找不到 RabbitMq-Go 的像样的文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
编辑问题:
我已经按照链接link1 link2中的建议延迟了消息的处理。但问题是,即使在 ttl 之后,消息也会从死信队列返回到其原始队列。我正在使用RabbitMQ 3.0.0
. 谁能指出是什么问题?