2

我尝试在exchange_declare回调中使用交换名称。我将SelectConnection适配器用于 Pika 中的异步请求。

我的想法是首先在我的程序中构建交换、队列和绑定列表。然后,我向 Pika 发送多个交换和队列声明,并在每个绑定所需的队列和交换收到它们的 DeclareOK 消息后立即创建绑定。

这意味着我需要知道回调中的交换和队列的名称,以便我可以将它们与要创建的绑定相关联。

像这样的东西:

print "Create A"
channel.exchange_declare(callback=on_exchange, exchange="exchangeA")
print "Create B"
channel.exchange_declare(callback=on_exchange, exchange="exchangeB")

def on_exchange(response):
  # How do I know if this was exchangeA or exchangeB ?
  print "Exchange declared"
  print response

这给出了以下输出,清楚地表明首先声明了交换,然后触发了回调(如预期的那样):

Create A
Create B
Exchange declared
<METHOD(['method=<Exchange.DeclareOk>', 'channel_number=1', 'frame_type=1'])>
Exchange declared
<METHOD(['method=<Exchange.DeclareOk>', 'channel_number=1', 'frame_type=1'])>

我发现它responsepika.frame.Method类型,但我该如何从那里开始呢?交换是否可以保证按顺序声明?

4

3 回答 3

3

闭包是一种保持代码异步的解决方案:

def declare_exchanges(self):
    exchange = "exchangeA"
    callback = self.on_exchange(exchange)
    channel.exchange_declare(callback=callback, exchange=exchange)

    exchange = "exchangeB"
    callback = self.on_exchange(exchange)
    channel.exchange_declare(callback=callback, exchange=exchange)

def on_exchange(self, exchange):
    def callback(response):
        print(exchange)

    return callback

PS:我在上游打开了一个问题,https://github.com/pika/pika/issues/898

于 2017-12-05T10:58:54.680 回答
1

您可以相应地链接您的回调来控制设置过程。只需逐步建立您的交换和队列。这更冗长,但您可以控制。

class ChatServer(object):
    def __init__(self):
        self.channel_in = None
        self.channel_out = None
        cred = pika.PlainCredentials('guest','guest')
        param = pika.ConnectionParameters(host='localhost',
                                          port=5672,
                                          virtual_host='/',
                                          credentials=cred)
        self.connection = pika.SelectConnection(param,self.on_connected)


    def on_connected(self, connection):
        self.connection.channel(self.on_channel_out_open)

    def on_channel_out_open(self,channel):
        self.channel_out = channel
        self.channel_out.exchange_declare(exchange='chatserver_out',
                                          type='direct',
                                          auto_delete=False,
                                          callback=self.on_exchange_declare_out)
    def on_exchange_declare_out(self,method_frame):
        self.channel_out.exchange_declare(exchange='cmd',
                                          type='direct',
                                          auto_delete=False,
                                          callback=self.on_exchange_declare_cmd_out)
    def on_exchange_declare_cmd_out(self,method_frame):
        # now all exchanges are defined.
        # Let's create the queues.
        # ...
于 2013-10-23T07:07:45.330 回答
1

exchange_declare在回调中读取交换名称似乎是不可能的。相反,您需要做的是保留一个计数器,以便您知道何时创建了所有交换:

class ...:
  def __init__(self):
    self.exchangestocreate = 2

  #...

  def declare_exchanges(self):
    channel.exchange_declare(callback=self.on_exchange, exchange="exchangeA")
    channel.exchange_declare(callback=self.on_exchange, exchange="exchangeB")

  def on_exchange(self, response):
    self.exchangestocreate -= 1
    if self.exhangestocreate == 0:
      # Declare bindings here

这将有效地使接口在交换声明步骤中同步。

于 2013-11-06T06:06:41.553 回答