4

我需要帮助。

一天以来,我一直在尝试使用 node.js 和 socket.io 进行异步编程。我知道我需要一些流量控制,但我似乎不明白如何正确实施它。

我有一个 redis 数据存储,它的模块存储在一个集合中,比如说“moda”,“modb”这些模块的实例是“moda:instances”,而在“modb:instances”中,这些实例的属性存储在“moda:instancea”中' 和 'modb:instanceb' 作为哈希。

我正在尝试获取以下json:

"moda": {"instancea": {"property1": "value1", "property2", "value2"}}, "modb": {"instanceb": {"property1": "value1"}}

有人可以在正确的方向上给我一点推动吗?

这是我当前的代码:

var io = require('socket.io').listen(2000);
var redis = require('redis').createClient();
var http = require('http');
var async = require('async');
var step = require('step');

io.sockets.on('connection', function (socket) {

var notifications = require('redis').createClient();
notifications.subscribe("notification");
notifications.on("message", function (channel, message) {
  socket.send(message);
  console.log(channel + ':' + message);
});

socket.on('modules', function(params, callback) {
  var response = {};
  async.series([

  function (callback) {
    console.log('1>');
    redis.smembers('modules', function (err, modules) {
      async.forEachSeries(modules, function(module, moduleCallback) {
        response[module] = {}

        redis.smembers(module + ':instances', function(err, instances) {
          async.forEachSeries(instances, function(instance, instanceCallback) {
            response[module][instance] = {}
            console.log('2>' + module + ':' +instance);
            instanceCallback();
          });
          moduleCallback();
        });
      });
      callback();
    });
  },
  function (callback) {
    console.log('3');
    callback();
  }

], function() {
  console.log(JSON.stringify(response));
});


});

});

这段代码的输出是:

   info  - socket.io started
   debug - client authorized
   info  - handshake authorized JMMn1I8aiOMGCMPOhC11
   debug - setting request GET /socket.io/1/websocket/JMMn1I8aiOMGCMPOhC11
   debug - set heartbeat interval for client JMMn1I8aiOMGCMPOhC11
   debug - client authorized for 
   debug - websocket writing 1::
1>
3
{"moda":{}}
2>moda:instancea
2>moda:instanceb
2>modb:instancea
4

2 回答 2

0

问题在于 forEachSeries 需要一个额外的回调作为第三个参数(在整个处理完成时调用)。您不能只在 forEachSeries 之后添加一些代码,希望它一旦完成就会被调用。

这是您修改的代码:

var response = {};
async.series([

function (callback) {
  console.log('1>');
  redis.smembers('modules', function (err, modules) {
    async.forEachSeries(modules, function(module, moduleCallback) {
      response[module] = {}

      redis.smembers(module + ':instances', function(err, instances) {
        async.forEachSeries(instances, function(instance, instanceCallback) {
          response[module][instance] = {}
          console.log('2>' + module + ':' +instance);
          instanceCallback();
        }, moduleCallback );
      });
    }, callback );
  });
},
function (callback) {
  console.log('3');
  callback();
}],
function() {
   console.log(JSON.stringify(response));
});

请注意回调和 moduleCallback 如何用作第三个参数。输出是:

1>
2>moda:instancea
2>moda:instanceb
2>modb:instancea
3
{"moda":{"instancea":{},"instanceb":{}},"modb":{"instancea":{}}}

我想这是你所期望的。

补充说明:forEachSeries 将按顺序处理所有内容,下一个操作等待上一个操作完成。这将产生大量到 Redis 的往返。forEach 在这里应该更有效地利用流水线。

于 2012-09-29T08:09:50.230 回答
-1

还可以看看 Promise 概念,使用 Promise 配置流程更具可读性。

首先,您需要准备您正在使用的 redis 方法的承诺版本:

var promisify = require('deferred').promisify;
var RedisClient = require('redis').RedisClient;
RedisClient.prototype.psmembers = promisify(RedisClient.prototype.smembers);

然后,您可以将流程构建为:

console.log('1>');
redis.psmembers('modules').map(function (module) {
  response[module] = {};
  return redis.psmembers(module + ':instances').map(function (instance) {
    response[module][instance] = {};
    console.log('2>' + module + ':' +instance);
  });
}).end(function () {
  console.log('3');
  console.log(JSON.stringify(response));
});

Check: https://github.com/medikoo/deferred

于 2012-09-29T20:19:56.233 回答