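I have 2 queues, say q1 and q2, which correspond to exchanges e1 and e2 with binding keys b1 and b2. I want to run consumer functions, say c1 and c2, in parallel, listening to q1 and q2 respectively. I tried the following: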

import pika
import constants  # the asker's own module; provides rmqHostIp

# callback(ch, method, properties, body) is assumed to be defined elsewhere

def c1():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e1', durable=True,
                             type='topic')
    result = channel.queue_declare(durable=False, queue='q1')
    queue_name = result.method.queue
    binding_key = "b1"
    channel.queue_bind(exchange='e1',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()

def c2():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e2', durable=True,
                             type='topic')
    result = channel.queue_declare(durable=False, queue='q2')
    queue_name = result.method.queue
    binding_key = "b2"
    # q2 is bound to e2 (the original had an unquoted e1 here)
    channel.queue_bind(exchange='e2',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()

if __name__ == '__main__':
    c1()
    c2()

However, only c1 ends up listening; c2 is never executed. How can I run both functions? Thanks in advance.

EDIT: The methods c1 and c2 are in 2 different modules (files).

1 Answer

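To run both functions at the same time you need some form of concurrency, such as multithreading or multiprocessing. start_consuming() blocks until consuming stops, so when c1() and c2() are called one after the other, c2() is never reached. See here for some Python examples.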

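Here is your code modified to use the Process class. The same thing can be done with threads, or by starting each consumer explicitly as a separate process from the OS.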

import pika
from multiprocessing import Process


# Written against the pre-1.0 pika API used in the question. In pika 1.x,
# exchange_declare takes exchange_type= and basic_consume takes
# queue=, on_message_callback= and auto_ack=.
def callback(ch, method, properties, body):
    print('callback got data')
    # no_ack=False means each delivery has to be acknowledged explicitly
    ch.basic_ack(delivery_tag=method.delivery_tag)


class c1():
    def __init__(self):
        # Note: the connection is opened here, in the parent process. Opening it
        # inside run() instead keeps each connection within the process that uses it.
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='e1', durable=True, type='topic')
        result = self.channel.queue_declare(durable=False, queue='q1')
        queue_name = result.method.queue
        binding_key = "b1"
        self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
        self.channel.basic_consume(callback, queue=queue_name, no_ack=False)

    def run(self):
        # blocks until consuming stops
        self.channel.start_consuming()


class c2():
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='e2', durable=True, type='topic')
        result = self.channel.queue_declare(durable=False, queue='q2')
        queue_name = result.method.queue
        binding_key = "b2"
        # q2 is bound to e2 (the original bound it to e1)
        self.channel.queue_bind(exchange='e2', queue=queue_name, routing_key=binding_key)
        self.channel.basic_consume(callback, queue=queue_name, no_ack=False)

    def run(self):
        self.channel.start_consuming()


if __name__ == '__main__':
    subscriber_list = []
    subscriber_list.append(c1())
    subscriber_list.append(c2())

    # execute: one process per subscriber
    process_list = []
    for sub in subscriber_list:
        process = Process(target=sub.run)
        process.start()
        process_list.append(process)

    # wait for all processes to finish
    for process in process_list:
        process.join()
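
Since threads were mentioned as an alternative, here is a rough sketch of what that variant could look like. It is not a tested drop-in: it assumes pika 1.x call signatures (exchange_type=, on_message_callback=, auto_ack=), and the names on_message, consume and start_consumers are made up for illustration. Each thread opens its own connection, because a pika connection should not be shared between threads.

```python
import threading


def on_message(ch, method, properties, body):
    # Hypothetical handler: print the message, then acknowledge it (auto_ack=False below).
    print("got %r via %s" % (body, method.routing_key))
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume(exchange, queue, binding_key, host="localhost"):
    """Declare and bind one queue, then block consuming it (assumes pika 1.x)."""
    # Imported here only so the module loads without pika installed;
    # a top-level import works just as well.
    import pika

    # The connection is created inside the thread that uses it.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type="topic", durable=True)
    channel.queue_declare(queue=queue, durable=False)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key)
    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=False)
    channel.start_consuming()  # blocks this thread only


def start_consumers(specs, target=consume):
    """Start one thread per (exchange, queue, binding_key) tuple and return the threads."""
    threads = []
    for spec in specs:
        thread = threading.Thread(target=target, args=spec)
        thread.start()
        threads.append(thread)
    return threads
```

With a broker running on localhost, it would be used roughly as `for t in start_consumers([("e1", "q1", "b1"), ("e2", "q2", "b2")]): t.join()`. The target parameter is there only so the thread-launching part can be tried out without a broker.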
answered 2017-07-17T10:55:40.640