在 axios 0.19.0 上使用上述答案中的示例代码时,axios 的流选项对我不起作用。可能是椅子和键盘之间的问题,但无论如何......这是使用request
.
我最终将请求流调整为异步生成器(当然中间有一个缓冲区)。这允许“流”类型的接口,其中数据的读取和写入可以交错......它不保证低内存消耗。尽可能快地请求管道(“推送”)到我们的 Writable,并且没有办法让我们暂停它或将其翻转为“拉”类型的接口(据我所知)。因此,如果我们从缓冲区中读取数据的速度比写入数据的速度慢:缓冲区会变得非常大,内存使用率也会很高。
因此,如果降低内存使用率并从 http 源解析大文件是至关重要的......那么可能会在“流式传输”时对缓冲区大小进行一些监控/报告,以查看您使用的代码是比流更快还是更慢所以你知道缓冲区会变大还是变小。当然,如果您使用非常慢的 http 服务器进行测试……那么所有的赌注都没有了。
这可以通过设置固定的缓冲区大小并_write
阻塞直到发生更多读取(在缓冲区中腾出空间)来解决......即请求必须等待将更多数据写入管道。但是请求可能会在内部缓冲......所以如果数据在请求结束时堆积起来,这将无助于内存消耗。将不得不检查。
示例代码:
const request = require('request'),
Writable = require('stream').Writable,
EventEmitter = require('events');
module.exports = function (url, MAX_BYTES=1024) {
var response = new ResponseBuffer(MAX_BYTES);
request
.get(url)
.on('error', function(err) { throw err; })
.pipe(response)
.on('error', function(err) { throw err; });
return response.reader();
};
class ResponseBuffer extends Writable {
constructor (MAX_BYTES=1024) {
super();
this.buffer = '';
this.open = true;
this.done = null; // callback to call when done reading.
this.MAX_BYTES = MAX_BYTES;
this.events = new EventEmitter();
}
_write(chunk, enc, next) {
this.buffer += chunk;
this.events.emit('data');
next();
}
_final(done) {
this.open = false; // signal to reader to return after buffer empty.
return done();
}
async * reader () {
while (true) {
if (this.buffer.length == 0) {
// buffer empty and Writable !open. return.
if (!this.open) { return; }
else { // buffer empty. wait for data.
await new Promise(resolve => this.events.once('data', resolve));
}
}
let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
yield this.buffer.slice(0, read_bytes);
this.buffer = this.buffer.slice(read_bytes);
}
}
}
然后像这样使用它:
const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
chunk;
for await (chunk of httpGen) {
// do something with chunk.
}
另一种方法(如果您特别关注内存使用情况)是只下载到磁盘(流式传输到文件写入器),然后从磁盘增量读取(您可以 async iter a fs.createReadStream(...)
)