0

我先解释一下我的情况。想要完成的是我的服务器上的多个生产者和多个消费者以及一个代理。我希望消息是持久的,队列是持久的。我希望有多个不会相互干扰的队列。一个队列,一个生产者和一个消费者,这已经很好了。当前代码:

消费者:

from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect

socket = connect('localhost',5672)
connection = Connection(sock=socket, username='***', password='***')
connection.start()
session = connection.session(str(uuid4()))
local_queue_name = 'my_queue'
queue = session.incoming(local_queue_name)
session.message_subscribe(queue='ProdSixQueue', destination=local_queue_name)
queue.start()
content=''
while content != 'Done':
    message = queue.get(timeout=1000000)
    while(queue.empty() == True):
        time.sleep(1)
    content = message.body
    if(validate(etree.fromstring(content))):
        session.message_accept(RangedSet(message.id)) 
        store(content) #my function to store data..

制片人:

from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect

def produce(theMsg):
        socket = connect('ser.ver.s.ip',5672)
        connection = Connection(sock=socket,username='***',password='***')
        connection.start()
        session = connection.sesssion(str(uuid4()))
        session.queue_declare(queue='ProdSixQueue')
        session.exchange_bind(exchange='amg.direct',queue='ProdSixQueue',binding_key='routing_key')
        properties = session.delivery_properties(routing_key='routing_key')
        session.message_transfer(destination='amq.direct',message=Message(properties,str(theMsg)))
        session.close(timeout=100)

当然,这些不是整个程序,而是 Qpid 代理中涉及的所有代码。

现在的问题是,如果我为不同的设备创建新的消费者和生产者,并且我想将不同的流量从生产者转移到消费者,那么第一个消费者会从代理窃取所有数据,即使我将其发送到不同的队列,比如 ProdSevenQueue ,并在其他消费者尝试将其存储为不同的本地队列,如“prodseven_local_Queue”。我是否必须以不同的方式使用队列,还是我误解了这里的整个想法?此外,我听到人们谈论配置代理顶部将某些类型的流量重定向到其他地方,但我没有找到这样的例子。推动正确的方向会很棒。更让你困惑的是,这里的情况图片:链接到 imgur

为了我的辩护,我以前从未使用 AMQP 或 Apache Qpid 做过任何事情,而且我还在学习 python。还有一点要抱怨的是,我从未见过比 Apache Qpid 更不连贯的文档。显然不喜欢新玩家。

4

1 回答 1

1

您的程序表明对 Qpid 工作方式的误解。

在 Qpid(对于 AMQP 协议 0-8 到 0-10)中,消息生产者向Exchanges发送消息。然后 Exchange 负责将消息路由到零个或多个队列。该路由的精确细节取决于交换类型。正是通过这种机制,Qpid 支持常见的消息传递拓扑(点对点、发布/订阅、扇出等)。

您的用例需要使用直接交换的实例(例如内置的 amq.direct)。

直接交换基于消息的路由键和用于将队列绑定到交换的绑定键之间的精确匹配将消息路由到队列。一个常见的约定是使用队列本身的名称作为绑定键将队列绑定到交换器。看来您的程序目前正在为此使用字符串“routing_key”,我怀疑会解释您观察到的不良行为。

你会在这里找到更多的解释:

http://qpid.apache.org/releases/qpid-0.24/java-broker/book/Java-Broker-Concepts-Exchanges.html(Qpid Java Broker 文档 - 但概念是共享的)

https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_MRG/1.1/html/Messaging_User_Guide/chap-Messaging_User_Guide-Exchanges.html(Redhat的 MRG 文档用有用的图表解释了相同的概念)

Python 示例(参见 Qpid 网站)是一个有用的参考。

于 2013-09-10T22:35:04.900 回答