我正在构建一个从rabbitmq使用接收消息的服务pika。并使用 . 向客户端推送消息socket.io。
socket.io服务器和 pika 服务器都阻塞了主线程。
这对于celerywith flaskor也是一样的Django。
解决此问题并在同一上下文中运行它们的正确方法是什么?
谢谢你,谢伊
您可以使用该Pub/Sub模型,在另一个线程中启动消费过程,注册想要从队列中接收数据并将数据发送给订阅用户的用户。
import json
import pika
import gevent
from flask import Flask
from flask_sockets import Sockets
connection_url = 'localhost'
channel_queue = 'test'
class PubSubListener(threading.Thread):
def __init__(self, queue_name):
threading.Thread.__init__(self)
self.clients = []
self.queue_name = queue_name
connection = pika.BlockingConnection(pika.ConnectionParameters(connection_url))
self.channel = connection.channel()
self.channel.queue_declare(queue=self.queue_name)
threading.Thread(target=self.channel.basic_consume(queue=self.queue_name,
auto_ack=True,
on_message_callback=self._callback))
def run(self):
self.channel.start_consuming()
def publish(self, body):
self.channel.basic_publish(exchange='',
routing_key=self.queue_name,
body=body)
def subscribe(self, client):
self.clients.append(client)
def _callback(self, channel, method, properties, body):
time.sleep(0.001)
message = json.loads(body)
print(message)
self.send(message)
def send(self, data):
for client in self.clients:
try:
client.send(data)
except Exception:
self.clients.remove(client)
pslistener = PubSubListener(channel_queue)
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/echo')
def echo_socket(ws):
pslistener.subscribe(ws)
while not ws.closed:
gevent.sleep(0.1)
@app.route('/')
def hello():
return 'Hello World!'
if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
pslistener.start()
print("Started")
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()