
I'm trying to pipe an input stream (created from a huge GeoJSON file) through JSONStream.parse() to break the stream into objects, then through event-stream.map() to let me transform each object, then through JSONStream.stringify() to turn it back into a string, and finally into a writable output stream. As the process runs, I can see node's memory footprint continue to grow until it eventually exhausts the heap. Here's the simplest script (test.js) that recreates the problem:

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

const out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

A little bash script (barf.sh) that spews an endless stream of JSON into node's process.stdin will cause node's heap to gradually grow:

#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

by running it like so:

barf.sh | node test.js

There are a couple of curious ways to sidestep the issue:

  • Remove the fs.createWriteStream() call and change the last pipe stage from .pipe(out) to .pipe(process.stdout), then pipe node's stdout to /dev/null
  • Change the asynchronous es.map() to the synchronous es.mapSync() (sketched below)

Either one of the preceding two actions will allow the script to run forever, with node's memory footprint low and unchanging. I'm using node v6.3.1, event-stream v3.3.4, and JSONStream 1.1.4 on an eight core machine with 8GB of RAM running Ubuntu 16.04.
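For reference, here is a minimal sketch of the second workaround applied to test.js (everything else unchanged); es.mapSync() takes the data and returns the transformed value directly rather than passing it to a callback:

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

// Same pipeline as test.js, but with the synchronous es.mapSync() in place
// of the asynchronous es.map(); with this change the memory footprint stays flat.
const out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.mapSync(function(data) {
        return data
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)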

I hope someone can help me correct what I'm sure is an obvious error on my part.


1 Answer


JSONStream is not a streams2 stream, so it does not support backpressure control. (There's a brief summary of streams2 here.)

That means data is going to come out of the parse stream in data events, and the stream will keep pumping it out regardless of whether the consuming stream is ready for it. If there's any discrepancy between read and write speeds somewhere in the pipeline, buffering will occur - which is what you're seeing.
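For contrast, a transform built on the core stream.Transform class (a minimal, hypothetical sketch, not part of your pipeline) does participate in backpressure: Node won't call _transform again until the previous callback has fired and the readable side's buffer is below its highWaterMark, so a slow consumer throttles the producer.

const Transform = require("stream").Transform;

// A streams2-style pass-through in object mode; backpressure is handled
// by the stream machinery rather than by the transform itself.
const passThrough = new Transform({
    objectMode: true,
    transform: function (feature, encoding, callback) {
        // per-feature work would go here
        callback(null, feature);
    }
});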

Your barf.sh harness sees features injected via stdin. If you were reading a massive file instead, you should be able to manage the flow by pausing the file's read stream. So if you insert some pause/resume logic into your map callback, you should be able to get it to process a huge file; it will just take longer. I'd try something like this:

let inStream = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
inStream
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) {
        // Pause the file's read stream so upstream stops being fed, then
        // resume it a moment later. This is just an example; a
        // 10-millisecond wait per feature would be very slow.
        if (!inStream.isPaused()) {
            inStream.pause();
            global.setTimeout(function () { inStream.resume(); }, 10);
        }
        cb(null, data);
        return;
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out);

Incidentally, using mapSync makes almost no difference on my computer (which is old and slow). But unless you have some asynchronous work to do in map, I'd go with mapSync.

answered 2016-08-18T01:39:13.173