2

我正在构建一个text/event-stream基于视图,aiohttp并在aioredis实现中使用来自 Redis 的 pub-sub。它看起来像:

从服务器获取一些数据并发布到 chanell 的脚本

def main(host, port):
    server_logger.info('Got params connection host {0}, port {1}'.format(host, port))
    loop = asyncio.get_event_loop()
    title = None
    redis = loop.run_until_complete(create_redis(('localhost', 6379)))
    while True:
        new_title = loop.run_until_complete(get_title(host, port))
        if new_title != title:
            loop.run_until_complete(redis.publish('CHANNEL', new_title))
            title = new_title
    loop.close()
    return False

订阅频道并将其写入 Stream 响应的 aiohttp 视图

stream = web.StreamResponse()
stream.headers['Content-Type'] = 'text/event-stream'
stream.headers['Cache-Control'] = 'no-cache'
stream.headers['Connection'] = 'keep-alive'

await stream.prepare(request)

redis = await create_redis(('localhost', 6379))
channel = (await redis.subscribe('CHANNEL'))[0]

while await channel.wait_message():
        message = await channel.get()
        if message:
            stream.write(b'event: track_update\r\n')
            stream.write(b'data: ' + message + b'\r\n\r\n')
        else:
            continue

我得到了很多次类似的东西:

DEBUG:aioredis:Creating tcp connection to ('localhost', 6379)

所以连接丢失,这也导致concurrent.futures.CancelledError和保持连接将丢失。经常丢失连接可以吗?我期待有持久的连接,如果我遗漏了什么,对不起。

4

1 回答 1

3

首先在请求处理程序中创建新的 redis 连接是个坏主意。请为每个应用程序使用一个连接池。

您可能会得到https://github.com/KeepSafe/aiohttp/blob/master/demos/polls/aiohttpdemo_polls/main.py作为推荐设计原则的草图。

关于保持活动连接——它们不是很持久,但默认情况下在 75 秒不活动期后关闭。

您可以通过将keep_alive=300参数传递给app.make_handler()调用来增加周期,但设置非常大的值并不可靠——在 TCP 自然期间,在某些情况下连接可能会在没有通知的情况下中断。如果您没有要传递的数据,最好保持合理的慢超时并定期向服务器发送自定义 ping 请求。

于 2016-09-12T13:38:30.377 回答