17

这个问题是如何真正实现可读流的读取方法。

我有一个可读流的实现:

import {Readable} from "stream";
this.readableStream = new Readable();

我收到此错误

events.js:136 抛出错误;// 未处理的“错误”事件 ^

错误 [ERR_STREAM_READ_NOT_IMPLEMENTED]:_read() 未在 Readable._read (_stream_readable.js:554:22) 处 Readable.read (_stream_readable.js:445:10) 在 resume_ (_stream_readable.js:825:12) 在 _combinedTickCallback (internal/process/next_tick.js:138:11) at process._tickCallback (internal/process/next_tick.js:180:9) at Function.Module.runMain (module.js:684:11) at startup (bootstrap_node. js:191:16) 在 bootstrap_node.js:613:3

错误发生的原因很明显,我们需要这样做:

  this.readableStream = new Readable({
      read(size) {
        return true;
      }
    });

我真的不明白如何实现 read 方法。

唯一有效的就是打电话

this.readableStream.push('some string or buffer');

如果我尝试做这样的事情:

   this.readableStream = new Readable({
          read(size) {
            this.push('foo');   // call push here!
            return true;
          }
     });

然后什么也没有发生 - 没有任何东西来自可读!

此外,这些文章说您不需要实现 read 方法:

https://github.com/substack/stream-handbook#creating-a-readable-stream

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

我的问题是- 为什么在 read 方法中调用 push 什么都不做?唯一对我有用的就是在别处调用 readable.push() 。

4

4 回答 4

16

为什么在 read 方法中调用 push 什么都不做?唯一对我有用的就是在别处调用 readable.push() 。

我认为这是因为您没有使用它,您需要将其通过管道传输到可写流(例如标准输出)或仅通过data事件使用它:

const { Readable } = require("stream");

let count = 0;
const readableStream = new Readable({
    read(size) {
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    }
});

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

它们都应该打印 5 次foo(尽管它们略有不同)。您应该使用哪一个取决于您要完成的任务。

此外,这些文章说您不需要实现 read 方法:

您可能不需要它,这应该可以工作:

const { Readable } = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) {
    readableStream.push('foo');
}
readableStream.push(null);

readableStream.pipe(process.stdout)

在这种情况下,您无法通过data事件捕获它。另外,我想说这种方式不是很有用而且效率不高,我们只是一次将所有数据推送到流中(如果它很大,所有内容都将在内存中),然后使用它。

于 2018-03-16T17:21:03.090 回答
4

从文档:

可读的._read:

“当调用 readable._read() 时,如果资源中的数据可用,则实现应该开始使用 this.push(dataChunk) 方法将该数据推送到读取队列中。链接

可读的.push:

“ readable.push() 方法只能由 Readable 实现者调用,并且只能从 readable._read() 方法中调用。link

于 2019-03-10T08:50:33.010 回答
2

_read在 ReadableStream 初始化后实现该方法:

import {Readable} from "stream";
this.readableStream = new Readable();
this.readableStream.read = function () {};
于 2019-03-05T16:00:43.247 回答
1

readableStream 就像一个池:

  • .push(data),就像把水抽到水池里一样。
  • .pipe(destination),就像将水池连接到管道并将水抽到其他地方
  • _read(size) 作为抽水机运行并控制水流量和数据何时结束。

fs.createReadStream() 将创建读取流,其中 _read() 函数已自动实现以推送文件数据并在文件结束时结束。

当池连接到管道时,_read(size) 会自动触发。因此,如果您在没有连接到目的地的方式的情况下强制调用此函数,它将泵到?哪里?它会影响 _read() 内的机器状态(可能是光标移动到错误的位置,...)

read() 函数必须在 new Stream.Readable() 中创建。它实际上是对象内部的一个函数。它不是 readableStream.read(),并且实现 readableStream.read=function(size){...} 将不起作用。

理解实现的简单方法:

var Reader=new Object();
Reader.read=function(size){
    if (this.i==null){this.i=1;}else{this.i++;}
    this.push("abc");
    if (this.i>7){ this.push(null); }
}

const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

您可以使用它来重新渲染任何流数据以 POST 到其他服务器。使用 Axios POST 流数据:

require('axios')({
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: {'Content-Length': 1000000000000},
    data: renderStream
});
于 2021-10-09T15:56:15.487 回答