11

寻找一些代码示例来解决这个问题:-

想编写一些代码(Python 或 Javascript)作为 RabbitMQ 队列的订阅者,以便在接收到消息时通过 websocket 将消息广播到任何连接的客户端。

我查看了 Autobahn 和 node.js(使用“ amqp ”和“ ws ”),但无法让事情按需要工作。这是使用 node.js 的 javascript 中的服务器代码:-

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

使用此代码,我可以成功订阅 Rabbit 中的队列并接收发送到队列的任何消息。同样,我可以将 websocket 客户端(例如浏览器)连接到服务器并发送/接收消息。但是......如何在指示的点将 Rabbit 队列消息的有效负载作为 websocket 消息发送(“现在如何通过 WEBSOCKETS 发送”)?我认为这与陷入错误的回调有关,或者它们需要以某种方式嵌套......?

或者,如果这可以在 Python 中更容易地完成(通过 Autobahn 和 pika),那就太好了。

谢谢 !

4

1 回答 1

10

实现系统的一种方法是将 python 与tornado结合使用。

这里是服务器:

    import tornado.ioloop
    import tornado.web
    import tornado.websocket
    import os
    import pika
    from threading import Thread


    clients = []

    def threaded_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"));
        print 'Connected:localhost'
        channel = connection.channel()
        channel.queue_declare(queue="my_queue")
        print 'Consumer ready, on my_queue'
        channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) 
        channel.start_consuming()


    def consumer_callback(ch, method, properties, body):
            print " [x] Received %r" % (body,)
            for itm in clients:
                itm.write_message(body)

    class SocketHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            print "WebSocket opened"
            clients.append(self)
        def on_message(self, message):
            self.write_message(u"You said: " + message)

        def on_close(self):
            print "WebSocket closed"
            clients.remove(self)


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            print "get page"
            self.render("websocket.html")


application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    thread = Thread(target = threaded_rmq)
    thread.start()

    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

这里是html页面:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>

    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {

              alert("<tr><td> your browser doesn't support web socket </td></tr>");

            return;
        }

      ws.onopen = function(evt) { alert("Connection open ...")};

      ws.onmessage = function(evt){
        alert(evt.data);
      };

      function closeConnect(){
          ws.close();
      }


  });
    </script>

</head>

<html>

因此,当您将消息发布到“my_queue”时,该消息将重定向到所有连接的网页。

我希望它有用

编辑**

在这里https://github.com/Gsantomaggio/rabbitmqexample 你可以找到完整的例子

于 2014-04-06T12:54:01.240 回答