1

我需要关于测试龙卷风应用程序的建议。现在我只是在玩演示聊天应用程序,但它看起来像是现实生活中的问题。

在处理程序中,我有:

class MessageUpdatesHandler(BaseHandler):
    @tornado.web.authenticated
    @tornado.web.asynchronous
    def post(self):
        cursor = self.get_argument("cursor", None)
        global_message_buffer.wait_for_messages(self.on_new_messages,
                                                cursor=cursor)

    def on_new_messages(self, messages):
        # Closed client connection
        if self.request.connection.stream.closed():
            return
        self.finish(dict(messages=messages))


class MessageBuffer(object):
    def __init__(self):
        ....

    def wait_for_messages(self, callback, cursor=None):
        if cursor:
            new_count = 0
           for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                callback(self.cache[-new_count:])
                return
        self.waiters.add(callback)

    def cancel_wait(self, callback):
        .....

    def new_messages(self, messages):
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for callback in self.waiters:
            try:
                callback(messages)
            except:
                logging.error("Error in waiter callback", exc_info=True)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]

正如我提到的完整源代码在torndado 演示中

在我的测试中,我有:

@wsgi_safe
class MessageUpdatesHandlerTest(LoginedUserHanldersTest):
    Handler = MessageUpdatesHandler

    def test_add_message(self):
        from chatdemo import global_message_buffer
        kwargs = dict(
            method="POST",
            body='',
        )
        future = self.http_client.fetch(self.get_url('/'), callback=self.stop, **kwargs)
        message = {
        "id": '123',
        "from": "first_name",
        "body": "hello",
        "html": "html"
        }
        global_message_buffer.new_messages([message])
        response = self.wait()
        self.assertEqual(response.code, 200)
        self.mox.VerifyAll()

怎么了:

  1. 它创建了一个未来的对象
  2. 它发送一个 hello 消息,此时 nowaiter已注册,MessageBuffer因此不会调用回调
  3. wait启动 IoLoop 并制作后,获取并waiter注册在MessageBuffer
  4. 回调永远不会被调用,我的响应仍然是空的,所以一切都失败了

    AssertionError:异步操作在 5 秒后超时

我想要它做什么:

  1. 在帖子中将自己注册为服务员
  2. 收到一些消息
  3. 回复我 200 回复

谢谢您的帮助

4

0 回答 0