2

我正在尝试组合一个简单的 Flask / socketio / eventlet 服务器来订阅 Redis 事件。我看到的行为是,启用 Flask 调试后,每次 Werkzeug 检测到更改并重新启动 socketio 时,我的另一个 redis 侦听器也会启动(除了旧的侦听器不退出)。

这是一个删除了所有 socketio 处理程序的工作版本:

import json
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
from flask.ext.redis import FlaskRedis
import eventlet
eventlet.monkey_patch()

with open('config/flask.json') as f:
    config_flask = json.load(f)

app = Flask(__name__, static_folder='public', static_url_path='')
app.config.update(
      DEBUG= True,
      PROPAGATE_EXCEPTIONS= True,
      REDIS_URL= "redis://localhost:6379/0"
    )

redis_cache = FlaskRedis(app)
socketio = SocketIO(app)

@app.route('/')
def index():
    cache = {}
    return render_template('index.html', **cache)

def redisReader():
    print 'Starting Redis subscriber'
    pubsub = redis_cache.pubsub()
    pubsub.subscribe('msg')
    for msg in pubsub.listen():
        print '>>>>>', msg

def runSocket():
    print "Starting webserver"
    socketio.run(app, host='0.0.0.0')

if __name__ == '__main__':
    pool = eventlet.GreenPool()
    pool.spawn(redisReader)
    pool.spawn(runSocket)
    pool.waitall()

加入一些手动 redis-cli 发布 (PUBLISH msg himom) 这将产生以下输出:

Starting Redis subscriber
Starting webserver
 * Restarting with stat
>>>>> {'pattern': None, 'type': 'subscribe', 'channel': 'msg', 'data': 1L}
Starting Redis subscriber
Starting webserver
 * Debugger is active!
 * Debugger pin code: 789-323-740
(22252) wsgi starting up on http://0.0.0.0:5000
>>>>> {'pattern': None, 'type': 'subscribe', 'channel': 'msg', 'data': 1L}
>>>>> {'pattern': None, 'type': 'message', 'channel': 'msg', 'data': 'himom'}
>>>>> {'pattern': None, 'type': 'message', 'channel': 'msg', 'data': 'himom'}

为什么 Redis 监听器启动多次?如果我进行更改并保存它们,Werkzeug 每次都会启动另一个。我该如何正确处理?

以下是涉及的软件包及其版本的列表:

  • Python 2.7.6
  • 烧瓶 0.10.1
  • Werkzeug 0.11.4
  • 小事件 0.18.4
  • 格林莱特 0.4.9
  • Flask-Redis 0.1.0
  • Flask-SocketIO 2.2

** 更新 ** 我现在有一个部分解决方案。上面的一切都保持不变,除了池行为已移至 Flask 的“before_first_request”函数中:

def setupRedis():
    print "setting up redis"
    pool = eventlet.GreenPool()
    pool.spawn(redisReader)

def runSocket():
    print "Starting Webserver"
    socketio.run(app, host='0.0.0.0')

if __name__ == '__main__':
    app.before_first_request(setupRedis)
    print app.before_first_request_funcs
    runSocket()

剩下的问题是 'before_first_request' 不处理存在现有 websocket 的情况,但这是一个单独的问题。

4

2 回答 2

1

添加简单print(os.getpid())并观察ps aux。您应该注意到有两个 Python 进程。更改DEBUG=False,并且“问题”没有重现,现在也有一个 Python 进程。

所以问题是,redisReader无论它是“工作管理器”还是实际的“请求处理程序”进程,您的代码都会创建。所以,不要pool无条件地创建和启动它。查阅 Werkzeug 文档,您的“应用程序初始化”事件在哪里,并且只从redisReader那里开始。

于 2016-03-18T23:46:17.610 回答
1

这里的解决方案是将我的线程放入 Flask 调用的函数中:

def setupRedis():
    pool = eventlet.GreenPool()
    pool.spawn(redisReader)
...
app.before_first_request(setupRedis)

这解决了 Werkzeug 重新启动时留下的额外线程。

于 2016-03-25T18:54:09.080 回答