15

在 Node.js 中,我使用该fs.createWriteStream方法将数据附加到本地文件。在 Node 文档中,他们drain在使用时提到了该事件fs.createWriteStream,但我不明白。

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

在上面的代码中,如何使用 drain 事件?下面的事件是否正确使用?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
4

4 回答 4

32

drain事件用于可写流的内部缓冲区已被清空。

只有当内部缓冲区的大小超过其highWaterMark属性时才会发生这种情况,该属性是可写流的内部缓冲区中可以存储的最大数据字节数,直到它停止从数据源读取为止。

出现这种情况的原因可能是由于设置涉及从一个流中读取数据源的速度快于将其写入另一个资源的速度。例如,取两个流:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

现在假设该文件read位于 SSD 上,可以以 500MB/s 的速度读取,而write位于只能以150MB/s. 写入流将无法跟上,并将开始将数据存储在内部缓冲区中。一旦缓冲区达到highWaterMark默认值 16KB,写入将开始返回false,并且流将在内部排队排空。一旦内部缓冲区的长度为 0,则drain触发事件。

这是排水管的工作原理:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

这些是作为writeOrBuffer功能一部分的排水的先决条件:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;

要查看如何使用drain事件,请以 Node.js 文档中的示例为例。

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

该函数的目标是向可写流写入 1,000,000 次。发生的情况是变量ok设置为 true,并且循环仅在ok为 true 时执行。对于每个循环迭代,将 的值ok设置为 的值stream.write(),如果需要 a 则返回 false drain。如果ok变为 false,则drain等待事件处理程序并在着火时恢复写入。


特别是关于您的代码,您不需要使用该drain事件,因为您在打开流后只编写一次。由于您尚未向流中写入任何内容,因此内部缓冲区是空的,您必须以块的形式写入至少 16KBdrain才能触发事件。该drain事件用于多次写入比highWaterMark可写流设置更多的数据。

于 2013-09-21T14:35:11.003 回答
12

想象一下,您正在连接 2 个带宽非常不同的流,例如,将本地文件上传到慢速服务器。(快)文件流将比(慢)套接字流消耗数据更快地发出数据。

在这种情况下,node.js 会将数据保存在内存中,直到慢流有机会处理它。如果文件非常大,这可能会出现问题。

为避免这种情况,当底层系统缓冲区已满时Stream.write返回。false如果停止写入,流稍后会发出一个drain事件,指示系统缓冲区已清空,可以再次写入。

您可以使用pause/resume可读流并控制可读流的带宽。

更好:您可以使用readable.pipe(writable)which 将为您执行此操作。

编辑:您的代码中有一个错误:无论write返回什么,您的数据都已写入。您无需重试。在你的情况下,你写data了两次。

像这样的东西会起作用:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}
于 2013-09-21T12:47:53.400 回答
2

这是一个带有 async/await 的版本

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    await write(write_stream, current++)
  }
}

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73

于 2019-03-19T15:33:48.760 回答
1

这是使用 Promises (async/await) 的速度优化版本。调用者必须检查它是否得到promise回复,只有在这种情况下await才需要调用。在每次调用时执行 await 会使程序减慢 3 倍...

const write = (writer, data) => {
    // return a promise only when we get a drain
    if (!writer.write(data)) {
        return new Promise((resolve) => {
            writer.once('drain', resolve)
        })
    }
}

// usage
const run = async () => {
    const write_stream = fs.createWriteStream('...')
    const max = 1000000
    let current = 0
    while (current <= max) {
        const promise = write(write_stream, current++)
        // since drain happens rarely, awaiting each write call is really slow.
        if (promise) {
            // we got a drain event, therefore we wait
            await promise
        }
    }
}
于 2019-07-04T16:57:00.170 回答