4

我正在尝试用 Go 编写一个 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。此外,假设确认是否成功处理,否则发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。我有几个问题:

  1. RabbitMq-go Reference中是否有BasicConsumervs的概念?EventingBasicConsumer
  2. RabbitMQ 中有什么Model,RabbitMq-go 中有什么?
  3. 如何在死信队列失败时发送对象并在之后再次重新排队ttl
  4. 下面代码consumerTag中函数中参数的意义是什么ch.Consume
  5. 我们应该在这种情况下使用channel.Get()or吗?channel.Consume()

为了满足上述要求,我需要在以下代码中进行哪些更改。我问这个是因为我找不到 RabbitMq-Go 的像样的文档。

   func main() {

        consumer()        
    }

    func consumer() {

        objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}      
        initializeConn(&objConsumerConn.conn)


        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        msgs, err := ch.Consume(
                objConsumerConn.queueName, // queue
                "demo1",     // consumerTag
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {                  
                k := new(EventCaptureData)
                b := bytes.Buffer{}
                b.Write(d.Body)
                dec := gob.NewDecoder(&b)  
                err := dec.Decode(&k)
                d.Ack(true)  

                if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
                    fmt.Println(k)                        
            }
        }()      

        log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
        <-forever

    }

编辑问题:

我已经按照链接link1 link2中的建议延迟了消息的处理。但问题是,即使在 ttl 之后,消息也会从死信队列返回到其原始队列。我正在使用RabbitMQ 3.0.0. 谁能指出是什么问题?

4

1 回答 1

4

RabbitMq-go 参考中有 BasicConsumer 与 EventingBasicConsumer 的概念吗?

不完全是,但Channel.GetandChannel.Consume调用服务于类似的概念。有了Channel.Get一个非阻塞调用,如果有任何可用消息,它会获取第一条消息,或者返回ok=false。随着Channel.Consume排队的消息被传递到一个通道。

RabbitMQ 中的模型是什么,它是否存在于 RabbitMq-go 中?

如果您指的是 C# RabbitMQ 中的IModeland Connection.CreateModel,那是来自 C# lib 的东西,而不是来自 RabbitMQ 本身。这只是尝试从 RabbitMQ“通道”术语中抽象出来,但它从未流行起来。

如何在死信队列失败时发送对象并在 ttl 后再次将它们重新排队

delivery.Nack方法与requeue=false.

下面代码中 ch.Consume 函数中的 consumerTag 参数的意义是什么

ConsumerTag只是一个消费者标识符。它可用于通过channel.Cancel取消通道,并识别负责交付的消费者。使用 传递的所有消息channel.Consume都将ConsumerTag设置该字段。

我们应该在这种情况下使用channel.Get()or吗?channel.Consume()

我认为channel.Get()几乎永远不会比channel.Consume(). 您将channel.Get轮询队列并无所事事地浪费 CPU,这在 Go 中是没有意义的。

为了满足上述要求,我需要在以下代码中进行哪些更改。

  1. 由于您一次批处理 5 个,因此您可以拥有一个从消费者通道接收的 goroutine,一旦它获得 5 个交付,您就调用另一个函数来处理它们。

  2. 要确认或发送到死信队列,您将使用delivery.Ackdelivery.Nack函数。您可以multiple=true为批处理使用并调用一次。消息进入死信队列后,您必须检查delivery.Headers["x-death"]标头的死信次数并调用传递。已重试 5 次时拒绝。

  3. 使用channel.NotifyCancel处理取消事件。

于 2016-04-06T17:26:39.243 回答