22

从数组和管道值创建可读流到可写流的最佳方法是什么?我已经看到使用 setInterval 的子堆栈示例,并且我可以使用 0 作为间隔值成功实现该示例,但是我正在迭代大量数据并且每次触发 gc 都会减慢速度。

// Working with the setInterval wrapper
var arr = [1, 5, 3, 6, 8, 9];

function createStream () {
    var t = new stream;
    t.readable = true;
    var times = 0;
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }
}, 0);

// Create the writable stream s
// ....

createStream().pipe(s);

我想做的是在没有 setInterval 的情况下发出值。也许像这样使用异步模块:

async.forEachSeries(arr, function(item, cb) {
    t.emit('data', item);
    cb();
}, function(err) {
 if (err) {
     console.log(err);
 }
 t.emit('end');
});

在这种情况下,我迭代数组并发出数据,但从不管道任何值。我已经看过 shinout 的ArrayStream,但我认为它是在 v0.10 之前创建的,它的开销比我想要的要多。

4

5 回答 5

37

您可以通过创建可读流并将值推入其中来解决此问题。

流是一种痛苦,但直接使用它们通常比使用更容易。

要流式传输的字符串或缓冲区数组

如果您正在使用字符串或缓冲区数组,这将起作用:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

笔记:

  • readable.pipe(process.stdout)做两件事:将流置于“流动”模式并设置 process.stdout 可写流以接收来自readable
  • Readable#push方法适用于可读流的创建者,而不是流消费者。
  • 您必须Readable#push(null)发出信号表明没有更多数据。

要流式传输的非字符串数组

要从既不是字符串也不是缓冲区的事物数组中创建流,您需要可读流和可写流都处于“对象模式”。在下面的示例中,我进行了以下更改:

  • 初始化可读流{objectMode: true}
  • 不是管道到process.stdout,而是管道到处于对象模式的简单可写流。

      'use strict'
      const Stream = require('stream')
    
      const readable = new Stream.Readable({objectMode: true})
    
      const writable = new Stream.Writable({objectMode: true})
      writable._write = (object, encoding, done) => {
        console.log(object)
    
        // ready to process the next chunk
        done()
      }
    
      readable.pipe(writable)
    
      const items = [1, 2, 3]
      items.forEach(item => readable.push(item))
    
      // end the stream
      readable.push(null)
    

性能说明

数据从何而来?如果它是流数据源,最好使用转换流来操作流,而不是转换为数组/从数组转换。

于 2016-02-27T17:03:14.667 回答
19

从 Node 12.3 开始,您可以stream.Readable.from(iterable, [options])改用。

const { Readable } = require('stream');
const readableStream = Readable.from(arr);
于 2020-02-28T21:32:05.770 回答
14

tl;dr;

This is a LIFO solution. Array.prototype.pop() has similar behavior to shift but applied to the last element in an array.

const items = [1,2,3]
const stream = new Readable({
  objectMode: true,
  read() {
    const item = items.pop()
    if (!item) {
      this.push(null);
      return;
    }
    this.push(item)
  },
})
于 2018-06-19T19:06:52.073 回答
2

为此,我最终使用了ArrayStream。它确实解决了 GC 被频繁触发的问题。我收到来自节点的递归 process.nextTick 的警告,因此将 ArrayStream 中的 nextTick 回调修改为 setImmediate 并修复了警告并且似乎运行良好。

于 2013-06-28T14:40:13.753 回答
2

这是一个老问题,但如果有人偶然发现这个问题,node-stream-array是 Node.js >= v0.10 的一个更简单、更优雅的实现

var streamify = require('stream-array'),
  os = require('os');

streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);
于 2015-09-16T09:27:43.330 回答