1

我必须从输入文件中抽取行,转换它们并将它们放入输出文件中。

由于输入文件很大,我通过 HighlandJS 流式传输它。

转换步骤包括 MySQL DB 中的异步查询(通过 node-mysql),我不知道如何在流中管理异步查询。我的不同尝试给出了错误或没有。

我的最后一次尝试是:

h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
    h.wrapCallback(pool.query(data, function (err, rows) {
        return rows;
    }));
})
.pipe(outputStream);

关于如何做到这一点的任何提示?

谢谢。

4

1 回答 1

1

简短的回答:你的map转变必须返回一些东西。现在它什么也不返回。

长答案:

好的,为了这个答案的目的,我将稍微简化你的逻辑。假设我们想要这个。

input -> map to rows -> output

问题是映射是异步的,正如您指出的那样,map函数必须返回一些东西。在这种情况下,您只需为输入中的每个元素返回一个流。所以它看起来像这样。

// input -> map to a stream of streams of rows -> output
h(input).map(h.wrapCallback(pool.query)).pipe(output);

最后一个问题实际上是获取行而不是流。您可以通过使用flatMap转换来做到这一点,这会将流“扁平化”为“正常”流。

// input -> map to a stream of rows -> output
h(input).flatMap(h.wrapCallback(pool.query)).pipe(output);
于 2015-10-30T11:53:45.233 回答