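I'm trying to use kombu to loosely couple some parts of my software. Since I don't need persistence or inter-process communication, I don't want to run an external daemon, so I tried the in-memory transport. I set up some test code: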

from kombu.mixins import ConsumerMixin
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging

from time import sleep

import threading

media_exchange = Exchange('media', 'direct')
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
queues = [video_queue]


class Worker(ConsumerMixin, threading.Thread):

    def __init__(self):
        self.connection = Connection('memory:///')

        super(Worker, self).__init__()

    def run(self):
        print("YUP I'M RUNNING")
        super(Worker, self).run()
        print("BUT I WASN'T BLOCKED :(")

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues, callbacks=[self.on_message], accept=['json']),
        ]

    def on_message(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

if __name__ == '__main__':
    # setup root logger
    setup_logging(loglevel='INFO', loggers=[''])

    worker = Worker()
    worker.start()

    try:
        while True:
            print("SEND MESSAGE")
            with Connection('memory:///') as conn:
                producer = conn.Producer(serializer='json')
                producer.publish({"foo": "bar"}, exchange=media_exchange,
                                 routing_key='video', declare=queues)
            sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl-C arrives in the main thread, so catch it around the send loop
        # and ask the consumer thread to stop.
        print('bye bye')
        worker.should_stop = True

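However, with the in-memory transport, message delivery is slow: messages arrive in bursts, well after they were sent. With, for example, the Redis transport, the results look fine. In-memory example: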

(rpc_video_player)[phas@mir src]$ python test_messages.py 
YUP I'M RUNNING
SEND MESSAGE
Connected to memory://localhost//
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE

Redis example:

(rpc_video_player)[phas@mir src]$ python test_messages.py 
YUP I'M RUNNING
SEND MESSAGE
Connected to redis://localhost:6379//
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}

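It looks like some buffering/prefetch/queue-related issue, but I have no real clue. I don't know whether this is a configuration problem or some kind of bug or limitation in the in-memory transport. Any kind of help would be appreciated.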
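
One guess that would reproduce the pattern in the in-memory log (ten sends, then ten receives at once) is a consumer that only checks the queue on a fixed interval. The sketch below is a minimal stdlib-only model of that hypothesis, not kombu code: `simulate_polling` and its parameters are hypothetical names made up for illustration, and the 1000 ms poll interval is an assumption chosen to match the log, not a value read from kombu.

```python
def simulate_polling(publish_every_ms, poll_every_ms, total_ms):
    """Model a producer publishing on one interval and a consumer that
    drains the whole queue on another. Returns the batch size per poll.

    Hypothetical helper: this is not part of kombu.
    """
    pending = 0   # messages published but not yet delivered
    batches = []  # number of messages handed over at each poll
    for now in range(1, total_ms + 1):
        if now % publish_every_ms == 0:
            pending += 1
        if now % poll_every_ms == 0:
            batches.append(pending)
            pending = 0
    return batches


# Publishing every 100 ms while polling only once per second (assumed)
# delivers messages in bursts of ten, like the in-memory log.
slow_poll = simulate_polling(publish_every_ms=100, poll_every_ms=1000, total_ms=3000)

# Polling as often as publishing delivers one message per send,
# like the Redis log.
fast_poll = simulate_polling(publish_every_ms=100, poll_every_ms=100, total_ms=500)

print(slow_poll)
print(fast_poll)
```

If this model is what is happening, the `polling_interval` entry in `transport_options` (for example `Connection('memory:///', transport_options={'polling_interval': 0.1})`) may be the setting to try. Whether the in-memory transport honors it has not been confirmed here.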
