2

我正在使用through2Concurrent. 此响应以缓冲区的形式出现,并使用JSONStream. 然后将其通过管道传输到我的转换流中。然后,转换流函数发出 HTTP 请求,格式化响应并将其存储到 MongoDB 中。我们正在使用并发流,因为否则将花费不可接受的长时间来处理所有事情。

response Stream -> JSONStream.parse() -> Transform Stream

问题描述
初始响应流在解析后包含大约 18,000 个对象。finish但是,在处理所有 18,000 个对象之前,流终止并接收到一个事件。不会引发错误,但在流结束之前实际上只处理了大约 2,000 - 5,000 个对象。处理的确切数字各不相同。

以下是相关代码:

const analyticsTransformer = through2Concurrent.obj({
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // Make an http request. This is a relatively long request.
  const res = await apim.getAnalytics(doc);
  // Save response to mongo.
  await UsageData.save(res);
  cb();
});

// Kick off the streaming.
broker.getInstances()
  .pipe(JSONStream.parse('*')
  .pipe(analyticsTransformer)
  .on('finish', () => {
    // We reach this way too quickly before we have handled all 18,000 objects
  })
  .on('error', err => {
    // No errors are caught.
  })

我试过的

  • 等待“结束”事件:结果相同。未处理的对象和提前终止。
  • 使用through2(not through2Concurrent):在数千个对象通过后接收 ETIMEOUT。
  • 设置highWaterMark为 18,000:这是唯一有效的方法。如果我改变这个highWatermark值,我可以处理所有的对象,但这实际上只是一个问题的创可贴。我想知道为什么会这样,以及我可以做些什么来以一种稳健的方式解决我的流媒体问题。

设置highWaterMark外观如下:

const analyticsTransformer = through2Concurrent.obj({
  highWaterMark: 18,000,
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // ...
});

为什么改变highWaterMark价值有效?

我提前终止流的真正原因是什么?

我该如何解决?

提前感谢任何可以提供帮助的人!:)

4

0 回答 0