我想使用 NodeJS 在服务器集群中通过 SSH 抓取数据。
远程脚本输出 JSON,然后将其解析并拆分为对象流。
我现在的问题是,我使用的面向回调的库(SSH2、MySQL)导致了一个回调模式,我发现它很难与 Readable API 规范匹配。_read(size)
当要推送的数据在一堆回调后面时如何实现?
我当前的实现利用了Streams
也是EventEmitters
. 我在构建 Stream 实例时开始填充我的数据。当我所有的回调都完成后,我会发出一个事件。然后我监听自定义事件,然后我才开始将数据向下推送到管道链中。
// Calling code
var stream = new CrawlerStream(argsForTheStream);
stream.on('queue_completed', function() {
stream
.pipe(logger)
.pipe(dbWriter)
.on('end', function() {
// Close db connection etc...
});
});
一个模拟CrawlerStream
将是
// Mock of the Readable stream implementation
function CrawlerStream(args) {
// boilerplate
// array holding the data to push
this.data = [];
// semi-colon separated string of commands
var cmdQueue = getCommandQueue();
var self = this;
db.query(sql, function(err, sitesToCrawl, fields) {
var servers = groupSitesByServer(sitesToCrawl);
for (var s in servers) {
sshConnect(getRemoteServer(s), function(err, conn) {
sshExec({
ssh: conn,
cmd: cmdQueue
}, function(err, stdout, stderr) {
// Stdout is parsed as JSON
// Finally I can populate self.data!
// Check if all servers are done
// If I'm the last callback to execute
self.data.push(null);
self.emit('queue_completed');
})
});
}
});
}
util.inherits(CrawlerStream, Readable);
CrawlerStream.prototype._read = function(size) {
while (this.data.length) {
this.push(this.data.shift());
}
}
我不确定这是否是实现此目的的惯用方式,并希望得到您的建议。
请在您的回答中注意,我想保留使用回调(无承诺)的 vanilla NodeJS 风格,并且我坚持使用 ES5。
谢谢你的时间!