4

我正在尝试为使用高速公路的应用程序编写单元测试。

我想测试我的控制器,它从协议中获取接收到的数据,解析它并对它做出反应。

但是当我的测试到了应该断开协议时(self.sendClose)然后它会引发错误

exceptions.AttributeError: 'MyProtocol' object has no attribute 'state'.

我试图makeConnection使用proto_helpers.StringTransport,但后来我也有错误

exceptions.AttributeError: StringTransport instance has no attribute 'setTcpNoDelay'`

我正在使用trial并且我不想仅出于测试目的运行虚拟服务器/客户端,因为不建议这样做。

我应该如何编写测试,以便可以使用假连接和试用来测试发送数据、读取数据、断开连接等功能?

4

1 回答 1

5

MyProtocol如果不看课堂,很难准确地说出发生了什么。这个问题听起来很像它是因为你直接搞乱了低级函数,因此也是类的state属性,也就是说WebSocket,WebSocket 连接的内部状态的表示。

根据高速公路参考文档,您可以直接使用和覆盖的 APIWebSicketProtocol是:

  • 打开
  • onMessage
  • 关闭
  • 发信息
  • 发送关闭

您使用StringTransport来测试您的协议的方法并不理想。问题在于autobahn 提供MyProtocol的框架之上的一个微小层,WebSocketProtocol无论好坏,它隐藏了有关管理连接、传输和内部协议状态的细节。

如果你考虑一下,你想测试你的东西,而不是WebSocketProtocol,因此如果你不想嵌入一个虚拟服务器或客户端,你最好的选择是直接测试MyProtocol覆盖的方法。

我所说的一个例子如下

class MyPublisher(object):
    cbk=None

    def publish(self, msg):
        if self.cbk:
            self.cbk(msg)

class MyProtocol(WebSocketServerProtocol):

    def __init__(self, publisher):
        WebSocketServerProtocol.__init__(self)
        #Defining callback for publisher
        publisher.cbk = self.sendMessage

    def onMessage(self, msg, binary)
        #Stupid echo
        self.sendMessage(msg)

class NotificationTest(unittest.TestCase):    

    class MyProtocolFactory(WebSocketServerFactory):
        def __init__(self, publisher):
            WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081")
            self.publisher = publisher
            self.openHandshakeTimeout = None

        def buildProtocol(self, addr):
            protocol =  MyProtocol(self.listener)
            protocol.factory = self
            protocol.websocket_version = 13 #Hybi version 13 is supported by pretty much everyone (apart from IE <8 and android browsers)
            return protocol

    def setUp(self):
        publisher = task.LoopingCall(self.send_stuff, "Hi there")        
        factory = NotificationTest.MyProtocolFactory(listener)
        protocol = factory.buildProtocol(None)
        transport = proto_helpers.StringTransport()
        def play_dumb(*args): pass
        setattr(transport, "setTcpNoDelay", play_dumb)
        protocol.makeConnection(transport)
        self.protocol, self.transport, self.publisher, self.fingerprint_handler =  protocol, transport, publisher, fingerprint_handler

    def test_onMessage(self):
        #Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
        self.protocol.state = WebSocketProtocol.STATE_OPEN
        self.protocol.websocket_version = 13
        self.protocol.onMessage("Whatever")
        self.assertEqual(self.transport.value()[2:], 'Whatever')

    def test_push(self):              
        #Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
        self.protocol.state = WebSocketProtocol.STATE_OPEN
        self.protocol.websocket_version = 13
        self.publisher.publish("Hi there")
        self.assertEqual(self.transport.value()[2:], 'Hi There')

您可能已经注意到,使用StringTransport这里非常麻烦。您必须了解下划线框架并绕过它的状态管理,这是您不想做的事情。不幸的是,高速公路没有提供可以轻松进行状态操作的即用型测试对象,因此我使用虚拟服务器和客户端的建议仍然有效


使用网络测试您的服务器

提供的测试展示了如何测试服务器推送,断言你得到的是你期望的,并且还使用了一个关于如何确定何时完成的钩子。

服务器协议

from twisted.trial.unittest import TestCase as TrialTest
from autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory, WebSocketClientProtocol, WebSocketClientFactory, connectWS, listenWS
from twisted.internet.defer import Deferred
from twisted.internet import task 

START="START"            

class TestServerProtocol(WebSocketServerProtocol):

    def __init__(self):
        #The publisher task simulates an event that triggers a message push
        self.publisher = task.LoopingCall(self.send_stuff, "Hi there")

    def send_stuff(self, msg):
        #this method sends a message to the client
        self.sendMessage(msg)

    def _on_start(self):
        #here we trigger the task to execute every second
        self.publisher.start(1.0)

    def onMessage(self, message, binary):
        #According to this stupid protocol, the server starts sending stuff when the client sends a "START" message
        #You can plug other commands in here
        {
           START : self._on_start
           #Put other keys here
        }[message]()

    def onClose(self, wasClean, code, reason):
        #After closing the connection, we tell the task to stop sending messages
        self.publisher.stop()

客户端协议和工厂

下一个类是客户端协议。它基本上告诉服务器开始推送消息。它调用close_condition它们以查看是否是关闭连接的时候了,最后,它调用assertion它收到的消息上的函数以查看测试是否成功

class TestClientProtocol(WebSocketClientProtocol):
    def __init__(self, assertion, close_condition, timeout, *args, **kwargs):
        self.assertion = assertion
        self.close_condition = close_condition
        self._received_msgs = [] 
        from twisted.internet import reactor
        #This is a way to set a timeout for your test 
        #in case you never meet the conditions dictated by close_condition
        self.damocle_sword = reactor.callLater(timeout, self.sendClose)

    def onOpen(self):
        #After the connection has been established, 
        #you can tell the server to send its stuff
        self.sendMessage(START)

    def onMessage(self, msg, binary):
        #Here you get the messages pushed from the server
        self._received_msgs.append(msg)
        #If it is time to close the connection
        if self.close_condition(msg):
            self.damocle_sword.cancel()
            self.sendClose()

    def onClose(self, wasClean, code, reason):
        #Now it is the right time to check our test assertions
        self.assertion.callback(self._received_msgs)

class TestClientProtocolFactory(WebSocketClientFactory):
    def __init__(self, assertion, close_condition, timeout, **kwargs):
        WebSocketClientFactory.__init__(self, **kwargs)
        self.assertion = assertion
        self.close_condition = close_condition
        self.timeout = timeout
        #This parameter needs to be forced to None to not leave the reactor dirty
        self.openHandshakeTimeout = None

    def buildProtocol(self, addr):
        protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout)
        protocol.factory = self
        return protocol

基于试验的测试

class WebSocketTest(TrialTest):

    def setUp(self):
        port = 8088
        factory = WebSocketServerFactory("ws://localhost:{}".format(port))
        factory.protocol = TestServerProtocol
        self.listening_port = listenWS(factory)
        self.factory, self.port = factory, port

    def tearDown(self):
        #cleaning up stuff otherwise the reactor complains
        self.listening_port.stopListening()

    def test_message_reception(self): 
        #This is the test assertion, we are testing that the messages received were 3
        def assertion(msgs):
            self.assertEquals(len(msgs), 3)

        #This class says when the connection with the server should be finalized. 
        #In this case the condition to close the connectionis for the client to get 3 messages
        class CommunicationHandler(object):
            msg_count = 0

            def close_condition(self, msg):
                self.msg_count += 1
                return self.msg_count == 3

        d = Deferred()
        d.addCallback(assertion)
        #Here we create the client...
        client_factory = TestClientProtocolFactory(d, CommunicationHandler().close_condition, 5, url="ws://localhost:{}".format(self.port))
        #...and we connect it to the server
        connectWS(client_factory)
        #returning the assertion as a deferred purely for demonstration
        return d

这显然只是一个例子,但正如你所看到的,我不必乱搞makeConnectiontransport明确

于 2013-02-13T00:08:33.500 回答