我必须定期下载/解析一堆 Json 数据,大约 1000~1.000.000 行。
每个请求的块限制为 5000。所以我想当时触发一堆请求,通过其自己的 Transfomer 流式传输每个输出以过滤掉键/值,然后写入将其输出写入的组合流数据库。
但是每次尝试它都不起作用,或者由于设置了许多事件侦听器而导致错误。如果我理解“最后一个管道”始终是链中的下一个参考,那么这似乎是正确的。
这是一些代码(更改了很多次,所以没有什么意义)。
问题是:将多个流连接到一个流是不好的做法吗?谷歌也没有展示很多关于它的内容。
谢谢!
brokerApi/getCandles.js
// The 'combined output' stream
let passStream = new Stream.PassThrough();
countChunks.forEach(chunk => {
let arr = [];
let leftOver = '';
let startFound = false;
let lastPiece = false;
let firstByte = false;
let now = Date.now();
let transformStream = this._client
// Returns PassThrough stream
.getCandles(instrument, chunk.from, chunk.until, timeFrame, chunk.count)
.on('error', err => console.error(err) || passStream.emit('error', err))
.on('end', () => {
if (++finished === countChunks.length)
passStream.end();
})
.pipe(passStream);
transformStream._transform = function(data, type, done) {
/** Treansform to typedArray **/
this.push(/** Taansformed value **/)
}
});
额外 - “消耗”流的其他文件(写入数据库)
数据层.js
brokerApi.getCandles(instrument, timeFrame, from, until, count)
.on('data', async (buf: NodeBuffer) => {
this._dataLayer.write(instrument, timeFrame, buf);
if (from && until) {
await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
} else {
if (buf.length) {
if (!from)
from = buf.readDoubleLE(0);
if (!until) {
until = buf.readDoubleLE(buf.length - (10 * Float64Array.BYTES_PER_ELEMENT));
console.log('UNTIL TUNIL', until);
}
if (from && until)
await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
}
}
})
.on('end', () => {
winston.info(`Cache: Fetching ${instrument} took ${Date.now() - now} ms`);
resolve()
})
.on('error', reject)