3

我有一个监听消息的消费者,如果消息流超过消费者可以处理的范围,我想启动这个消费者的另一个实例。

但我也希望能够从消费者那里轮询信息,我的想法是我可以使用 RPC 通过使用扇出交换从生产者那里请求这些信息,这样所有生产者都会得到 RPC 调用。

我的问题首先是这是否可能,其次是否合理?

4

2 回答 2

4

如果问题是“是否可以向多个服务器发送 RPC 消息?” 答案是肯定的。

当您构建 RPC 调用时,您将一个临时队列附加到消息(通常在 header.reply_to 中,但您也可以使用内部消息字段)。这是 RPC 目标将发布其答案的队列。

当您将 RPC 发送到单个服务器时,您可以在临时队列中接收多个消息:这意味着 RPC 应答可以由以下方式形成:

  • 来自单一来源的单一消息
  • 来自单一来源的多条消息
  • 来自多个来源的不止一条消息

在这种情况下出现的问题是

  • 你什么时候停止听?如果您知道 RPC 服务器的数量,您可以等到每个服务器都向您发送答案,否则您必须实现某种形式的超时
  • 你需要追踪答案的来源吗?您可以在消息中添加一些特殊字段以保留此信息。消息顺序也是如此。

只是一些代码来展示如何做到这一点(带有 Pika 库的 Python)。注意,这远非完美:最大的问题是当你得到一个新的答案时你应该重置超时。

    def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
        if timeout is None:
            timeout = self.rpc_timeout

        result_list = []

        def _callback(channel, method, header, body):
            print "### Got 1/%s RPC result" %(result_len)
            msg = self.encoder.decode(body)
            result_dict = {}
            result_dict.update(msg['content']['data'])
            result_list.append(result_dict)

            if callback is not None:
                callback(msg)

            if len(result_list) == result_len:
                print "### All results are here: stopping RPC"
                channel.stop_consuming()

        def _outoftime():
            self.channel.stop_consuming()
            raise TimeoutError

        if timeout != -1:
            print "### Setting timeout %s seconds" %(timeout)
            self.conn_broker.add_timeout(timeout, _outoftime)

        self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)

        if raise_timeout is True:
            print "### Start consuming RPC with raise_timeout"
            self.channel.start_consuming()
        else:
            try:
                print "### Start consuming RPC without raise_timeout"
                self.channel.start_consuming()
            except TimeoutError:
                pass

        return result_list
于 2012-10-03T09:08:12.780 回答
1

经过一番研究,这似乎是不可能的。如果您查看 RabbitMQ.com 上的教程,您会发现调用有一个 id,据我所知,它会被消耗掉。

我选择了另一种方式,即读取日志文件并聚合数据。

于 2012-09-18T13:32:41.277 回答