0

我们使用 RabbitMQ 将作业从一台机器上的生产者发送到分布在多台机器上的一小群消费者。

生产者生成作业并将它们放入队列中,消费者每 10 毫秒检查一次队列以查看是否有无人认领的作业,如果有作业则一次获取作业。如果一个特定的工作人员处理一项工作的时间太长(GC 暂停或其他暂时性问题),其他消费者可以自由地从队列中删除工作,以便整体工作吞吐量保持较高。

当我们最初建立这个系统时,我们无法弄清楚如何为队列中的多个消费者建立订阅关系,这将防止我们不得不轮询并引入一点额外的延迟。

检查文档并没有得到令人满意的答案。我们是使用消息队列的新手,我们可能不知道准确描述上述场景的词语。这有点像黑板系统,但在这种情况下,“专家”都是相同的,从不消耗彼此的结果——结果总是报告给工作制作人。

有任何想法吗?

4

2 回答 2

0

获得 pub-subscribe 是直截了当的,我最初遇到了同样的问题,但效果很好。该项目现在在http://www.rabbitmq.com/getstarted.html上有一些很好的帮助页面

RabbitMQ 有超时和 resernt 标志,可以根据需要使用。

您还可以让工作人员进行事件驱动,以便每 10 毫秒检查一次等。如果您需要这方面的帮助,我在http://rabbitears.codeplex.com/有一个小项目,可能会有所帮助。

于 2011-01-16T12:18:47.957 回答
0

在这里你必须记住,rabbitMQ 通道不是线程安全的。所以创建一个单例类来处理所有这些rabbitmq操作

喜欢

我正在 SCALA 中编写代码示例

    Object QueueManager{

      val FACTORY = new ConnectionFactory
      FACTORY setUsername (RABBITMQ_USERNAME)
      FACTORY setPassword (RABBITMQ_PASSWORD)
      FACTORY setVirtualHost (RABBITMQ_VIRTUALHOST)
      FACTORY setPort (RABBITMQ_PORT)
      FACTORY setHost (RABBITMQ_HOST)

    conn = FACTORY.newConnection
      var channel: com.rabbitmq.client.Channel =  conn.createChannel

    //here to decare consumer  for queue1
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable)
      channel.queueDeclare(QUEUE1, durable, false, false, null)
      channel queueBind (QUEUE1, EXCHANGE_NAME, QUEUE1_ROUTING_KEY)
      val queue1Consumer = new QueueingConsumer(channel)
      channel basicConsume (QUEUE1, false, queue1Consumer)

    //here to decare consumer  for queue2
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable)
      channel.queueDeclare(QUEUE2, durable, false, false, null)
      channel queueBind (QUEUE2, EXCHANGE_NAME, QUEUE2_ROUTING_KEY)
      val queue2Consumer = new QueueingConsumer(channel)
      channel basicConsume (QUEUE2, false, queue2Consumer)





    //here u should mantion distinct ROUTING key for each queue
       def addToQueueOne{
    channel.basicPublish(EXCHANGE_NAME, QUEUE1_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes)
    }

   def addToQueueTwo{
channel.basicPublish(EXCHANGE_NAME, QUEUE2_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes)
}

def getFromQueue1:Delivery={
 queue1Consumer.nextDelivery
}

def getFromQueue2:Delivery={
  queue2Consumer.nextDelivery
}

}

我已经为 2 个队列编写了一个代码示例,你可以像上面一样添加更多队列............

于 2010-12-31T05:35:00.640 回答