我需要关于测试龙卷风应用程序的建议。现在我只是在玩演示聊天应用程序,但它看起来像是现实生活中的问题。
在处理程序中,我有:
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
@tornado.web.asynchronous
def post(self):
cursor = self.get_argument("cursor", None)
global_message_buffer.wait_for_messages(self.on_new_messages,
cursor=cursor)
def on_new_messages(self, messages):
# Closed client connection
if self.request.connection.stream.closed():
return
self.finish(dict(messages=messages))
class MessageBuffer(object):
def __init__(self):
....
def wait_for_messages(self, callback, cursor=None):
if cursor:
new_count = 0
for msg in reversed(self.cache):
if msg["id"] == cursor:
break
new_count += 1
if new_count:
callback(self.cache[-new_count:])
return
self.waiters.add(callback)
def cancel_wait(self, callback):
.....
def new_messages(self, messages):
logging.info("Sending new message to %r listeners", len(self.waiters))
for callback in self.waiters:
try:
callback(messages)
except:
logging.error("Error in waiter callback", exc_info=True)
self.waiters = set()
self.cache.extend(messages)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
正如我提到的完整源代码在torndado 演示中
在我的测试中,我有:
@wsgi_safe
class MessageUpdatesHandlerTest(LoginedUserHanldersTest):
Handler = MessageUpdatesHandler
def test_add_message(self):
from chatdemo import global_message_buffer
kwargs = dict(
method="POST",
body='',
)
future = self.http_client.fetch(self.get_url('/'), callback=self.stop, **kwargs)
message = {
"id": '123',
"from": "first_name",
"body": "hello",
"html": "html"
}
global_message_buffer.new_messages([message])
response = self.wait()
self.assertEqual(response.code, 200)
self.mox.VerifyAll()
怎么了:
- 它创建了一个未来的对象
- 它发送一个 hello 消息,此时 no
waiter
已注册,MessageBuffer
因此不会调用回调 - 在
wait
启动 IoLoop 并制作后,获取并waiter
注册在MessageBuffer
回调永远不会被调用,我的响应仍然是空的,所以一切都失败了
AssertionError:异步操作在 5 秒后超时
我想要它做什么:
- 在帖子中将自己注册为服务员
- 收到一些消息
- 回复我 200 回复
谢谢您的帮助