I have a Readable that generates an infinite sequence:
var stream = require('stream');
var readable = new stream.Readable({
  read: function (size) {
    for (;;) {
      var s = ' ' + (++this.count);
      if (!readable.push(s))
        break;
    }
  }
});
readable.count = 0;
readable.pipe(process.stdout);
As expected, it runs forever:
.... 4151609 4151610 4151611 4151612 ....
I also have a Writable that consumes anything it is given:
var writable = new stream.Writable({
  write: function (chunk, encoding, next) {
    console.log('w: ' + (++this.count));
    next();
  }
});
writable.count = 0;
process.stdin.pipe(writable);
I tested it with a very large file, and it also works as expected:
$ cat very-large-file | node test.js
....
w: 306772
w: 306773
w: 306774
w: 306775
....
However, when I pipe readable into writable:
readable.pipe(writable);
the program runs for a while, then crashes with a core dump:
w: 118351
w: 118352
w: 118353
<--- Last few GCs --->
215645 ms: Scavenge 1398.2 (1457.3) -> 1398.2 (1457.3) MB, 0.4 / 0 ms (+ 0.9 ms in 1 steps since last GC) [allocation failure] [incremental marking delaying mark-sweep].
216771 ms: Mark-sweep 1398.2 (1457.3) -> 1387.4 (1457.3) MB, 1125.9 / 0 ms (+ 0.9 ms in 1 steps since start of marking, biggest step 0.9 ms) [last resort gc].
217877 ms: Mark-sweep 1387.4 (1457.3) -> 1396.9 (1457.3) MB, 1105.9 / 0 ms [last resort gc].
<--- JS stacktrace --->
==== JS stack trace =========================================
Security context: 0x1b8ff3b4629 <JS Object>
1: nextTick [node.js:~477] [pc=0x5e5d1d9bd7b] (this=0xf61bdf1c521 <a process with map 0x53539c10f99>,callback=0xf0666bf7411 <JS Function afterWrite (SharedFunctionInfo 0xf0666b1ae01)>)
2: arguments adaptor frame: 5->1
3: onwrite(aka onwrite) [_stream_writable.js:~314] [pc=0x5e5d1d83f5e] (this=0x1b8ff3041b9 <undefined>,stream=0xf0666bf7961 <a Writable with map 0x178127921271>,er=0x1b8...
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)
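Why? Am I doing something wrong?
Update
ShanShan commented that console.log slows the writable down so that it cannot keep up with all the input from the readable, which leads to the out-of-memory crash. To test this theory, I ran an experiment: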
var stream = require('stream');
var readable = new stream.Readable({
  highWaterMark: 20,
  read: function (size) {
    for (;;) {
      var s = ' ' + (++this.count);
      if (!readable.push(s))
        break;
    }
    console.log('r: ' + this.count + ' - ' + this._readableState.buffer.length);
  }
});
readable.count = 0;
var writable = new stream.Writable({
  write: function (chunk, encoding, next) {
    console.log('w: ' + (++this.count));
    next();
  }
});
writable.count = 0;
readable.pipe(writable);
The code is almost the same as before, except that:
- I set highWaterMark (the buffer size limit) to a very low value;
- I added a console.log to the readable.
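As a side note on the first change, here is a minimal sketch of how a byte-based highWaterMark bounds a single read call. It assumes non-object mode, where each pushed string counts by its UTF-8 byte length. fillOnce is a hypothetical helper name of my own, and read(0) is used only to trigger one call to the read function without consuming any data.

```javascript
var stream = require('stream');

// Hypothetical helper: triggers exactly one read call on a fresh Readable
// and reports how far the counter got before push() returned false.
function fillOnce(highWaterMark) {
  var count = 0;
  var readable = new stream.Readable({
    highWaterMark: highWaterMark,
    read: function (size) {
      for (;;) {
        var s = ' ' + (++count);
        // push() returns false once the buffered bytes reach the limit.
        if (!this.push(s))
          break;
      }
    }
  });
  // read(0) asks the stream to refill its buffer without consuming anything.
  readable.read(0);
  return { count: count, bufferedBytes: readable.readableLength };
}

console.log(fillOnce(20)); // { count: 10, bufferedBytes: 21 }
```

With a limit of 20, push() should first return false on ' 10', the chunk that takes the buffered total from 18 to 21 bytes.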
The output of the experiment is as follows:
r: 10 - 10
r: 11 - 11
w: 1
w: 2
r: 12 - 10
w: 3
r: 13 - 10
w: 4
w: 5
r: 14 - 9
w: 6
r: 15 - 9
w: 7
w: 8
r: 16 - 8
w: 9
r: 17 - 8
w: 10
r: 18 - 8
w: 11
r: 19 - 8
w: 12
....
r: 20564 - 5
w: 20560
r: 20565 - 5
w: 20561
r: 20566 - 5
w: 20562
r: 20567 - 5
w: 20563
r: 20568 - 5
w: 20564
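The program still crashes. Evidently the execution of readable and writable is almost strictly interleaved, and the buffer size always stays small, so I think ShanShan's theory is incorrect.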
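To probe the theory from the writable side as well, here is a minimal sketch (separate from the program above) of when write() signals backpressure. The helper names makeWritable and countBackpressure and the highWaterMark of 4 are my own assumptions for illustration. The sketch compares a write function that calls next() synchronously, as mine does, with one that defers it through setImmediate.

```javascript
var stream = require('stream');

// Hypothetical helper: builds a Writable whose write callback is invoked
// either synchronously or deferred with setImmediate.
function makeWritable(deferred) {
  return new stream.Writable({
    highWaterMark: 4, // tiny limit so backpressure shows up quickly
    write: function (chunk, encoding, next) {
      if (deferred)
        setImmediate(next);
      else
        next();
    }
  });
}

// Hypothetical helper: writes n one-byte chunks in a row and counts how
// often write() returns false, i.e. asks the producer to wait.
function countBackpressure(deferred, n) {
  var w = makeWritable(deferred);
  var refused = 0;
  for (var i = 0; i < n; i++) {
    if (!w.write('x'))
      refused++;
  }
  return refused;
}

console.log('sync next():     ' + countBackpressure(false, 10)); // 0
console.log('deferred next(): ' + countBackpressure(true, 10));  // 7
```

If write() never returns false while next() is called synchronously, pipe() would seem to have no reason to pause the readable. That fits the small buffer sizes in the output above, but it does not by itself explain where the memory goes.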