8

我正在使用stradway 的 amqp库来连接 rabbitmq 服务器。该库提供了一个 channel.Consume() 函数,它返回一个“ <-chan Delivery ”。它还提供了一个 channel.Get() 函数,该函数返回一个“ Delivery ”等等。

我必须实现pop()功能,并且正在使用channel.Get()。但是,文档说:

"In almost all cases, using Channel.Consume will be preferred."

这里的首选是否意味着推荐?在 channel.Consume() 上使用 channel.Get() 有什么缺点吗?如果是,我如何使用 channel.Consume() 来实现 Pop() 函数?

4

2 回答 2

6

这真的取决于你想做什么。如果您只想从队列中获取一条消息(第一条),您可能应该使用basic.get,如果您打算处理来自队列的所有传入消息 -basic.consume这就是您想要的。

可能不是平台或库特定的问题,而是协议理解问题。

UPD

我不太熟悉 Go 语言,所以我将尝试向您简要介绍 AMQP 细节并描述用例。

有时您可能会遇到麻烦并产生开销basic.consume

有了basic.consume这样的工作流程:

  1. 发送basic.consume方法来通知代理您要接收消息
    • 虽然这是一种同步方法,但请等待basic.consume-ok来自代理的消息
  2. 开始收听basic.deliver来自服务器的消息
    • 这是一种异步方法,您应该自己处理服务器上没有可用消息的情况,例如限制阅读时间

有了basic.get这样的工作流程:

  1. basic.get向代理 发送同步方法
    • 等待basic.get-ok方法,它保存消息或basic.empty方法,表示服务器上没有可用消息的情况

同步和异步方法的注意事项:同步应该有一些响应,异步是否没有

关于basic.qos方法prefetch-count属性的注意事项:no-ack当属性设置为basic.consume或时,它会被忽略basic.get

Spec 有一个注释basic.get:“此方法使用同步对话提供对队列中消息的直接访问,该对话是为同步功能比性能更重要的特定类型的应用程序设计的”,适用于连续消息消费。

我的个人测试表明,在 RabbitMQ 3.0.1、Erlang R14B04 上,使用basic.get(0.38659715652466) 获取 1000 条消息比逐条获取 1000 条消息 (0.47398710250854) 更快basic.consume,平均速度超过 15%。

如果您只在主线程中使用一条消息 - 可能您必须使用basic.get.

您仍然可以仅异步使用一条消息,例如在单独的线程中或使用某种事件机制。有时这对您的机器资源来说会是更好的解决方案,但您必须注意队列中没有可用消息的情况。

如果您必须一一处理消息,很明显basic.consume应该使用,我认为

于 2013-06-25T07:30:35.417 回答
6

据我从文档中可以看出,是的,“首选”确实意味着“推荐”。

似乎它channel.Get()没有提供尽可能多的功能channel.Consume(),并且由于它返回 a chanof而更容易在并发代码中使用Delivery,而不是单独的每个人Delivery

提到的额外功能是exclusive,noLocalnoWait, 以及Table“对队列或服务器具有特定语义”的可选参数。

要使用你可以实现一个Pop()函数channel.Consume(),链接到来自amqp 示例消费者的一些代码片段,使用该函数创建一个通道Consume(),创建一个函数来处理chanDelivery实际实现你的功能Pop()的函数,然后在 a 中触发handle()函数goroutine

关键是通道(在链接的示例中)如果没有收到任何内容,则会阻止发送。在示例中,handle()func 用于range处理整个通道,直到它为空。一个函数可能会更好地为您Pop()的功能提供服务,该函数只接收来自 的最后一个值chan并返回它。每次运行它都会返回最新的Delivery.

编辑:从通道接收最新值并对其进行处理的示例函数(这可能不适用于您的用例,如果该函数将另一个发送到另一个要处理的函数可能会更有用Deliverychan另外,我没有'没有测试下面的代码,它可能充满错误)

func handle(deliveries <-chan amqp.Delivery, done chan error) {
    select {
    case d = <-deliveries:
        // Do stuff with the delivery
        // Send any errors down the done chan. for example:
        // done <- err
    default:
        done <- nil
    }
}
于 2013-06-18T09:06:36.920 回答