0

我的配置:在主机上运行 qpidd:192.168.80.81 并进行以下交换(qpid-config exchanges):

Type      Exchange Name       Attributes
==================================================
direct                        --replicate=none
direct    amq.direct          --durable --replicate=none
fanout    amq.fanout          --durable --replicate=none
headers   amq.match           --durable --replicate=none
topic     amq.topic           --durable --replicate=none
direct    qmf.default.direct  --replicate=none
topic     qmf.default.topic   --replicate=none
topic     qpid.management     --replicate=none

我有多个制作人发布到主题,如amq.topic/com.product.sample1,,amq.topic/com.product.sample2你得到了模式。

我可以通过在命令行上运行来接收来自生产者的所有消息qpid-receive -b 192.168.80.81 -a amq.topic/com.product.sample1 -f

但是当涉及到使用python-qpid-proton库(版本 0.35.0)在 Python 中实现这一点时,它不会按需要工作。这是我接收特定主题消息的 python 文件:

from proton.handlers import MessagingHandler
from proton.reactor import Container

broker_url = "192.168.80.81:5672"
topic = "amq.topic/com.product.sample"


class Client(MessagingHandler):
    def __init__(self, broker_url, topic):
        super(Client, self).__init__()
        self.broker_url = broker_url
        self.topic = topic

    def on_start(self, event):
        conn = event.container.connect(self.broker_url)
        self.receiver = event.container.create_receiver(
            conn, self.topic, dynamic=True)

    def on_message(self, event):
        print(event.message.body)


Container(Client(broker_url, topic)).run()

谁能帮助我并指出我的错误在哪里?非常感谢您的帮助!

4

1 回答 1

0

在这里回答:https ://issues.apache.org/jira/browse/PROTON-2469

这里的问题是经纪人。它无法识别 amq.topic/com.product.sample。但是,它确实支持主题绑定过滤器。为此,您将使用以下内容:

from proton import Described, symbol
from proton.handlers import MessagingHandler
from proton.reactor import Container, Filter

broker_url = "192.168.80.81:5672"
topic = "amq.topic"

class SubjectFilter(Filter):
    def __init__(self, value):
        super(SubjectFilter, self).__init__(\{symbol('subject-filter'): Described(symbol('apache.org:legacy-amqp-topic-binding:string'), value)})

class Client(MessagingHandler):
    def __init__(self, broker_url, topic):
        super(Client, self).__init__()
        self.broker_url = broker_url
        self.topic = topic

    def on_start(self, event):
        conn = event.container.connect(self.broker_url)
        self.receiver = event.container.create_receiver(
            conn, self.topic, options=SubjectFilter("com.product.sample"))

    def on_message(self, event):
        print(event.message.body)

Container(Client(broker_url, topic)).run()
于 2021-11-24T17:01:07.157 回答