3

您能告诉我如何将消息从生产者传输到交换器,然后再传输到队列吗?

vhost_exabgp = "librabbitmq://SNIP!:SNIP!@<server>:5672/exabgp"
conn = kombu.Connection(vhost_exabgp)
exchange_datastore = kombu.Exchange(name="datastore", type="direct", durable=False, auto_delete=False, delivery_mode=1)
producer_exabgp = conn.Producer(exchange=exchange_datastore, routing_key=<router>, serializer="json", auto_declare=False)
msg = simplejson.dumps({'x': 1})
producer_exabgp.publish(msg, exchange=exchange_datastore, routing_key=<router>, serializer="json", delivery_mode=1, mandatory=False, immediate=False)

请注意,组合库不支持“强制”和“立即”。

在rabbit节点上,我定义了vhost,创建了默认的Exchanges,我还定义了“datastore”Exchange

在此处输入图像描述

并且还定义了vhost下的队列。

当我检查相应的队列时,我在队列中看不到任何消息,并且在执行上述代码时也没有出现异常。

你能告诉我如何调试它并就出了什么问题提出建议吗?

根据https://www.rabbitmq.com/tutorials/amqp-concepts.html当具有路由密钥 R 的新消息到达直接交换时,如果 K = R,则交换将其路由到队列

In [35]: requests.get('http://localhost:15672/api/queues/exabgp', auth=(os.environ['APP_USER'],os.environ['APP_PASSWD'])).json()
Out[35]: 
[{u'arguments': {},
  u'auto_delete': False,
  u'backing_queue_status': {u'avg_ack_egress_rate': 0.0,
   u'avg_ack_ingress_rate': 0.0,
   u'avg_egress_rate': 0.0,
   u'avg_ingress_rate': 0.0,
   u'delta': [u'delta', u'undefined', 0, u'undefined'],
   u'len': 0,
   u'next_seq_id': 0,
   u'q1': 0,
   u'q2': 0,
   u'q3': 0,
   u'q4': 0,
   u'target_ram_count': u'infinity'},
  u'consumer_utilisation': u'',
  u'consumers': 0,
  u'disk_reads': 0,
  u'disk_writes': 0,
  u'durable': False,
  u'exclusive_consumer_tag': u'',
  u'idle_since': u'2015-10-06 18:30:38',
  u'memory': 13936,
  u'message_bytes': 0,
  u'message_bytes_persistent': 0,
  u'message_bytes_ram': 0,
  u'message_bytes_ready': 0,
  u'message_bytes_unacknowledged': 0,
  u'messages': 0,
  u'messages_details': {u'rate': 0.0},
  u'messages_persistent': 0,
  u'messages_ram': 0,
  u'messages_ready': 0,
  u'messages_ready_details': {u'rate': 0.0},
  u'messages_ready_ram': 0,
  u'messages_unacknowledged': 0,
  u'messages_unacknowledged_details': {u'rate': 0.0},
  u'messages_unacknowledged_ram': 0,
  u'name': u'<router>',
  u'node': u'rabbit@<server>',
  u'policy': u'',
  u'recoverable_slaves': u'',
  u'state': u'running',
  u'vhost': u'exabgp'}]
4

0 回答 0