1

I'm trying to write some tests where I produce a message to a queue and see if the message gets correctly consumed and handled in the application.

For that I'm playing around with the kombu library and especially the in-memory Transport implementation.

Still I can't get it working, that the produced message gets consumed.

My questions is therefore, if anyone can provide a simple unit test that produces and consumes a message in-memory

4

1 回答 1

3

您需要将其调整为您尝试测试的代码,但您要查找的基本内容是内存控制器的 amqp URI,即“memory://”。作为一个非常简单的例子:

conn = kombu.Connection("memory://")
queue = conn.SimpleQueue('myqueue')
queue.put('test')
msg = queue.get(timeout=1)
msg.ack()
print(msg.payload)
于 2016-07-21T22:02:31.627 回答