我有一个继承的类,kombu.ConsumerProducerMixin
我想在没有实际运行 rabbitmq 服务的情况下对其进行测试。
class Aggregator(ConsumerProducerMixin):
def __init__(self, broker_url):
exchange_name = 'chargers'
self.status = 0
self.connection = Connection(broker_url)
...
在我的测试文件中,我执行了以下操作:
from unittest.mock import Mock, patch
from aggregator import Aggregator
@patch('kombu.connection.Connection')
def test_on_request(conn_mock):
agg = Aggregator('localhost')
m = Message("", {"action": "start"}, content_type="application/json")
使用调试器进入Aggregator.__init__
,我看到它connection
仍然没有被修补为一个Mock
实例:
(Pdb) self.connection
<Connection: amqp://guest:**@localhost:5672// at 0x7fc8b7f636d8>
(Pdb) Connection
<class 'kombu.connection.Connection'>
我的问题是如何正确修补连接,这样我就不需要rabbitmq 来运行测试?