我将 RabbitMQ 3.5.1 与rabbit_presence_exchange(二进制分发)和rabbitmq_event_exchange(帮助调试此问题)插件和 Python Pika 客户端一起使用。
Presence 插件通过给你一个新的交换类型来工作:x-presence。使用路由键将队列绑定到此队列会在队列绑定和未绑定时生成存在通知(例如,路由键是用户名)。在没有路由键的情况下绑定队列会使您注册以接收状态通知。
这很好,我可以成功地生成和接收这样的存在通知。但是,现在我想通过交换路由存在消息。最初,我尝试使用标头交换,但我没有看到任何消息,所以我改为扇出交换(以防我设置的标头匹配不正确)但我仍然没有看到任何消息通过.
这是我在没有额外交换的情况下生成和接收存在消息的脚本(即这是一个有效的脚本):
#!/usr/bin/env python3
import pika
import names
MY_NAME = names.get_first_name()
PRESENCE_EXCHANGE = 'presence'
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=PRESENCE_EXCHANGE,
exchange_type='x-presence')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
print('My name is %s and my queue is %r' % (MY_NAME, queue_name))
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key=MY_NAME)
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key='')
def on_message(ch, method, properties, body):
print(method, '\n', properties, '\n', body)
exchange = method.exchange
if exchange == PRESENCE_EXCHANGE:
action = properties.headers['action']
who = properties.headers['key']
if action == 'bind':
print(' [+] %s has come online.' % (who,))
elif action == 'unbind':
print(' [-] %s has gone offline.' % (who,))
channel.basic_consume(queue=queue_name,
on_message_callback=on_message,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
pass
finally:
connection.close()
我修改了上述内容以将存在消息路由到内置的扇出交换并将我的队列绑定到该交换:
...
print('My name is %s and my queue is %r' % (MY_NAME, queue_name))
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key=MY_NAME)
channel.exchange_bind(source=PRESENCE_EXCHANGE,
destination='amq.fanout',
routing_key='')
channel.queue_bind(exchange='amq.fanout',
queue=queue_name)
def on_message(ch, method, properties, body):
...
我很困惑为什么交易所没有收到消息。Erlang 不是我的语言之一,所以我在尝试读取存在插件的源代码以确定是否支持它时遇到了麻烦(尽管我不明白为什么不支持)。
如果有人有任何想法(或使用 RabbitMQ 处理存在的更好方法),我很想听听。
编辑:
使用此代码和两个客户端运行,我的交换和绑定如下所示:
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.event topic
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
presence x-presence
Listing bindings ...
exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue amq.gen-6aU7qS-ikR4cLmxcT6VKDQ []
exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue amq.gen-MiyEpW9VIxD49PE9SqATFA []
amq.fanout exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue amq.gen-6aU7qS-ikR4cLmxcT6VKDQ []
amq.fanout exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue amq.gen-MiyEpW9VIxD49PE9SqATFA []
presence exchange amq.fanout exchange []
presence exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue Sheila []
presence exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue Joaquin []