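I'm trying to use kombu to loosely couple some parts of my software. Since I don't need persistence or inter-process communication, and I don't want to depend on an external daemon, I tried the in-memory transport. I set up some test code: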
from kombu.mixins import ConsumerMixin
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from time import sleep
import threading

media_exchange = Exchange('media', 'direct')
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
queues = [video_queue]


class Worker(ConsumerMixin, threading.Thread):

    def __init__(self):
        self.connection = Connection('memory:///')
        super(Worker, self).__init__()

    def run(self):
        print("YUP I'M RUNNING")
        super(Worker, self).run()
        print("BUT I WASN'T BLOCKED :(")

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues, callbacks=[self.on_message], accept=['json']),
        ]

    def on_message(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()


if __name__ == '__main__':
    # setup root logger
    setup_logging(loglevel='INFO', loggers=[''])

    try:
        worker = Worker()
        worker.start()
    except KeyboardInterrupt:
        print('bye bye')

    while True:
        print("SEND MESSAGE")
        with Connection('memory:///') as conn:
            producer = conn.Producer(serializer='json')
            producer.publish({"foo": "bar"}, exchange=media_exchange,
                             routing_key='video', declare=queues)
        sleep(0.1)
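However, with the memory transport, message delivery is slow: the consumer receives messages in delayed bursts rather than as they are published. With the Redis transport, for example, the results look fine. Memory example: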
(rpc_video_player)[phas@mir src]$ python test_messages.py
YUP I'M RUNNING
SEND MESSAGE
Connected to memory://localhost//
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
SEND MESSAGE
Redis example:
(rpc_video_player)[phas@mir src]$ python test_messages.py
YUP I'M RUNNING
SEND MESSAGE
Connected to redis://localhost:6379//
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
SEND MESSAGE
RECEIVED MESSAGE: {'foo': 'bar'}
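This looks like a problem related to buffering, prefetching, or queuing, but I don't really have a clue. I can't tell whether it is a configuration issue or some kind of bug or limitation in the memory transport. Any help would be greatly appreciated.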