I'm trying to listen to a RabbitMQ queue from within a flask-socketio event handler so that I can send real-time notifications to a web application. My setup so far:

Server

import pika
import sys
from flask import Flask, request
from flask_socketio import SocketIO, emit, disconnect

app = Flask(__name__)
app.config['SECRET_KEY'] = 'not-so-secret'
socketio = SocketIO(app)

def is_authenticated():
  return True

def rabbit_callback(ch, method, properties, body):
    socketio.emit('connect', {'data': 'yes'})
    print "body: ", body

@socketio.on('connect')
def connected():
    emit('notification', {'data': 'Connected'})
    creds = pika.PlainCredentials(
        username="username",
        password="password")

    params = pika.ConnectionParameters(
        host="localhost",
        credentials=creds,
        virtual_host="/")

    connection = pika.BlockingConnection(params)

    # This is one channel inside the connection
    channel = connection.channel()

    # Declare the exchange we're going to use
    exchange_name = 'user'
    channel.exchange_declare(exchange=exchange_name,
                             type='topic')
    channel.queue_declare(queue='notifications')
    channel.queue_bind(exchange='user',
                       queue='notifications',
                       routing_key='#')

    channel.basic_consume(rabbit_callback,
                          queue='notifications',
                          no_ack=True)
    channel.start_consuming()


if __name__ == '__main__':
    socketio.run(app, port=8082)

Browser

<script type="text/javascript" charset="utf-8">
    var socket = io.connect('http://' + document.domain + ':8082');
    socket.on('connect', function(resp) {
        console.log(resp);
    });
    socket.on('disconnect', function(resp) {
        console.log(resp);
    });
    socket.on('error', function(resp) {
        console.log(resp);
    });
    socket.on('notification', function(resp) {
        console.log(resp);
    });
</script>

If I comment out the channel.start_consuming() line at the bottom of the server code and load the browser page, I connect to flask-socketio successfully and see {data: "Connected"} in my console.

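When I uncomment that line, I no longer see {data: "Connected"} in the console. However, when I send a message to the notifications queue, the rabbit_callback function does fire: my message is printed to the server console, but the emit call doesn't seem to work. There are no errors on the server or in the browser. Any advice is much appreciated.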

Thanks!

1 Answer

I had the same problem when using eventlet, and I just solved it by adding:

import eventlet
eventlet.monkey_patch()

at the very beginning of my source code.

In any case, my code is a bit different and uses the start_background_task method:

import eventlet
eventlet.monkey_patch()  # must run before the remaining imports

import pika
from threading import Lock
from flask import Flask
from flask_socketio import SocketIO, emit

# Assumed value: this name was left undefined in the snippet;
# 'eventlet' matches the monkey patching above.
async_mode = 'eventlet'

app = Flask(__name__, static_url_path='/static')
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None         # handle of the single background consumer task
thread_lock = Lock()  # guards the one-time start of that task

@socketio.on('connect', namespace='/test')
def test_connect():
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=get_messages)

    emit('my_response', {'data': 'Connected', 'count': 0})
    print('connected')

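def get_messages():
    # Runs in the background task, so blocking here does not stall the connect handler.
    channel = connect_rabbitmq()
    channel.start_consuming()

def callback(ch, method, properties, body):
    # Hypothetical handler (not defined in the snippet):
    # forward each RabbitMQ message to the clients in the /test namespace.
    socketio.emit('my_response', {'data': body.decode('utf-8')}, namespace='/test')

def connect_rabbitmq():
    cred = pika.credentials.PlainCredentials('username', 'password')
    conn_param = pika.ConnectionParameters(host='yourhostname',
                                           credentials=cred)
    connection = pika.BlockingConnection(conn_param)
    channel = connection.channel()

    # Declare and bind the same exchange name.
    channel.exchange_declare(exchange='myexchangename', exchange_type='fanout')

    # An empty name lets the broker generate one; exclusive keeps the queue private.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='myexchangename', queue=queue_name)

    # pika 0.x call style; pika 1.0+ expects
    # basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    return channel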
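
For anyone on pika 1.0 or later, here is a rough sketch of the same consumer setup using the newer keyword names (on_message_callback and auto_ack). The helper names make_callback and setup_consumer are my own and hypothetical, as is the 'notification' event name default; the channel is passed in, so the snippet itself does not open a connection.

```python
def make_callback(emit, event="notification"):
    """Build a pika-style message callback that forwards each body via emit."""
    def callback(ch, method, properties, body):
        # pika delivers the body as bytes; decode it before sending it on.
        text = body.decode("utf-8") if isinstance(body, bytes) else body
        emit(event, {"data": text})
    return callback


def setup_consumer(channel, exchange, callback):
    """Declare a fanout exchange, bind a private queue, register the callback."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # An empty queue name asks the broker to generate one;
    # exclusive=True ties the queue to this connection.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name)
    # pika 1.0+ keyword names: on_message_callback and auto_ack.
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    return queue_name
```

With a real connection this would be called as setup_consumer(connection.channel(), 'myexchangename', make_callback(socketio.emit)) inside the background task, followed by channel.start_consuming(). If the clients are in a namespace such as /test, the emit function passed in would also need to supply namespace='/test'.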

Hope this helps...

Answered 2017-09-14T10:04:29.840