目标:对象将被推送到可读流,然后根据它们来自的渠道(电子邮件、推送、应用内)保存在单独的 .csv 中。
问题:我无法将流分离到不同的 .pipe() “行”中,因此所有 .csv 日志仅接收其通道特定的事件对象。但在当前迭代中,由 Writestream 创建的所有 .csv 文件都从所有通道接收事件对象。
问题: 我可以在 setup() 函数中以编程方式动态创建多通道“pipe() 线”,还是我目前的处理方式正确?
手动创建“pipe() 行”是所有 .csv 都填充事件的原因吗?这可以通过一条“管道()线”和动态路由来解决吗?
以下代码的简要说明:
setup()调用makeStreams() - 创建一个具有 Readable 和 Writable 的对象(旋转文件系统 Writable 流)(setup() 现在是一个不必要的函数,但稍后会执行更多设置任务。)
当入站事件发生时调用pushStream()并推送如下对象:{Email: {queryParam:1, queryParam:2, etc.}} 事件按最高级别的 obj 排序(在本例中为“Email”)和然后被推送到正确的可写流,理论上应该移植到正确的可写流。不幸的是,情况并非如此,它将事件对象发送到所有可写流。如何仅将其发送到正确的流?
代码:
const Readable = require('stream').Readable
const Json2csvTransform = require('json2csv').Transform;
var rfs = require("rotating-file-stream");
const channelTypes = ['Push Notification', 'Email', 'In-app Message']
var streamArr = setup(channelTypes);
const opts = {};
const transformOpts = {
objectMode: true
};
const json2csv = new Json2csvTransform(opts, transformOpts);
function setup(list) {
console.log("Setting up streams...")
streamArr = makeStreams(list) //makes streams out of each endpoint
return streamArr
}
//Stream Builder for Logging Based Upon Channel Name
function makeStreams(listArray) {
listArray = ['Push Notification', 'Email', 'In-app Message']
var length = listArray.length
var streamObjs = {}
for (var name = 0; name < length; name++) {
var fileName = listArray[name] + '.csv'
const readStream = new Readable({
objectMode: true,
read() {}
})
const writeStream = rfs(fileName, {
size: "50M", // rotate every 50 MegaBytes written
interval: "1d" // rotate daily
//compress: "gzip" // compress rotated files
});
var objName = listArray[name]
var obj = {
instream: readStream,
outstream: writeStream
}
streamObjs[objName] = obj
}
return streamObjs
}
function pushStream(obj) {
var keys = Object.keys(obj)
if (streamArr[keys]) {
streamArr[keys].instream.push(obj[keys])
} else {
console.log("event without a matching channel error")
}
}
//Had to make each pipe line here manually. Can this be improved? Is it the reason all of the files are receiving all events?
streamArr['Email'].instream.pipe(json2csv).pipe(streamArr['Email'].outstream)
streamArr['In-app Message'].instream.pipe(json2csv).pipe(streamArr['In-app Message'].outstream)
streamArr['Push Notification'].instream.pipe(json2csv).pipe(streamArr['Push Notification'].outstream)
module.exports = {
makeStreams,
pushStream,
setup
}