1

我对 Node.js 库有点陌生,我正在尝试弄清楚如何在 HTTP 响应流上使用异步迭代。我的总体目标是读取大型响应流并在块到达时对其进行处理,目前通过生成器函数。我无法将整个响应存储在内存中进行处理。

我正在使用该request库执行 HTTP 请求,如下所示。

const request = require("request");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
  for await (c of getChunks(response)) {
    console.log(c);
  }
}

当我运行时doWork(),我收到一条错误消息,指出stream变量 ofgetChunks()不是异步可迭代的。

TypeError:流不是异步可迭代的

这令人惊讶,因为我认为所有可读流通常都是异步可迭代的,并且请求库在没有提供回调时返回一个流。当我替换request.get(...)fs.createReadStream(...)某个本地文件时,一切都按预期工作。

也许request图书馆不支持这一点。如果是这样,我需要做什么来通过异步迭代处理 HTTP 响应流?

使用 Node.js 11.13 和request2.88.0。

4

4 回答 4

2

我对requestandrequest-promise-native库做了一些更多的试验,并且认为在当前的实现下这是不可能的。结果流似乎根本不是异步可迭代的。此外,在处理流之前需要正确的实现await来返回响应(如@JBone's answer所建议的那样)。但是,如果您调用await request.get(...),您将检索响应的全部内容,这对于大型响应来说是不可取的。

const r = require("request");
const rpn = require("request-promise-native");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  const url = "https://pastebin.com/raw/x4Nn0Tby";
  const response = r.get(url);         // returns a non-async-iterable object.
  const response2 = await rp.get(url); // returns the contents of url

  for await (c of getChunks(response)) {  // yields response not async-iterable error.
    console.log(c);
  }
}

我对这个问题的解决方案是用库替换使用request和。这些库在功能上相似,但允许您指定请求应解析为流;正如预期的那样,流是异步可迭代的。request-promise-nativeaxiosaxios

const axios = require("axios");

async function doWork() {
  var response = await axios.request({
    method: "GET",
    url: "https://pastebin.com/raw/x4Nn0Tby",
    responseType: "stream",
  });

  for await (c of getChunks(response.data)) {  // async-iteration over response works as expected.
    console.log(c);
  }
}
于 2019-04-10T17:29:53.990 回答
1

简单的回答:不,它没有。您可能希望使用基于承诺的包装器request,例如request-promise,然后它也适用于async/ await

详细信息:请注意,它的创建者request弃用它,因此将停止使用。这意味着,迟早,您很可能需要切换到另一个解决方案,例如axiossuperagentneedle等等。

当然,您可以评估这些模块并找出最适合您需求的模块,但我个人的建议是从 开始axios,因为我过去在它方面有很好的经验,但是,YMMV。

于 2019-04-10T18:42:47.207 回答
0

似乎您将不得不使用其他替代方案,就像他们在request模块文档中提到的那样,您可以在此处找到 https://www.npmjs.com/package/request

request supports both streaming and callback interfaces natively. If you'd like 
request to return a Promise instead, you can use an alternative interface wrapper for 
request. These wrappers can be useful if you prefer to work with Promises, or if 
you'd like to use async/await in ES2017.

Several alternative interfaces are provided by the request team, including:

request-promise (uses Bluebird Promises)
request-promise-native (uses native Promises)
request-promise-any (uses any-promise Promises)`

我的回答基于以下问题:

我认为您可以创建async await执行此操作的自定义方法。

async function doMyWork() {
try {
 const response = await myOwnRequest(url); 
 } catch (e) {
   console.log ('the error', e);
 }  
}

function myOwnRequest(url) {
  return new Promise(function (resolve, reject) {
   const resp = request.get(url);
   if(resp) {
    resolve();
   } else {
     reject();
   }
});
}
于 2019-04-09T13:09:58.040 回答
0

在 axios 0.19.0 上使用上述答案中的示例代码时,axios 的流选项对我不起作用。可能是椅子和键盘之间的问题,但无论如何......这是使用request.

我最终将请求流调整为异步生成器(当然中间有一个缓冲区)。这允许“流”类型的接口,其中数据的读取和写入可以交错......它不保证低内存消耗。尽可能快地请求管道(“推送”)到我们的 Writable,并且没有办法让我们暂停它或将其翻转为“拉”类型的接口(据我所知)。因此,如果我们从缓冲区中读取数据的速度比写入数据的速度慢:缓冲区会变得非常大,内存使用率也会很高。

因此,如果降低内存使用率并从 http 源解析大文件是至关重要的......那么可能会在“流式传输”时对缓冲区大小进行一些监控/报告,以查看您使用的代码是比流更快还是更慢所以你知道缓冲区会变大还是变小。当然,如果您使用非常慢的 http 服务器进行测试……那么所有的赌注都没有了。

这可以通过设置固定的缓冲区大小并_write阻塞直到发生更多读取(在缓冲区中腾出空间)来解决......即请求必须等待将更多数据写入管道。但是请求可能会在内部缓冲......所以如果数据在请求结束时堆积起来,这将无助于内存消耗。将不得不检查。

示例代码:

const request = require('request'),
    Writable = require('stream').Writable,
    EventEmitter = require('events');

module.exports = function (url, MAX_BYTES=1024) {
    var response = new ResponseBuffer(MAX_BYTES);

    request
        .get(url)
        .on('error', function(err) { throw err; })
        .pipe(response)
        .on('error', function(err) { throw err; });

    return response.reader();
};

class ResponseBuffer extends Writable {
    constructor (MAX_BYTES=1024) {
        super();
        this.buffer = '';
        this.open = true;
        this.done = null;  // callback to call when done reading.
        this.MAX_BYTES = MAX_BYTES;
        this.events = new EventEmitter();
    }
    _write(chunk, enc, next) {
        this.buffer += chunk;
        this.events.emit('data');
        next();
    }
    _final(done) {
        this.open = false; // signal to reader to return after buffer empty.
        return done();
    }
    async * reader () {
        while (true) {
            if (this.buffer.length == 0) {
                // buffer empty and Writable !open. return.
                if (!this.open) { return; }
                else { // buffer empty.  wait for data.
                    await new Promise(resolve => this.events.once('data', resolve));
                }
            }
            let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
            yield this.buffer.slice(0, read_bytes);
            this.buffer = this.buffer.slice(read_bytes);
        }
    }
}

然后像这样使用它:


const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
    chunk;
for await (chunk of httpGen) {
    // do something with chunk.
}

另一种方法(如果您特别关注内存使用情况)是只下载到磁盘(流式传输到文件写入器),然后从磁盘增量读取(您可以 async iter a fs.createReadStream(...)

于 2019-08-27T07:26:48.117 回答