First .js file
let data = {}
const ingestionSelf = this
const prom = await new Promise((resolve, reject) => {
  // Pipe the incoming stream through an object-mode transform that calls
  // myFunction for every chunk and pushes the transformed result downstream.
  data = inputs.input[0].pipe(through2.obj(function (chunk, enc, next) {
    const throughSelf = this
    ingestionSelf.myFunction(node, { input: [chunk] }, inputData, request, internalCall).then((resp) => {
      if (R.type(resp) === "String") {
        resp = JSON.parse(resp)
      }
      throughSelf.push(resp[0])
      // Resolves on the first chunk; later calls to resolve are no-ops.
      resolve(resp)
      next()
    })
  }))
})
if (prom) {
  return data
}
Second .js file
data.on("data", (chunk) => {
  if (R.includes(R.type(chunk), ["Object", "Array"])) {
    pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
  } else {
    pushToKafkaQueue(topicName, chunk, request)
  }
})
With this, I only get up to 32 records before the stream stops. The actual number of records is 5000. If I write the code as follows, I do get all 5000 records.
data.on("data", (chunk) => {
  data.pause();
  if (R.includes(R.type(chunk), ["Object", "Array"])) {
    pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
  } else {
    pushToKafkaQueue(topicName, chunk, request)
  }
  setTimeout(() => {
    data.resume();
  }, 0);
})
But this workaround does not feel right: it pauses the stream for every single record/chunk and immediately resumes it again. Is there a proper way to solve this?
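For reference, the kind of direction I imagine a cleaner fix would take (a sketch only; it assumes pushToKafkaQueue returns a Promise, which may not hold in my code base) is to consume the stream with async iteration so Node handles the backpressure itself:

// Sketch: async iteration only pulls the next chunk after the loop body
// has finished, so no manual pause()/resume() is needed.
// Assumption: pushToKafkaQueue returns a Promise.
async function consumeStream (data, topicName, request) {
  for await (const chunk of data) {
    const payload = R.includes(R.type(chunk), ["Object", "Array"])
      ? JSON.stringify(chunk)
      : chunk
    await pushToKafkaQueue(topicName, payload, request)
  }
}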