2

我想使用 NodeJS 在服务器集群中通过 SSH 抓取数据。

远程脚本输出 JSON,然后将其解析并拆分为对象流

我现在的问题是,我使用的面向回调的库(SSH2、MySQL)导致了一个回调模式,我发现它很难与 Readable API 规范匹配。_read(size)当要推送的数据在一堆回调后面时如何实现?

我当前的实现利用了Streams也是EventEmitters. 我在构建 Stream 实例时开始填充我的数据。当我所有的回调都完成后,我会发出一个事件。然后我监听自定义事件,然后我才开始将数据向下推送到管道链中。

// Calling code

var stream = new CrawlerStream(argsForTheStream);

stream.on('queue_completed', function() {

  stream
    .pipe(logger)
    .pipe(dbWriter)
    .on('end', function() {
      // Close db connection etc...
    });

});

一个模拟CrawlerStream将是

// Mock of the Readable stream implementation
function CrawlerStream(args) {
  // boilerplate

  // array holding the data to push
  this.data = [];

  // semi-colon separated string of commands 
  var cmdQueue = getCommandQueue();
  var self = this;

  db.query(sql, function(err, sitesToCrawl, fields) {

    var servers = groupSitesByServer(sitesToCrawl);

    for (var s in servers) {

      sshConnect(getRemoteServer(s), function(err, conn) {
        
       sshExec({
          ssh: conn,
          cmd: cmdQueue
        }, function(err, stdout, stderr) {

          // Stdout is parsed as JSON 

          // Finally I can populate self.data!

          // Check if all servers are done

          // If I'm the last callback to execute
          self.data.push(null);
          self.emit('queue_completed');

        })
      });
    }

  });

}
util.inherits(CrawlerStream, Readable);

CrawlerStream.prototype._read = function(size) {
  
  while (this.data.length) {
    this.push(this.data.shift());
  }

}

我不确定这是否是实现此目的的惯用方式,并希望得到您的建议。

请在您的回答中注意,我想保留使用回调(无承诺)的 vanilla NodeJS 风格,并且我坚持使用 ES5。

谢谢你的时间!

4

0 回答 0