1

第一个.js

let data = {}
  const ingSelf = this
  const prom = await new Promise((resolve, reject) => {
    data = inputs.input[0].pipe(through2.obj(function (chunk, enc, next) {
      const throughSelf = this
      ingestionSelf.myFunction(node, { input: [chunk] }, inputData, request, internalCall).then((resp) => {
        if (R.type(resp) === "String") {
          resp = JSON.parse(resp)
        }
        throughSelf.push(resp[0])
        resolve(resp)
        next()
      })
    }))
  })

  if (prom) {
    return data
  }

第二个.js

data.on("data", (chunk) => {
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
    })

在该流停止后获取多达 32 条记录的数据。实际记录为 5000。如果我编写如下代码,则将获得 5000 条记录。

 data.on("data", (chunk) => {
      data.pause();
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
      setTimeout(() => {
        data.resume();
      }, 0);
    })

但这种解决方案并不合适。对于每个记录/块暂停流并立即再次恢复它。有什么好的解决方案可以以适当的方式解决这个问题吗?

4

0 回答 0