0

我有一个 Readable 生成一个无限序列:

var stream = require('stream');
var readable = new stream.Readable({
  read: function (size) {
    for (;;) {
      var s = ' ' + (++this.count);
      if (!readable.push(s))
        break;
    }
  }
});
readable.count = 0;
readable.pipe(process.stdout);

正如预期的那样,它可以永远运行:

.... 4151609 4151610 4151611 4151612 ....

而且我还有一个 Writable 可以吃掉任何给它的东西:

var writable = new stream.Writable({
  write: function (chunk, encoding, next) {
    console.log('w: ' + (++this.count));
    next();
  }
});
writable.count = 0;
process.stdin.pipe(writable);

我用一个非常大的文件对其进行了测试,它也按预期工作:

$ cat very-large-file | node test.js
....
w: 306772
w: 306773
w: 306774
w: 306775
....

但是,当我通过管道readable传输到writable

readable.pipe(writable);

该程序运行了一段时间,然后崩溃并出现核心转储:

w: 118351
w: 118352
w: 118353
<--- Last few GCs --->

  215645 ms: Scavenge 1398.2 (1457.3) -> 1398.2 (1457.3) MB, 0.4 / 0 ms (+ 0.9 ms in 1 steps since last GC) [allocation failure] [incremental marking delaying mark-sweep].
  216771 ms: Mark-sweep 1398.2 (1457.3) -> 1387.4 (1457.3) MB, 1125.9 / 0 ms (+ 0.9 ms in 1 steps since start of marking, biggest step 0.9 ms) [last resort gc].
  217877 ms: Mark-sweep 1387.4 (1457.3) -> 1396.9 (1457.3) MB, 1105.9 / 0 ms [last resort gc].


<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x1b8ff3b4629 <JS Object>
    1: nextTick [node.js:~477] [pc=0x5e5d1d9bd7b] (this=0xf61bdf1c521 <a process with map 0x53539c10f99>,callback=0xf0666bf7411 <JS Function afterWrite (SharedFunctionInfo 0xf0666b1ae01)>)
    2: arguments adaptor frame: 5->1
    3: onwrite(aka onwrite) [_stream_writable.js:~314] [pc=0x5e5d1d83f5e] (this=0x1b8ff3041b9 <undefined>,stream=0xf0666bf7961 <a Writable with map 0x178127921271>,er=0x1b8...

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)

为什么?我做错什么了吗?

更新

ShanShan评论说console.log会减慢可写的速度,并且无法跟随可读的所有输入,从而导致内存不足崩溃。为了验证这个理论,我做了一个实验:

var stream = require('stream');
var readable = new stream.Readable({
  highWaterMark: 20,
  read: function (size) {
    for (;;) {
      var s = ' ' + (++this.count);
      if (!readable.push(s))
        break;
    }
    console.log('r: ' + this.count + ' - ' + this._readableState.buffer.length);
  }
});
readable.count = 0;
var writable = new stream.Writable({
  write: function (chunk, encoding, next) {
    console.log('w: ' + (++this.count));
    next();
  }
});
writable.count = 0;
readable.pipe(writable);

代码几乎和以前一样,除了:

  1. 我将 highWaterMark(缓冲区大小限制)设置为非常低的值;
  2. 我在可读文件中添加了一个 console.log。

输出如下:

r: 10 - 10
r: 11 - 11
w: 1
w: 2
r: 12 - 10
w: 3
r: 13 - 10
w: 4
w: 5
r: 14 - 9
w: 6
r: 15 - 9
w: 7
w: 8
r: 16 - 8
w: 9
r: 17 - 8
w: 10
r: 18 - 8
w: 11
r: 19 - 8
w: 12
....
r: 20564 - 5
w: 20560
r: 20565 - 5
w: 20561
r: 20566 - 5
w: 20562
r: 20567 - 5
w: 20563
r: 20568 - 5
w: 20564

程序仍然崩溃。显然 and 的执行readable几乎writable是严格交错的,而且缓冲区大小总是很小,所以我认为 ShanShan 的理论是不正确的。

4

0 回答 0