3

我了解如何在 Node 的新Streams2库中使用可写流,但我不了解如何使用可读流。

dgram以模块周围的流包装器为例:

var dgram = require('dgram');

var thumbs = {
  twiddle: function() {}
};

var defaults = {
  address: '0.0.0.0',
  type: 'udp4',
  port: 12345,
  broadcast: null,
  multicast: null,
  multicastTTL: 1
};

var UDPStream = function(options) {
  if (!(this instanceof UDPStream))
    return new UDPStream(options);

  Duplex.call(this);

  options = options || {};

  this.address = options.address || defaults.address;
  this.type = options.type || defaults.type;
  this.port = options.port || defaults.port;
  this.broadcast = options.broadcast || defaults.broadcast;
  this.multicast = options.multicast || defaults.multicast;
  this.multicastTTL = options.multicastTTL || defaults.multicastTTL;

  this._socket = dgram.createSocket(this.type, setup.bind(this));
  this._socket.on('message', this.push.bind(this));
};

util.inherits(UDPStream, Duplex);

var setup = function() {
  if (this.multicast) {
    this._socket.addMembership(this.multicast);
    this._socket.setMulticastTTL(this.multicastTTL);

    this.destination = this.multicast;
  } else {
    // default to using broadcast if multicast address is not specified.
    this._socket.setBroadcast(true);

    // TODO: get the default broadcast address from os.networkInterfaces() (not currently returned)
    this.destination = this.broadcast || '255.255.255.255';
  }
};

UDPStream.prototype._read = function(size) {
  thumbs.twiddle();
};

UDPStream.prototype._write = function(chunk, encoding, callback) {
  this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
  callback();
};

module.exports = UDPStream;

除了实现之外,一切都有意义_read。这实际上是在摆弄拇指,因为我不明白我应该在那里做什么。当 udp 套接字发出新消息时,我的数据被推送,但我无法暂停或恢复底层资源。这应该是什么样子?

4

2 回答 2

0

_read 是暂停恢复机制的一部分。来自 NodeJS API 文档

当数据可用时,通过调用 readable.push(chunk) 将其放入读取队列。如果 push 返回 false,那么您应该停止阅读。当再次调用 _read 时,您应该开始推送更多数据。

因此,在您的 _write 函数中,如果socket.send调用因返回 false 或调用带有错误的回调而失败,您应该暂停流。_read然后可以简单地做this._paused = false

可能看起来像这样。

UDPStream.prototype._read = function() {
  this._paused = false;
}

UDPStream.prototype._write = function(chunk, encoding, callback) {
  if(!this._paused)
   this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
};
于 2013-07-11T20:47:08.950 回答
-1

答案相当简单:如果真的无法对底层资源施加背压,那么您的_read实现就是空的。流将负责将您推送的数据排队,直到它到达highWaterMark,但除此之外不保证任何内容。文档说你应该“只要数据可用就简单地提供数据”。

于 2013-10-23T17:56:54.017 回答