2

我需要读取包含数千行的日志文件并将每一行写入 Mongo 数据库。我正在使用节点流读取文件。我正在使用“split”npm 包将文件拆分为“行”。由于网络考虑,MongoDB 写入将比日志文件读取花费更长的时间。

我的核心代码如下所示:

var readableStream = fs.createReadStream(filename);

            readableStream
                .pipe(split()) // This splits the data into 'lines'
                .on('data', function (chunk) {

                    chunkCount++;
                    slowAsyncFunctionToWriteLogEntryToDatabase(chunk); // This will take ages

                })
                .on('end', function () {
                    // resolve the promise which bounds this process
                    defer.resolve({v:3,chunkCount: chunkCount})

                });

我是否需要担心 MongoDB 系统会受到排队写入次数的影响?大概节点管道背压机制不会知道大量的数据库写入正在排队?有什么方法可以“减慢”可读流,以便它在从日志文件中读取下一行之前等待每个 MongoDB 插入完成?我是在不必要地担心吗?

4

2 回答 2

2

自从工作以来pause()resume()似乎有一些问题。我将编写另一个选项,即使用转换流。

var Transform = require('stream').Transform;

var myTransform = new Transform({
   transform(chunk, encoding, cb) {
      chunkCount++;

      syncFunctionToWriteLogEntryWithCallback( chunk, function() {
         cb();
      } );
  },

  flush(cb) {
      chunkCount++;
      syncFunctionToWriteLogEntryWithCallback( chunk, function() {
         cb();
      } );
  }
});

readableStream
        .pipe( split() )
        .pipe( myTransform );

使用转换流允许您在处理完流时提供回调。

于 2016-11-24T17:32:23.990 回答
0

您可以在可读流中使用pause 方法在将块写入 mongodb 时停止流。

readableStream
            .pipe(split()) // This splits the data into 'lines'
            .on('data', function (chunk) {

                readableStream.pause()

                chunkCount++;

                syncFunctionToWriteLogEntryWithCallback( chunk, function() {
                    readableStream.resume();
                } );

            })
            .on('end', function () {
                // resolve the promise which bounds this process
                defer.resolve({v:3,chunkCount: chunkCount})

            });

在这种情况下,我认为 MongoDB 不会出现重大问题。

于 2016-11-24T12:55:23.103 回答