I read the update to your question and realized that the comment I left was completely off the point. Since you are using streams, you don't want to wait for all the data to arrive, in order to avoid exhausting memory. I should have noticed that at the beginning.

Let me give you some examples by way of apology. I hope they help you understand how to use streams.

To make the examples a bit more realistic, let's simulate fetching JSON from a remote server as `node-fetch` does. `node-fetch` returns an instance of `ReadableStream`, which is also `asyncIterable`. We can create one easily by passing an async generator function to `stream.Readable.from()` as below.

The definition of `fetch()`:
```js
const stream = require('stream');

async function* asyncGenerator (chunks) {
  let counter = 1;
  for (const chunk of chunks) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`==== chunk ${counter++} transmitted =====================`);
    yield chunk;
  }
}

// simulates node-fetch
async function fetch (json) {
  const asyncIterable = asyncGenerator(json);
  // let the client wait for 0.5 sec.
  await new Promise(resolve => setTimeout(resolve, 500));
  // returns the response object
  return { body: stream.Readable.from(asyncIterable) };
}
```
`fetch()` takes 0.5 sec to fetch the response object. It returns a `Promise` which resolves to an object whose `body` provides the `ReadableStream`. This readable stream keeps sending chunks of JSON data downstream every second, as defined in `asyncGenerator()`.

Our `fetch()` function takes a chunked JSON array as an argument instead of a URL. Let's use the one you provided, but split it at slightly different points, so that after receiving the second chunk we get two complete objects.
```js
const chunkedJson = [
  // chunk 1
  `[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem`,
  // chunk 2
  `ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }`,
  // chunk 3
  `,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977`,
  // chunk 4
  `-10-31"
  }
]`
];
```
Now, with this data, you can confirm how `fetch()` works as follows.

**Example 1: Testing `fetch()`**
```js
async function example1 () {
  const response = await fetch(chunkedJson);
  for await (const chunk of response.body) {
    console.log(chunk);
  }
}

example1();
console.log("==== Example 1 Started ==============");
```
The output of Example 1:
```
==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }
==== chunk 3 transmitted =====================
,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
==== chunk 4 transmitted =====================
-10-31"
  }
]
```
Now, let's process each element of this JSON data without waiting for the whole data to arrive.

`StreamArray` is a subclass of `stream.Transform`, so it has the interfaces of both `ReadableStream` and `WritableStream`. When stream instances are connected with `pipe()`, you don't have to worry about back pressure, so we pipe the two streams together, ie. the `ReadableStream` obtained from `fetch()` and the instance of `StreamArray`, as `response.body.pipe(StreamArray.withParser())` in Example 2 below.

`pipe()` returns the instance of the destination stream for method chaining, so the `pipeline` variable now holds a reference to the transform stream, which is also a readable stream. We can attach event listeners to it in order to consume the transformed data.

`StreamArray` emits a `data` event each time a single object is parsed from the readable source, so `pipeline.on('data', callback)` handles the data chunk by chunk without waiting for the whole JSON. When the listener is registered to the `data` event with `pipeline.on('data', callback)`, the stream starts to flow.

Since we simulate the data fetching asynchronously, you can see `!!!! MAIN THREAD !!!!` in the console in the middle of the data transmission. You can confirm that the main thread is not blocked while waiting for the parsed data.
**Example 2: Testing `stream-json`, processing each array element as it arrives**
```js
const StreamArray = require('stream-json/streamers/StreamArray');

async function example2 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("====== stream-json StreamArray() RESULT ========");
    console.log(value); // do your data processing here
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example2();
console.log("==== Example 2 Started ==============");
```
The output of Example 2:
```
==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
```
Since all streams are instances of `EventEmitter`, you can simply attach a callback to the `data` event to consume the final data, as in Example 2. However, it is preferable to use `pipe()` even for the final data consumption, because `pipe()` handles back pressure.

The back pressure problem occurs when the data consumption downstream is slower than the data feed upstream. For example, when your data handling takes time, you may want to handle each chunk asynchronously. If handling the next chunk finishes before the previous one, the next chunk is pushed downstream before the first one. If the downstream depends on the first chunk before handling the next one, this causes trouble.

When you use event listeners, you have to control the pausing and resuming manually to avoid the back pressure (see this example). But if you connect the streams with `pipe()`, the back pressure problem is taken care of internally. That means when the downstream is slower than the upstream, `pipe()` automatically pauses the feed to the downstream.
So let's create our own `WritableStream` to connect to `StreamArray` with `pipe()`. In our case we receive parsed objects from the upstream (ie. `StreamArray`) rather than strings or binary data, so we have to set `objectMode` to `true`. We override the `_write()` function, which is called internally from `write()`. You put all the data handling logic there and call `callback()` on completion. The upstream doesn't feed the next data until the callback is called when the streams are connected with `pipe()`.

