1

在 node.js 中,我有一个读取流,我希望重新格式化并写入数据库。由于读取流快而写入慢,node.js 队列可能会随着写入队列的增加而不堪重负(假设流是 gb 的数据)。如何强制读取等待代码的写入部分,以便在没有阻塞的情况下不会发生这种情况?

var request = http.get({
      host: 'api.geonames.org',
      port: 80,
      path: '/children?' + qs.stringify({
      geonameId: geonameId,
      username: "demo"
   })
}).on('response', function(response) {
   response.setEncoding('utf8');
   var xml = new XmlStream(response, 'utf8');

   xml.on('endElement: geoname ', function(input) {  
      console.log('geoname');
      var output = new Object();
      output.Name = input.name;
      output.lat = input.lat;
      output.lng = input.lng;
      output._key = input.geonameId;
      data.db.document.create(output, data.doc, function(callback){    
         //this is really slow.
      }
      // i do not want to return from here and receive more data until the 'create' above has completed
   });  
});
4

2 回答 2

3

我昨晚刚遇到这个问题,在我的黑客马拉松引发的睡眠剥夺状态下,我是这样解决的:

每当我发送作业进行处理时,我都会增加一个计数器,并在操作完成时减少计数器。为了防止出站流量压倒其他服务,当有一定数量的待处理出站请求时,我会暂停流。代码与以下非常相似。

var instream = fs.createReadStream('./combined.csv');
var outstream = new stream;
var inProcess = 0;
var paused = false;
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
    inProcess++;
    if(inProcess > 100) {
        console.log('pausing input to clear queue');
        rl.pause();
        paused = true;
    }

    someService.doSomethingSlow(line, function() {
        inProcess--;
        if(paused && inProcess < 10) {
            console.log('resuming stream');
            paused = false;
            rl.resume();
        }

        if (err) throw err;
    });
});

rl.on('end', function() {
    rl.close();
});

不是最优雅的解决方案,但它有效,让我能够处理数百万行以上的行,而不会耗尽内存或限制其他服务。

于 2013-09-08T20:45:10.100 回答
0

我的解决方案只是扩展了一个空stream.Writable的,并且与@Timothy 的基本相同,但使用事件并且不依赖于 Streams1.pause().resume()(无论如何,这似乎对我的数据管道没有任何影响)。

var stream = require("stream");

var liveRequests = 0;
var maxLiveRequests = 100;
var streamPaused = false;

var requestClient = new stream.Writable();

function requestCompleted(){
    liveRequests--;
    if(streamPaused && liveRequests < maxLiveRequests){
        streamPaused = false;
        requestClient.emit("resumeStream");
    }
}

requestClient._write = function (data, enc, next){
    makeRequest(data, requestCompleted);
    liveRequests++;

    if(liveRequests >= maxLiveRequests){
        streamPaused = true;
        requestClient.once("resumeStream", function resume(){
            next();
        });
    }
    else {
        next();
    }
};

计数器 ,liveRequests,跟踪并发请求的数量,并在每次调用时递增 makeRequest(),在完成时(即,当requestCompleted())被调用时递减。如果刚刚发出请求并liveRequests超过maxLiveRequests,我们使用 暂停流streamPaused。如果请求完成,流被暂停,liveRequests现在小于maxLiveRequests,我们可以恢复流。由于随后的数据项_write()在其next()回调被调用时被读取,我们可以简单地在我们的自定义"resumeStream"事件上使用一个事件侦听器来延迟后者,它模仿暂停/恢复。现在,简单地说readStream.pipe(requestClient)


编辑:我将这个解决方案与输入数据的自动批处理一起抽象在一个中。

于 2014-11-11T18:47:43.010 回答