0

我必须定期下载/解析一堆 Json 数据,大约 1000~1.000.000 行。

每个请求的块限制为 5000。所以我想当时触发一堆请求,通过其自己的 Transfomer 流式传输每个输出以过滤掉键/值,然后写入将其输出写入的组合流数据库。

但是每次尝试它都不起作用,或者由于设置了许多事件侦听器而导致错误。如果我理解“最后一个管道”始终是链中的下一个参考,那么这似乎是正确的。

这是一些代码(更改了很多次,所以没有什么意义)。

问题是:将多个流连接到一个流是不好的做法吗?谷歌也没有展示很多关于它的内容。

谢谢!

brokerApi/getCandles.js

// The 'combined output' stream
let passStream = new Stream.PassThrough();

countChunks.forEach(chunk => {
    let arr = [];
    let leftOver = '';
    let startFound = false;
    let lastPiece = false;
    let firstByte = false;
    let now = Date.now();

    let transformStream = this._client

        // Returns PassThrough stream
        .getCandles(instrument, chunk.from, chunk.until, timeFrame, chunk.count)
        .on('error', err => console.error(err) || passStream.emit('error', err))
        .on('end', () => {
            if (++finished === countChunks.length)
                passStream.end();
        })
        .pipe(passStream);

    transformStream._transform = function(data, type, done) {
        /** Treansform to typedArray **/

        this.push(/** Taansformed value **/)
    }
});

额外 - “消耗”流的其他文件(写入数据库)

数据层.js

    brokerApi.getCandles(instrument, timeFrame, from, until, count)
            .on('data', async (buf: NodeBuffer) => {
                this._dataLayer.write(instrument, timeFrame, buf);

                if (from && until) {
                    await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
                } else {
                    if (buf.length) {
                        if (!from)
                            from = buf.readDoubleLE(0);

                        if (!until) {
                            until = buf.readDoubleLE(buf.length - (10 * Float64Array.BYTES_PER_ELEMENT));
                            console.log('UNTIL TUNIL', until);
                        }

                        if (from && until)
                            await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
                    }
                }

            })
            .on('end', () => {
                winston.info(`Cache: Fetching ${instrument} took ${Date.now() - now} ms`);
                resolve()
            })
            .on('error', reject)
4

1 回答 1

1

查看来自highlandjs的流助手,例如(未经测试,伪代码):

function getCandle(candle) {...}

_(chunks).map(getCandle).parallel(5000).pipe(...)
于 2017-06-09T21:36:25.457 回答