0

我正在构建一个服务,使用 pika 从 RabbitMQ 接收消息,并使用 socket.io 向客户端推送消息。

socket.io 服务器和 pika 服务器都会阻塞主线程。
将 celery 与 flask 或 Django 一起使用时也会遇到同样的问题。

解决此问题并在同一上下文中运行它们的正确方法是什么?

谢谢你,谢伊

4

1 回答 1

1

您可以使用 Pub/Sub(发布/订阅)模型:在另一个线程中启动消费过程,注册想要从队列接收数据的用户,并将数据发送给这些已订阅的用户。

import json
import threading
import time

import gevent
import pika
from flask import Flask
from flask_sockets import Sockets

# Host name handed to pika.ConnectionParameters when connecting to RabbitMQ.
connection_url = 'localhost'
# Name of the queue the listener declares, consumes from and publishes to.
channel_queue = 'test'

class PubSubListener(threading.Thread):
    """Consume a RabbitMQ queue on its own thread and fan messages out.

    The constructor opens a blocking pika connection, declares the queue and
    registers ``_callback`` as the consumer.  Calling ``start()`` runs
    ``run()`` on a new thread, which blocks in ``start_consuming()`` and
    forwards every decoded message to each subscribed client.

    Attributes are plain instance fields:
        clients: list of subscribed objects; each must expose ``send(data)``.
        queue_name: name of the queue that is declared, consumed and
            published to.
        channel: the pika channel obtained from the blocking connection.
    """

    def __init__(self, queue_name):
        """Connect to RabbitMQ, declare *queue_name* and register the consumer.

        Args:
            queue_name: Name of the queue to declare and consume from.
        """
        super().__init__()

        self.clients = []
        self.queue_name = queue_name

        connection = pika.BlockingConnection(pika.ConnectionParameters(connection_url))
        self.channel = connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

        # Registering the consumer is a plain call: it only records the
        # callback.  The blocking consume loop is entered later, in run(),
        # on this object's own thread.  (The previous code wrapped this call
        # in a threading.Thread that was never started.)
        self.channel.basic_consume(queue=self.queue_name,
            auto_ack=True,
            on_message_callback=self._callback)

    def run(self):
        """Thread body: block in the channel's consume loop."""
        self.channel.start_consuming()

    def publish(self, body):
        """Publish *body* to the listener's queue via the default exchange.

        NOTE(review): pika documents its connections as not thread-safe, and
        this channel is also used by the consumer thread in run() -- confirm
        how callers on other threads are expected to use this method.
        """
        self.channel.basic_publish(exchange='',
            routing_key=self.queue_name,
            body=body)

    def subscribe(self, client):
        """Register *client* (any object with ``send(data)``) for messages."""
        self.clients.append(client)

    def _callback(self, channel, method, properties, body):
        """Consumer callback: decode the JSON *body* and forward it to clients.

        The parameters follow the signature pika uses for
        ``on_message_callback``; only *body* is used here.
        """
        # Very short pause kept from the original implementation; presumably
        # meant to yield to other threads -- TODO confirm it is still needed.
        time.sleep(0.001)
        message = json.loads(body)
        print(message)
        self.send(message)

    def send(self, data):
        """Send *data* to every subscribed client, dropping those that fail.

        A client whose ``send`` raises is treated as dead and removed from
        ``self.clients``; every other client still receives *data*.

        NOTE(review): *data* is whatever ``json.loads`` produced in
        ``_callback``; confirm the client's ``send`` accepts that type.
        """
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the element after each removed client, so live clients
        # could miss the message and dead ones could stay registered.
        for client in list(self.clients):
            try:
                client.send(data)
            except Exception:
                try:
                    self.clients.remove(client)
                except ValueError:
                    # Already removed elsewhere; nothing left to do.
                    pass

# Shared listener instance. Constructing it runs PubSubListener.__init__,
# which opens the RabbitMQ connection, so this happens when the module loads.
pslistener = PubSubListener(channel_queue)

# Flask application and the flask_sockets wrapper used to register
# WebSocket routes on it.
app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/echo')
def echo_socket(ws):
    """WebSocket handler for ``/echo``: subscribe *ws* and wait until it closes.

    The socket is registered with the module-level ``pslistener``; the
    handler then polls ``ws.closed`` every 0.1 s and returns once it is set.
    """
    pslistener.subscribe(ws)

    # Keep the handler alive while the socket is open, sleeping through
    # gevent between checks of the closed flag.
    while True:
        if ws.closed:
            break
        gevent.sleep(0.1)

@app.route('/')
def hello():
    """Return the fixed greeting served at the root URL."""
    greeting = 'Hello World!'
    return greeting


if __name__ == "__main__":
    # Script entry point: start the RabbitMQ listener thread, then serve the
    # Flask app (with WebSocket support) on port 5000 until interrupted.
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler

    # Start consuming on the listener's own thread first, because
    # serve_forever() below blocks this one.
    pslistener.start()
    print("Started")

    bind_address = ('', 5000)
    http_server = pywsgi.WSGIServer(bind_address, app, handler_class=WebSocketHandler)
    http_server.serve_forever()
于 2021-10-13T06:43:35.113 回答