我正在使用through2Concurrent
. 此响应以缓冲区的形式出现,并使用JSONStream
. 然后将其通过管道传输到我的转换流中。然后,转换流函数发出 HTTP 请求,格式化响应并将其存储到 MongoDB 中。我们正在使用并发流,因为否则将花费不可接受的长时间来处理所有事情。
response Stream -> JSONStream.parse() -> Transform Stream
问题描述
初始响应流在解析后包含大约 18,000 个对象。finish
但是,在处理所有 18,000 个对象之前,流终止并接收到一个事件。不会引发错误,但在流结束之前实际上只处理了大约 2,000 - 5,000 个对象。处理的确切数字各不相同。
以下是相关代码:
const analyticsTransformer = through2Concurrent.obj({
maxConcurrency: 15
}, async (doc, enc, cb) => {
// Make an http request. This is a relatively long request.
const res = await apim.getAnalytics(doc);
// Save response to mongo.
await UsageData.save(res);
cb();
});
// Kick off the streaming.
broker.getInstances()
.pipe(JSONStream.parse('*')
.pipe(analyticsTransformer)
.on('finish', () => {
// We reach this way too quickly before we have handled all 18,000 objects
})
.on('error', err => {
// No errors are caught.
})
我试过的
- 等待“结束”事件:结果相同。未处理的对象和提前终止。
- 使用
through2
(notthrough2Concurrent
):在数千个对象通过后接收 ETIMEOUT。 - 设置
highWaterMark
为 18,000:这是唯一有效的方法。如果我改变这个highWatermark
值,我可以处理所有的对象,但这实际上只是一个问题的创可贴。我想知道为什么会这样,以及我可以做些什么来以一种稳健的方式解决我的流媒体问题。
设置highWaterMark
外观如下:
const analyticsTransformer = through2Concurrent.obj({
highWaterMark: 18,000,
maxConcurrency: 15
}, async (doc, enc, cb) => {
// ...
});
为什么改变highWaterMark
价值有效?
我提前终止流的真正原因是什么?
我该如何解决?
提前感谢任何可以提供帮助的人!:)