我有一个 AMQP 服务器 ( RabbitMQ ),我想在Tornado Web 服务器中发布和读取它。为此,我想我会使用异步 amqp python 库;特别是Pika(它的一种变体,据说支持 Tornado)。
我编写的代码似乎成功地从队列中读取,除了在请求结束时,我得到一个异常(浏览器返回正常):
[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
Traceback (most recent call last):
File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
yield
File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/lib/python2.6/contextlib.py", line 113, in nested
yield vars
File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
callback(*args, **kwargs)
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
self._handle_read()
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
self.on_data_available(chunk)
File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
self.channels[frame.channel_number].frame_handler(frame)
KeyError: 1
我不完全确定我是否正确使用了这个库,所以我可能做错了什么。我的代码的基本流程是:
- 请求进来
- 使用 TornadoConnection 创建到 RabbitMQ 的连接;指定回调
- 在连接回调中,创建一个通道,声明/绑定我的队列,并调用 basic_consume;指定回调
- 在消费回调中,关闭通道并调用 Tornado 的完成函数。
- 见异常。
我的问题有几个:
- 这个流程是否正确?我不确定连接回调的目的是什么,除非我不使用它就不起作用。
- 我应该为每个 Web 请求创建一个 AMQP 连接吗?RabbitMQ 的文档建议不,我不应该,而是应该坚持只创建通道。但是我将在哪里创建连接,如果它短暂中断,我该如何尝试重新连接?
- 如果我为每个 Web 请求创建一个 AMQP 连接,我应该在哪里关闭它?在我的回调中调用 amqp.close() 似乎把事情搞砸了。
稍后我将尝试编写一些示例代码,但我上面描述的步骤相当完整地展示了事物的消耗方面。我也遇到了发布方面的问题,但是队列的消耗更加紧迫。