我先解释一下我的情况。想要完成的是我的服务器上的多个生产者和多个消费者以及一个代理。我希望消息是持久的,队列是持久的。我希望有多个不会相互干扰的队列。一个队列,一个生产者和一个消费者,这已经很好了。当前代码:
消费者:
from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect
socket = connect('localhost',5672)
connection = Connection(sock=socket, username='***', password='***')
connection.start()
session = connection.session(str(uuid4()))
local_queue_name = 'my_queue'
queue = session.incoming(local_queue_name)
session.message_subscribe(queue='ProdSixQueue', destination=local_queue_name)
queue.start()
content=''
while content != 'Done':
message = queue.get(timeout=1000000)
while(queue.empty() == True):
time.sleep(1)
content = message.body
if(validate(etree.fromstring(content))):
session.message_accept(RangedSet(message.id))
store(content) #my function to store data..
制片人:
from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect
def produce(theMsg):
socket = connect('ser.ver.s.ip',5672)
connection = Connection(sock=socket,username='***',password='***')
connection.start()
session = connection.sesssion(str(uuid4()))
session.queue_declare(queue='ProdSixQueue')
session.exchange_bind(exchange='amg.direct',queue='ProdSixQueue',binding_key='routing_key')
properties = session.delivery_properties(routing_key='routing_key')
session.message_transfer(destination='amq.direct',message=Message(properties,str(theMsg)))
session.close(timeout=100)
当然,这些不是整个程序,而是 Qpid 代理中涉及的所有代码。
现在的问题是,如果我为不同的设备创建新的消费者和生产者,并且我想将不同的流量从生产者转移到消费者,那么第一个消费者会从代理窃取所有数据,即使我将其发送到不同的队列,比如 ProdSevenQueue ,并在其他消费者尝试将其存储为不同的本地队列,如“prodseven_local_Queue”。我是否必须以不同的方式使用队列,还是我误解了这里的整个想法?此外,我听到人们谈论配置代理顶部将某些类型的流量重定向到其他地方,但我没有找到这样的例子。推动正确的方向会很棒。更让你困惑的是,这里的情况图片:链接到 imgur
为了我的辩护,我以前从未使用 AMQP 或 Apache Qpid 做过任何事情,而且我还在学习 python。还有一点要抱怨的是,我从未见过比 Apache Qpid 更不连贯的文档。显然不喜欢新玩家。