In order to simulate back pressure, we process the first and the third objects for 1.5 sec and the second and the fourth for zero sec below.
**Example 3: Piping to our own stream instance**
```js
class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // received from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log("====== Example 3 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // the second and fourth objects take 0 sec
  }
}

//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
  const response = await fetch(chunkedJson);
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  response.body
    .pipe(StreamArray.withParser())
    .pipe(new MyObjectConsumerStream())
    .on('finish', () => {
      clearInterval(timer); // stop the main thread console.log
    });
}

example3();
console.log("==== Example 3 Started ==============");
```
The output of Example 3:
```
==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
```
You can confirm that the received data is in order. You can also see that the transmission of the second chunk starts while the first object is being processed, since we set that processing to take 1.5 sec. Now let's do the same thing with an event listener, as follows.
**Example 4: The back pressure problem with a simple callback**
```js
async function example4 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log(`====== Example 4 RESULT ========`);
      console.log(value); // do your data processing here
    }, key % 2 === 0 ? 1500 : 0); // the second and fourth objects take 0 sec
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example4();
console.log("==== Example 4 Started ==============");
```
The output of Example 4:
```
==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ...........
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
```
Now we see that the second element "Brian" arrives before "John". If the processing time for the first and third objects is increased to 3 sec, the last element "William" also arrives before the third element "Lucy".

So it is a good practice to use `pipe()` rather than event listeners to consume data when the order of arrival matters.
You may be wondering why the example code in the API doc uses its own `chain()` function to build the pipeline. It is the recommended design pattern for error handling in Node.js stream programming. If an error is thrown in the downstream of the pipeline, it does not propagate to the upstream. So you have to attach a callback on every stream in the pipeline as follows (here we assume three streams `a`, `b`, `c`):
```js
a.on('error', callbackForA)
 .pipe(b).on('error', callbackForB)
 .pipe(c).on('error', callbackForC)
```
This looks cumbersome compared to a Promise chain, which can simply have a `.catch()` added at the end. And even though we set all the error handlers as above, it is still not enough.

When an error is thrown in the downstream, the stream that caused it is detached from the pipeline with `unpipe()`; however, the upstream is not destroyed automatically. This is because multiple streams may be connected to the upstream to branch the stream line. So you have to close all the remaining streams yourself from each error handler when you use `pipe()`.
To solve these problems, the community provides pipeline construction libraries. I think `chain()` from stream-chain is one of them. Since Node version 10, `stream.pipeline` has been added for this functionality. We can use this official pipeline constructor, since all the streams in `stream-json` are subclasses of regular stream instances.

Before demonstrating the usage of `stream.pipeline`, let's modify the `MyObjectConsumerStream` class to throw an error when the third object (key `2`) is processed.
**A custom stream that throws an error**
```js
class MyErrorStream extends MyObjectConsumerStream {
  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // received from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    if (key === 2)
      throw new Error("Error in key 2");
    setTimeout(() => {
      console.log("====== Example 5 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // the second and fourth objects take 0 sec
  }
}
```
`stream.pipeline` takes multiple streams in order, along with an error handler at the end. The error handler receives the instance of `Error` when an error is thrown, and `null` when the pipeline finishes successfully.

**Example 5: The usage of `stream.pipeline`**
```js
async function example5 () {
  const response = await fetch(chunkedJson);
  const myErrorHandler = (timerRef) => (error) => {
    if (error)
      console.log("Error in the pipeline", error.message);
    else
      console.log("Finished Example 5 successfully");
    clearInterval(timerRef); // stop the main thread console.log
  };
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new MyErrorStream(),
    myErrorHandler(timer)
  );
  console.log("==== Example 5 Started ==============");
}

example5();
```
The output of Example 5:
```
==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
      throw new Error("Error in key 2");
      ^

Error: Error in key 2
    at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
    at doWrite (internal/streams/writable.js:377:12)
    at clearBuffer (internal/streams/writable.js:529:7)
    at onwrite (internal/streams/writable.js:430:7)
    at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7)
```
When an error is thrown, `stream.pipeline()` calls `stream.destroy(error)` on all the streams that have not been closed or finished properly. So we don't have to worry about memory leaks.