2

我希望只有在传递给 _flush 的回调完成后才会触发“完成”回调,但它是在 _flush 完成之前触发的。为什么?

var stream = require('stream').Transform();

// Nothing interesting here
stream._transform = function (chunk, enc, next) {
  next();
};

// _flush makes an async call.
stream._flush = function (callback) {
  console.log("Expecting 1st: Entering _flush ");

  return process.nextTick(function () {
    console.log("Expecting 2nd: Done with _flush");
    callback();
  });
};

stream.on('finish',function () {
  console.log("Expecting 3rd: Entering finish callback-- should be flushed now.");
});

stream.end("chunk");

完成的node.js 流文档说只有当所有数据都被刷新到底层系统时才会触发事件。

查看 Node 的 Transform 流实现的源代码,它的意图似乎是仅在_flush调用回调时触发完成:

this.once('prefinish', function() {
  if (util.isFunction(this._flush))
    this._flush(function(er) {
      done(stream, er);
    });
else
   done(stream);
});

要么我错过了一些东西,要么这是 Node 的 Transform 流的错误。我认为是前者,但没有发现我的问题。谢谢你的帮助!

更新:请注意,如果我将调用替换为process.nextTick()同步调用sleep,如sleep模块提供的那样,那么问题就会消失。该问题仅由_flush.

4

3 回答 3

3

在节点 v0.10 中,_flush()在 'finish' 上被调用,因此更明显可以看出这是预期的结果。

然而,在 master (v0.11.x) 中,似乎发明了一个新的“prefinish”事件并稍微改变了一些事情。但是,如果您查看代码,您会看到调用了 prefinish() 来触发 _flush()。当 prefinish() 在发出“prefinish”后返回时,会发出“finish”。在执行此操作之前,它不会等待调用 _flush() 回调。

因此,两个版本的结果行为相同。现在是否应该改变行为是一个单独的问题。你可以创建一个关于这种情况的 github 问题,看看是否有兴趣修改行为。

于 2014-05-13T01:31:25.670 回答
3

'finish' 事件没有直接控制,并且 '_flush' 不是 Writable 流的一部分。

使用 PassThrough,并在 'finish' 事件之后执行 'this.resume()' 你将得到一个 'end' 事件。

或者,您可以遵循拉取请求:https ://github.com/nodejs/node-v0.x-archive/pull/7612

或者使用一个模块:https ://github.com/TomFrost/FlushWritable

于 2016-03-18T02:12:27.977 回答
0

我知道这是一个老问题,但是文档还不清楚,所以我仍在寻找答案...

简短的回答:听end事件而不是finish事件。

更长的答案:尽管文档对此非常模棱两可(恕我直言,谈论所有数据都已刷新到底层系统),但我的理解是,当上游流中的所有数据都已传递给您的转换finish时,会发出事件,如由返回的方法检测。这样你的转换的“可写”部分就完成了。但这并不意味着您的 Transform 已完成处理并发出数据作为响应! 另一方面,当下游流不再需要消耗数据时会发出事件,正如您的检测到的那样_flush
end_flush调用回调方法的方法,这意味着您的 Transform 没有更多的数据要传递到下游,即您的 Transform 的“可读”部分已完成。

因此,以下代码将提供预期的行为:

var stream = require('stream').Transform();

// Nothing interesting here
stream._transform = function (chunk, enc, next) {
  next();
};

// _flush makes an async call.
stream._flush = function (callback) {
  console.log("Expecting 1st: Entering _flush ");

  return process.nextTick(function () {
    console.log("Expecting 2nd: Done with _flush");
    callback();
  });
};

// Here, listen on end instead of finish
stream.on('end',function () {
  console.log("Expecting 3rd: Entering finish callback-- should be flushed now.");
});

stream.end("chunk");
于 2022-02-14T10:40:40.537 回答