我在我的 Django 应用程序中实现了一个服务器发送事件API,以将实时更新从我的后端流式传输到浏览器。后端是一个 Redis pubsub。我的 Django 视图如下所示:
def event_stream(request):
"""
Stream worker events out to browser.
"""
listener = events.Listener(
settings.EVENTS_PUBSUB_URL,
channels=[settings.EVENTS_PUBSUB_CHANNEL],
buffer_key=settings.EVENTS_BUFFER_KEY,
last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
)
return http.HttpResponse(listener, mimetype='text/event-stream')
我作为迭代器返回的 events.Listener 类如下所示:
class Listener(object):
def __init__(self, rcon_or_url, channels, buffer_key=None,
last_event_id=None):
if isinstance(rcon_or_url, redis.StrictRedis):
self.rcon = rcon_or_url
elif isinstance(rcon_or_url, basestring):
self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
self.channels = channels
self.buffer_key = buffer_key
self.last_event_id = last_event_id
self.pubsub = self.rcon.pubsub()
self.pubsub.subscribe(channels)
def __iter__(self):
# If we've been initted with a buffer key, then get all the events off
# that and spew them out before blocking on the pubsub.
if self.buffer_key:
buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
# check whether msg with last_event_id is still in buffer. If so,
# trim buffered_events to have only newer messages.
if self.last_event_id:
# Note that we're looping through most recent messages first,
# here
counter = 0
for msg in buffered_events:
if (json.loads(msg)['id'] == self.last_event_id):
break
counter += 1
buffered_events = buffered_events[:counter]
for msg in reversed(list(buffered_events)):
# Stream out oldest messages first
yield to_sse({'data': msg})
try:
for msg in self.pubsub.listen():
if msg['type'] == 'message':
yield to_sse(msg)
finally:
logging.info('Closing pubsub')
self.pubsub.close()
self.rcon.connection_pool.disconnect()
使用此设置,我能够成功地将事件流式传输到浏览器。但是,似乎侦听器的“finally”中的断开连接调用实际上并没有被调用。我假设他们仍然在等待来自 pubsub 的消息。当客户端断开连接并重新连接时,我可以看到与我的 Redis 实例的连接数量不断攀升并且永不下降。一旦达到 1000 左右,Redis 就会开始发疯并消耗所有可用的 CPU。
我希望能够检测到客户端何时不再监听并关闭 Redis 连接。
我尝试过或想过的事情:
- 一个连接池。但正如redis-py README 所述,“在线程之间传递 PubSub 或 Pipeline 对象是不安全的。”
- 处理连接的中间件,或者可能只是断开连接。这不起作用,因为中间件的 process_response() 方法被调用得太早(甚至在 http 标头发送到客户端之前)。当我正在向他们流式传输内容时,当客户端断开连接时,我需要调用一些东西。
- request_finished和got_request_exception信号。第一个,如中间件中的 process_response(),似乎启动得太快了。当客户端在中途断开连接时,第二个不会被调用。
最后的皱纹:在生产中,我使用 Gevent,所以我可以一次保持大量连接打开。但是,无论我使用普通的旧“manage.py runserver”、Gevent monkeypatched runserver 或 Gunicorn 的 gevent 工作人员,都会出现此连接泄漏问题。