2

我有一个 angularjs 应用程序,在服务器端有 node 和 express。我也有 node-amqp 和 socket.io

我想实现以下行为

该应用程序有一个页面(路由,角度视图),显示带有实时数据的表格。数据使用 socket.io 和 amqp 实时更新,以从位于应用程序外部的 rabbitMQ 服务器流式传输数据。

当用户在浏览器上访问此页面/路由时

  1. 客户端发出一个套接字事件“订阅”</li>
  2. 服务器,在套接字事件“订阅”上,
    • 声明一个兔子队列
    • 将兔子队列绑定到交换器
    • 订阅来自兔子队列的消息/数据
    • 发出一个套接字事件“数据”,将数据发送回用户/客户端

当用户离开页面时,或者换句话说改变路线

  1. 客户端发出一个套接字事件“取消订阅”</li>
  2. 服务器,在套接字事件“取消订阅”上,
    • 从队列中取消订阅

我的问题是:如何确保 queue.subscribe 和 queue.unsubscribe 是同步的?如果用户执行快速的路由更改序列: visit/leave/visit/leave/visit/leave 订阅和取消订阅的顺序有时会颠倒,在新订阅完成之前,服务器会在之前的订阅第二次取消订阅。有什么建议吗?这是我尝试过的,但不起作用:

客户端:controller.js

.controller('WatchdogCtrl', function($scope, watchSocket) {

    var data = {}
    $scope.data = []

    var socket = watchSocket

    socket.emit('subscribe', { exchange: 'bus', key: 'mis.service-state' })
    socket.on('data', function(message) {
        // refreshing  data 
        data[message.payload.id] = message.payload;
        var new-values = [];
        angular.forEach(data, function(value, index) {
            this.push(value);
        }, new-values);

        $scope.data = new-values
        $scope.$apply()
    });

    $scope.$on('$destroy', function (event) {
        // unsubscribe from rabbit queue when leaving 
        socket.emit('unsubscribe')
    });
})

服务器端:server.js

// set up amqp listener
var amqp = require('amqp');
// create rabbitmq connection with amqp
var rabbitMQ = amqp.createConnection({url: "amqp://my:url"});
rabbitMQ.on('ready', function() {
    console.log('Connection to rabbitMQ is ready')
});

// Hook Socket.io into Express
var io = require('socket.io').listen(server);
io.set('log level', 2);
io.of('/watch').on('connection', function(socket) {
    var watchq;
    var defr;
    socket.on('subscribe', function(spec) {
        watchq = rabbitMQ.queue('watch-queue', function(queue) {
            console.log('declare rabbit queue: "' + queue.name +'"');
            console.log('bind queue '+ queue.name + ' to exch=' + spec.exchange + ', key=' + spec.key);

            queue.bind(spec.exchange, spec.key)
            defr = queue.subscribe(function(message, headers, deliveryInfo) {
                     socket.emit('data', {
                        key: deliveryInfo.routingKey,
                        payload: JSON.parse(message.data.toString('utf8'))
                     })
                   }).addCallback(function(ok) { 
                       var ctag = ok.consumerTag; 
                       console.log('subscribed to queue: ' + queue.name + ' ctag = ' + ctag)
                   });

        })
    })

    socket.on('unsubscribe', function() {
        //needs fix: this does not ensure subscribe/unsubscribe synchronization…..
        defr.addCallback(function(ok) {
            console.log('unsubscribe form queue:', watchq.name, ', ctag =', ok.consumerTag)
            watchq.unsubscribe(ok.consumerTag);
        })
    })

});

服务器 console.log 消息:(visit#3 和 leave#3 不同步)

declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.6418165327049792 //<-- visit#1
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.6418165327049792 //<--leave#1
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.455362161854282 //<-- visit#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#3
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.4509762797970325 //<-- visit#3
4

1 回答 1

3

我们的设置与您的非常相似。如果未使用,我们会创建一个具有过期时间的匿名独占队列。匿名队列获得代理为其生成的唯一名称。一旦客户端断开连接(一旦通道被拆除),独占队列就会被删除。队列的过期时间是 RabbitMQ 扩展,但我们使用的 amqplib 支持。我确信 node-amqp 也对此类扩展有某种支持。

还要为每个套接字创建一个通道(但重用相同的连接)。这给出了套接字和匿名队列之间的一对一映射。对该队列的任何绑定都等效于单个套接字的绑定。正因为如此,我们天生就知道哪个套接字应该得到什么消息,而不需要任何特殊的队列命名约定或检查路由键等。

关闭套接字时关闭 RabbitMQ 通道(同样,不是连接)。不需要特殊的取消订阅事件,尽管我们可能会在以后添加这样的事件。

这也意味着同一个浏览器可以有多个队列,如果它们打开多个选项卡而没有任何竞争条件。

于 2014-02-11T14:36:30.957 回答