我在twisted 中实现pubsub 模式。并且想知道如何在试验中测试 pub 只触发正确的潜艇,而不是其他人。
我可以测试是否调用了正确的潜艇:
def test_pubsub(self):
d1 = defer.Deferred()
d2 = defer.Deferred()
self.bus.sub("/foo", lambda ev: d1.callback(ev))
self.bus.sub("/foo/bar", lambda ev: d2.callback(ev))
self.bus.pub("/foo/bar", {})
return defer.gatherResults([d1, d2])
test_pubsub.timeout = 2
现在我想测试一下
self.bus.sub("/foo/bar/baz", callback_will_not_be_called)
更新。我想出的唯一想法:
self.bus.sub('/foo/bar/baz', lambda ev: d4.callback(ev))
d4.addCallback(lambda e: self.fail("should not happen"))
reactor.callLater(1.9, lambda: d4.cancel())
self.assertFailure(d4, defer.CancelledError)