66

我想将数据从 amazon kinesis 流传输到 s3 日志或 bunyan 日志。

该示例适用于文件写入流或标准输出。我将如何实现自己的可写流?

//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)

这不起作用,说它没有“开”的方法

var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
    console.log(data);
};
kinesisSource.pipe(stream);

我必须为自己的自定义可写流实现哪些方法,文档似乎表明我需要实现“写入”而不是“开启”

4

3 回答 3

142

要创建自己的可写流,您有三种可能性。

创建自己的班级

为此,您需要 1) 扩展 Writable 类 2) 在您自己的构造函数中调用 Writable 构造函数 3)_write()在流对象的原型中定义一个方法。

这是一个例子:

var stream = require('stream');
var util = require('util');

function EchoStream () { // step 2
  stream.Writable.call(this);
};
util.inherits(EchoStream, stream.Writable); // step 1
EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
  console.log(chunk.toString());
  done();
}

var myStream = new EchoStream(); // instanciate your brand new stream
process.stdin.pipe(myStream);

扩展一个空的可写对象

Writable您可以实例化一个空对象并实现该_write()方法,而不是定义一个新的对象类型:

var stream = require('stream');
var echoStream = new stream.Writable();
echoStream._write = function (chunk, encoding, done) {
  console.log(chunk.toString());
  done();
};

process.stdin.pipe(echoStream);

使用简化的构造函数 API

如果您使用 io.js,则可以使用简化的构造函数 API

var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    console.log(chunk.toString());
    next();
  }
});

在 Node 4+ 中使用 ES6 类

class EchoStream extends stream.Writable {
  _write(chunk, enc, next) {
    console.log(chunk.toString());
    next();
  }
}
于 2014-02-05T17:09:45.500 回答
10

实际上创建一个可写流非常简单。这是示例:

var fs = require('fs');
var Stream = require('stream');

var ws = new Stream;
ws.writable = true;
ws.bytes = 0;

ws.write = function(buf) {
   ws.bytes += buf.length;
}

ws.end = function(buf) {
   if(arguments.length) ws.write(buf);
   ws.writable = false;

   console.log('bytes length: ' + ws.bytes);
}

fs.createReadStream('file path').pipe(ws);

此外,如果您想创建自己的课程,@Paul 会给出一个很好的答案。

于 2014-02-18T02:26:30.477 回答
0

这是直接来自 nodejs 文档https://nodejs.org/api/stream.html#an-example-writable-stream的示例

const { Writable } = require('stream');
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}
于 2022-01-28T21:43:55.740 回答