
I've implemented a server-sent events API in my Django app to stream realtime updates from my backend to the browser. The backend is a Redis pubsub. My Django view looks like this:

def event_stream(request):
    """
    Stream worker events out to browser.
    """
    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )

    return http.HttpResponse(listener, mimetype='text/event-stream')

And the events.Listener class that I'm returning as an iterator looks like this:

class Listener(object):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

            # check whether msg with last_event_id is still in buffer.  If so,
            # trim buffered_events to have only newer messages.
            if self.last_event_id:
                # Note that we're looping through most recent messages first,
                # here
                counter = 0
                for msg in buffered_events:
                    if (json.loads(msg)['id'] == self.last_event_id):
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]

            for msg in reversed(list(buffered_events)):
                # Stream out oldest messages first
                yield to_sse({'data': msg})
        try:
            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)
        finally:
            logging.info('Closing pubsub')
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()
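To make the Last-Event-ID trimming step above concrete, here's that loop pulled out into a standalone function (`trim_buffer` is my name for this sketch, not part of the original code). The buffer is ordered newest-first, as you'd get from LPUSH followed by LRANGE 0 -1:

```python
import json

def trim_buffer(buffered_events, last_event_id):
    # The buffer is newest-first.  Count how many messages are newer than
    # last_event_id, then keep only those; if last_event_id isn't found,
    # the whole buffer is replayed.
    counter = 0
    for msg in buffered_events:
        if json.loads(msg)['id'] == last_event_id:
            break
        counter += 1
    return buffered_events[:counter]

# Newest first: ids 3, 2, 1.  A client that last saw id 2 should only
# be replayed the message with id 3.
buf = [json.dumps({'id': i}) for i in (3, 2, 1)]
print(trim_buffer(buf, 2))
```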

With this setup, I'm able to successfully stream events out to the browser. However, it seems that the disconnect calls in the listener's `finally` don't actually get made. I'm assuming they're still blocked waiting for messages from the pubsub. As clients disconnect and reconnect, I can see the number of connections to my Redis instance climbing and never going down. Once it gets to around 1,000, Redis starts freaking out and consuming all the available CPU.

I would like to be able to detect when the client is no longer listening and close the Redis connection at that point.

Things I've tried or thought about:

  1. A connection pool. But as the redis-py README states, "It is not safe to pass PubSub or Pipeline objects between threads."
  2. A middleware to handle the connections, or maybe just the disconnections. This won't work because a middleware's process_response() method gets called too early (before the HTTP headers have even been sent to the client). I need something that gets called when the client disconnects while I'm in the middle of streaming content to them.
  3. The request_finished and got_request_exception signals. The first, like process_response() in a middleware, seems to fire too soon. The second doesn't get called when a client disconnects mid-request.

Final wrinkle: in production I'm using Gevent so I can get away with keeping a lot of connections open at once. However, this connection leak issue is present whether I'm using plain old 'manage.py runserver', Gevent's monkeypatched runserver, or Gunicorn's gevent workers.

1 Answer

Update: As of Django 1.5, you'll need to return a StreamingHttpResponse instance if you want to lazily stream things out as I'm doing in this question/answer.

Original answer below

After a lot of fiddling and reading framework code, I've found what I think is the right answer to this question.

  1. According to the WSGI PEP, if your application returns an iterator with a close() method, it should be called by the WSGI server once the response has finished. Django supports this too. That's a natural place to do the Redis connection cleanup that I need.
  2. There's a bug in Python's wsgiref implementation, and by extension in Django's 'runserver', that causes close() to be skipped if the client disconnects from the server midstream. I've submitted a patch.
  3. Even if the server honors close(), it won't be called until a write to the client actually fails. If your iterator is sitting blocked on the pubsub and not sending anything, close() will never be called. I've worked around this by sending a no-op message into the pubsub each time a client connects. That way when a browser does a normal reconnect, the now-defunct threads will try to write to their closed connections, throw an exception, and then get cleaned up when the server calls close(). The SSE spec says that any line beginning with a colon is a comment that should be ignored, so I'm just sending ":\n" as my no-op message to flush out stale clients.
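Point 1 can be sketched with a toy server loop. This is a simplified, assumed illustration of the PEP 3333 contract, not Django's or any real server's code: if the iterable the application returns has a close() method, the server must call it when the response finishes, even if iteration stopped early.

```python
# Minimal sketch of the WSGI close() contract (PEP 3333).
class Body:
    """An iterable response body that records whether close() was called."""
    def __init__(self):
        self.closed = False

    def __iter__(self):
        yield b'data: hello\n\n'

    def close(self):
        self.closed = True


def app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/event-stream')])
    return Body()


def serve_once(app):
    # Toy stand-in for a WSGI server's response loop: iterate the body,
    # then call close() in a finally block -- even if the client
    # disconnected and a write raised midstream.
    body = app({}, lambda status, headers: None)
    try:
        for chunk in body:
            pass  # a real server would write chunk to the socket here
    finally:
        if hasattr(body, 'close'):
            body.close()
    return body


body = serve_once(app)
print(body.closed)  # True
```

The wsgiref bug mentioned in point 2 is precisely a case where that finally-equivalent cleanup gets skipped on a midstream disconnect.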

Here's the new code. First the Django view:

def event_stream(request):
    """
    Stream worker events out to browser.
    """
    return events.SSEResponse(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )

And the Listener class that does the work, along with a helper function to format the SSEs, and an HTTPResponse subclass that lets the view be a little cleaner:

class Listener(object):
    def __init__(self,
                 rcon_or_url=settings.EVENTS_PUBSUB_URL,
                 channels=None,
                 buffer_key=settings.EVENTS_BUFFER_KEY,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        if channels is None:
            channels = [settings.EVENTS_PUBSUB_CHANNEL]
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

        # Send a superfluous message down the pubsub to flush out stale
        # connections.
        for channel in self.channels:
            # Use buffer_key=None since these pings never need to be remembered
            # and replayed.
            sender = Sender(self.rcon, channel, None)
            sender.publish('_flush', tags=['hidden'])

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

            # check whether msg with last_event_id is still in buffer.  If so,
            # trim buffered_events to have only newer messages.
            if self.last_event_id:
                # Note that we're looping through most recent messages first,
                # here
                counter = 0
                for msg in buffered_events:
                    if (json.loads(msg)['id'] == self.last_event_id):
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]

            for msg in reversed(list(buffered_events)):
                # Stream out oldest messages first
                yield to_sse({'data': msg})

        for msg in self.pubsub.listen():
            if msg['type'] == 'message':
                yield to_sse(msg)

    def close(self):
        self.pubsub.close()
        self.rcon.connection_pool.disconnect()


class SSEResponse(HttpResponse):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None, *args, **kwargs):
        self.listener = Listener(rcon_or_url, channels, buffer_key,
                                 last_event_id)
        super(SSEResponse, self).__init__(self.listener,
                                          mimetype='text/event-stream',
                                          *args, **kwargs)

    def close(self):
        """
        This will be called by the WSGI server at the end of the request, even
        if the client disconnects midstream.  Unless you're using Django's
        runserver, in which case you should expect to see Redis connections
        build up until http://bugs.python.org/issue16220 is fixed.
        """
        self.listener.close()


def to_sse(msg):
    """
    Given a Redis pubsub message that was published by a Sender (ie, has a JSON
    body with time, message, title, tags, and id), return a properly-formatted
    SSE string.
    """
    data = json.loads(msg['data'])

    # According to the SSE spec, lines beginning with a colon should be
    # ignored.  We can use that as a way to force zombie listeners to try
    # pushing something down the socket and clean up their redis connections
    # when they get an error.
    # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation
    if data['message'] == '_flush':
        return ":\n"  # Administering colonic!

    if 'id' in data:
        out = "id: " + data['id'] + '\n'
    else:
        out = ''
    if 'name' in data:
        out += 'name: ' + data['name'] + '\n'

    payload = json.dumps({
        'time': data['time'],
        'message': data['message'],
        'tags': data['tags'],
        'title': data['title'],
    })
    out += 'data: ' + payload + '\n\n'
    return out
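To illustrate the wire format that to_sse() emits, and why the ":\n" no-op is invisible to clients, here's a tiny parser sketch (`parse_sse_frame` is purely illustrative and hypothetical; it's not how a browser's EventSource is implemented) that skips comment lines the way the spec requires:

```python
def parse_sse_frame(frame):
    """Illustrative only: pull (id, data) out of one SSE frame, skipping
    comment lines, which the spec says begin with a colon."""
    event_id, data = None, []
    for line in frame.splitlines():
        if line.startswith(':'):
            continue  # comment line -- EventSource ignores these entirely
        if line.startswith('id: '):
            event_id = line[4:]
        elif line.startswith('data: '):
            data.append(line[6:])
    return event_id, '\n'.join(data)

print(parse_sse_frame("id: 42\ndata: hello\n\n"))  # ('42', 'hello')
print(parse_sse_frame(":\n"))                      # (None, '') -- the no-op
```

The second call shows why the flush message is safe: it forces a write down the socket (which is what surfaces the broken connection) without delivering any event to a live client.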
answered 2012-10-15T02:14:46.507